Skip to content

Instantly share code, notes, and snippets.

@tsusanto
Last active June 19, 2020 05:05
Show Gist options
  • Select an option

  • Save tsusanto/5fbea1f137a60a27c13ea36077eabca2 to your computer and use it in GitHub Desktop.

Select an option

Save tsusanto/5fbea1f137a60a27c13ea36077eabca2 to your computer and use it in GitHub Desktop.

Revisions

  1. tsusanto revised this gist Feb 17, 2017. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion sparkXMLdemo.scala
    Original file line number Diff line number Diff line change
    @@ -23,7 +23,7 @@ object sparkXMLdemo {
    val selectedData = flattened.select($"RetailStoreID",$"WorkstationID", $"SequenceNumber",$"BusinessDayDate",$"OperatorID._OperatorName" as "OperatorName",$"OperatorID._VALUE" as "OperatorID",$"CurrencyCode",$"RetailTransaction.ReceiptDateTime",$"RetailTransaction.TransactionCount",$"LineItem.SequenceNumber" as "LineItemSequenceNumber",$"LineItem.Tax.TaxableAmount", $"LineItem.Tax.Amount" as "TaxAmount",$"LineItem.Tax.Percent" as "TaxPercent",$"LineItem.Sale.POSIdentity._POSIDType" as "POSIDType",$"LineItem.Sale.POSIdentity.POSItemID" as "POSItemID" ,$"LineItem.Sale.Description",$"LineItem.Sale.RegularSalesUnitPrice", $"LineItem.Sale.ExtendedAmount", $"LineItem.Sale.DiscountAmount", $"LineItem.Sale.ExtendedDiscountAmount", $"LineItem.Sale.Quantity")

    val kuduContext = new KuduContext("your-kudu-master-server:7051")
    val df = sqlContext.read.options(Map("kudu.master" -> "sjc-prd-hddn52.sc5.coupons.lan:7051","kudu.table" -> "sales_lines_tenny")).kudu
    val df = sqlContext.read.options(Map("kudu.master" -> "your-kudu-master-server:7051","kudu.table" -> "sales_lines_tenny")).kudu

    selectedData.registerTempTable("selectedData")
    val b = sqlContext.sql("SELECT CONCAT(BusinessDayDate, '-', cast(RetailStoreID as string), '-',cast(WorkstationID as string),'-', cast(SequenceNumber as string), '-', cast(LineItemSequenceNumber as string)), CONCAT(BusinessDayDate, '-', cast(RetailStoreID as string), '-',cast(WorkstationID as string),'-', cast(SequenceNumber as string)), RetailStoreID,WorkstationID,SequenceNumber,BusinessDayDate,OperatorName,LineItemSequenceNumber,POSIDType,cast(POSItemID as String),Description,TaxAmount,RegularSalesUnitPrice,ExtendedAmount,DiscountAmount,ExtendedDiscountAmount,Quantity FROM selectedData")
  2. tsusanto revised this gist Feb 17, 2017. 1 changed file with 2 additions and 2 deletions.
    4 changes: 2 additions & 2 deletions sparkXMLdemo.scala
    Original file line number Diff line number Diff line change
    @@ -22,13 +22,13 @@ object sparkXMLdemo {
    val flattened = newData.withColumn("LineItem", explode($"RetailTransaction.LineItem"))
    val selectedData = flattened.select($"RetailStoreID",$"WorkstationID", $"SequenceNumber",$"BusinessDayDate",$"OperatorID._OperatorName" as "OperatorName",$"OperatorID._VALUE" as "OperatorID",$"CurrencyCode",$"RetailTransaction.ReceiptDateTime",$"RetailTransaction.TransactionCount",$"LineItem.SequenceNumber" as "LineItemSequenceNumber",$"LineItem.Tax.TaxableAmount", $"LineItem.Tax.Amount" as "TaxAmount",$"LineItem.Tax.Percent" as "TaxPercent",$"LineItem.Sale.POSIdentity._POSIDType" as "POSIDType",$"LineItem.Sale.POSIdentity.POSItemID" as "POSItemID" ,$"LineItem.Sale.Description",$"LineItem.Sale.RegularSalesUnitPrice", $"LineItem.Sale.ExtendedAmount", $"LineItem.Sale.DiscountAmount", $"LineItem.Sale.ExtendedDiscountAmount", $"LineItem.Sale.Quantity")

    val kuduContext = new KuduContext("sjc-prd-hddn52.sc5.coupons.lan:7051")
    val kuduContext = new KuduContext("your-kudu-master-server:7051")
    val df = sqlContext.read.options(Map("kudu.master" -> "sjc-prd-hddn52.sc5.coupons.lan:7051","kudu.table" -> "sales_lines_tenny")).kudu

    selectedData.registerTempTable("selectedData")
    val b = sqlContext.sql("SELECT CONCAT(BusinessDayDate, '-', cast(RetailStoreID as string), '-',cast(WorkstationID as string),'-', cast(SequenceNumber as string), '-', cast(LineItemSequenceNumber as string)), CONCAT(BusinessDayDate, '-', cast(RetailStoreID as string), '-',cast(WorkstationID as string),'-', cast(SequenceNumber as string)), RetailStoreID,WorkstationID,SequenceNumber,BusinessDayDate,OperatorName,LineItemSequenceNumber,POSIDType,cast(POSItemID as String),Description,TaxAmount,RegularSalesUnitPrice,ExtendedAmount,DiscountAmount,ExtendedDiscountAmount,Quantity FROM selectedData")
    val c = b.selectExpr("_c0 as lineid", "_c1 as transactionid", "RetailStoreID as retailstoreid","WorkstationID as workstationid" ,"SequenceNumber as sequencenumber","BusinessDayDate as businessdaydate","OperatorName as operatorname", "LineItemSequenceNumber as lineitemsequencenumber","POSIDType as posidtype","POSItemID as positemid","Description as description","TaxAmount as taxamount","RegularSalesUnitPrice as regularsalesunitprice","ExtendedAmount as extendedamount","DiscountAmount as discountamount","ExtendedDiscountAmount as extendeddiscountamount","Quantity as quantity")
    val mykuduContext = new KuduContext("sjc-prd-hddn52.sc5.coupons.lan:7051")

    mykuduContext.upsertRows(c, "sales_lines_tenny")

    }
  3. tsusanto created this gist Feb 17, 2017.
    35 changes: 35 additions & 0 deletions sparkXMLdemo.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,35 @@
    /**
    * Created by tsusanto on 2/15/2017.
    *
    * Usage (submit in either YARN client mode or YARN cluster mode):
    *   spark-submit --master yarn-client --class sparkXMLdemo SparkXmlKudu-assembly-1.0.jar
    *   spark-submit --master yarn-cluster --class sparkXMLdemo SparkXmlKudu-assembly-1.0.jar
    */

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    import org.apache.kudu.spark.kudu._
    import org.apache.spark.sql.functions._

    /**
      * Spark job that loads a POSLog XML file, produces one row per line item of each
      * `Transaction` element, and upserts those rows into a Kudu table.
      *
      * Optional positional arguments (each falls back to the previously hard-coded value,
      * so running without arguments behaves as before):
      *   1. Kudu master address
      *   2. Kudu table name
      *   3. path of the XML file to load
      */
    object sparkXMLdemo {

      private val DefaultKuduMaster = "sjc-prd-hddn52.sc5.coupons.lan:7051"
      private val DefaultKuduTable = "sales_lines_tenny"
      private val DefaultInputPath = "/user/tsusanto/POSLog-201409300635-21.xml"

      // Name under which the flattened data is registered for the SQL query below.
      private val TempTableName = "selectedData"

      // Shared prefix of both generated keys: date-store-workstation-transaction sequence.
      private val TransactionKeyExpr =
        "BusinessDayDate, '-', cast(RetailStoreID as string), '-', " +
          "cast(WorkstationID as string), '-', cast(SequenceNumber as string)"

      // Every output column is aliased explicitly to its lower-case target name. The two
      // CONCAT keys in particular no longer depend on the auto-generated `_c0`/`_c1`
      // names, which are tied to column position and to the Spark version in use.
      private val SalesLinesQuery =
        s"""SELECT
           |  CONCAT($TransactionKeyExpr, '-', cast(LineItemSequenceNumber as string)) AS lineid,
           |  CONCAT($TransactionKeyExpr) AS transactionid,
           |  RetailStoreID AS retailstoreid,
           |  WorkstationID AS workstationid,
           |  SequenceNumber AS sequencenumber,
           |  BusinessDayDate AS businessdaydate,
           |  OperatorName AS operatorname,
           |  LineItemSequenceNumber AS lineitemsequencenumber,
           |  POSIDType AS posidtype,
           |  cast(POSItemID as string) AS positemid,
           |  Description AS description,
           |  TaxAmount AS taxamount,
           |  RegularSalesUnitPrice AS regularsalesunitprice,
           |  ExtendedAmount AS extendedamount,
           |  DiscountAmount AS discountamount,
           |  ExtendedDiscountAmount AS extendeddiscountamount,
           |  Quantity AS quantity
           |FROM $TempTableName""".stripMargin

      /**
        * Entry point.
        *
        * @param args optional overrides: `args(0)` Kudu master, `args(1)` Kudu table,
        *             `args(2)` input XML path
        */
      def main(args: Array[String]): Unit = {
        val kuduMaster = args.lift(0).getOrElse(DefaultKuduMaster)
        val kuduTable = args.lift(1).getOrElse(DefaultKuduTable)
        val inputPath = args.lift(2).getOrElse(DefaultInputPath)

        val conf = new SparkConf().setAppName("spark-xml-kudu")
        val sc = new SparkContext(conf)

        // Stop the context even when the job fails so the application releases its
        // resources instead of lingering until the cluster manager reaps it.
        try {
          val sqlContext = new org.apache.spark.sql.SQLContext(sc)
          import sqlContext.implicits._

          // Each <Transaction> element becomes one row; samplingRatio "1" makes schema
          // inference look at every row.
          val transactions = sqlContext.read
            .format("com.databricks.spark.xml")
            .option("rowTag", "Transaction")
            .option("samplingRatio", "1")
            .load(inputPath)

          // explode turns the LineItem array into one row per line item.
          val flattened =
            transactions.withColumn("LineItem", explode($"RetailTransaction.LineItem"))

          val selectedData = flattened.select(
            $"RetailStoreID",
            $"WorkstationID",
            $"SequenceNumber",
            $"BusinessDayDate",
            $"OperatorID._OperatorName" as "OperatorName",
            $"OperatorID._VALUE" as "OperatorID",
            $"CurrencyCode",
            $"RetailTransaction.ReceiptDateTime",
            $"RetailTransaction.TransactionCount",
            $"LineItem.SequenceNumber" as "LineItemSequenceNumber",
            $"LineItem.Tax.TaxableAmount",
            $"LineItem.Tax.Amount" as "TaxAmount",
            $"LineItem.Tax.Percent" as "TaxPercent",
            $"LineItem.Sale.POSIdentity._POSIDType" as "POSIDType",
            $"LineItem.Sale.POSIdentity.POSItemID" as "POSItemID",
            $"LineItem.Sale.Description",
            $"LineItem.Sale.RegularSalesUnitPrice",
            $"LineItem.Sale.ExtendedAmount",
            $"LineItem.Sale.DiscountAmount",
            $"LineItem.Sale.ExtendedDiscountAmount",
            $"LineItem.Sale.Quantity"
          )

          selectedData.registerTempTable(TempTableName)
          val salesLines = sqlContext.sql(SalesLinesQuery)

          // A single KuduContext is enough; the previous version built two of them plus
          // a Kudu-backed DataFrame that was never used.
          val kuduContext = new KuduContext(kuduMaster)
          kuduContext.upsertRows(salesLines, kuduTable)
        } finally {
          sc.stop()
        }
      }
    }