Last active
June 19, 2020 05:05
-
-
Save tsusanto/5fbea1f137a60a27c13ea36077eabca2 to your computer and use it in GitHub Desktop.
Revisions
-
tsusanto revised this gist
Feb 17, 2017 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -23,7 +23,7 @@ object sparkXMLdemo { val selectedData = flattened.select($"RetailStoreID",$"WorkstationID", $"SequenceNumber",$"BusinessDayDate",$"OperatorID._OperatorName" as "OperatorName",$"OperatorID._VALUE" as "OperatorID",$"CurrencyCode",$"RetailTransaction.ReceiptDateTime",$"RetailTransaction.TransactionCount",$"LineItem.SequenceNumber" as "LineItemSequenceNumber",$"LineItem.Tax.TaxableAmount", $"LineItem.Tax.Amount" as "TaxAmount",$"LineItem.Tax.Percent" as "TaxPercent",$"LineItem.Sale.POSIdentity._POSIDType" as "POSIDType",$"LineItem.Sale.POSIdentity.POSItemID" as "POSItemID" ,$"LineItem.Sale.Description",$"LineItem.Sale.RegularSalesUnitPrice", $"LineItem.Sale.ExtendedAmount", $"LineItem.Sale.DiscountAmount", $"LineItem.Sale.ExtendedDiscountAmount", $"LineItem.Sale.Quantity") val kuduContext = new KuduContext("your-kudu-master-server:7051") val df = sqlContext.read.options(Map("kudu.master" -> "your-kudu-master-server:7051","kudu.table" -> "sales_lines_tenny")).kudu selectedData.registerTempTable("selectedData") val b = sqlContext.sql("SELECT CONCAT(BusinessDayDate, '-', cast(RetailStoreID as string), '-',cast(WorkstationID as string),'-', cast(SequenceNumber as string), '-', cast(LineItemSequenceNumber as string)), CONCAT(BusinessDayDate, '-', cast(RetailStoreID as string), '-',cast(WorkstationID as string),'-', cast(SequenceNumber as string)), RetailStoreID,WorkstationID,SequenceNumber,BusinessDayDate,OperatorName,LineItemSequenceNumber,POSIDType,cast(POSItemID as String),Description,TaxAmount,RegularSalesUnitPrice,ExtendedAmount,DiscountAmount,ExtendedDiscountAmount,Quantity FROM selectedData") -
tsusanto revised this gist
Feb 17, 2017 . 1 changed file with 2 additions and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -22,13 +22,13 @@ object sparkXMLdemo { val flattened = newData.withColumn("LineItem", explode($"RetailTransaction.LineItem")) val selectedData = flattened.select($"RetailStoreID",$"WorkstationID", $"SequenceNumber",$"BusinessDayDate",$"OperatorID._OperatorName" as "OperatorName",$"OperatorID._VALUE" as "OperatorID",$"CurrencyCode",$"RetailTransaction.ReceiptDateTime",$"RetailTransaction.TransactionCount",$"LineItem.SequenceNumber" as "LineItemSequenceNumber",$"LineItem.Tax.TaxableAmount", $"LineItem.Tax.Amount" as "TaxAmount",$"LineItem.Tax.Percent" as "TaxPercent",$"LineItem.Sale.POSIdentity._POSIDType" as "POSIDType",$"LineItem.Sale.POSIdentity.POSItemID" as "POSItemID" ,$"LineItem.Sale.Description",$"LineItem.Sale.RegularSalesUnitPrice", $"LineItem.Sale.ExtendedAmount", $"LineItem.Sale.DiscountAmount", $"LineItem.Sale.ExtendedDiscountAmount", $"LineItem.Sale.Quantity") val kuduContext = new KuduContext("your-kudu-master-server:7051") val df = sqlContext.read.options(Map("kudu.master" -> "sjc-prd-hddn52.sc5.coupons.lan:7051","kudu.table" -> "sales_lines_tenny")).kudu selectedData.registerTempTable("selectedData") val b = sqlContext.sql("SELECT CONCAT(BusinessDayDate, '-', cast(RetailStoreID as string), '-',cast(WorkstationID as string),'-', cast(SequenceNumber as string), '-', cast(LineItemSequenceNumber as string)), CONCAT(BusinessDayDate, '-', cast(RetailStoreID as string), '-',cast(WorkstationID as string),'-', cast(SequenceNumber as string)), RetailStoreID,WorkstationID,SequenceNumber,BusinessDayDate,OperatorName,LineItemSequenceNumber,POSIDType,cast(POSItemID as String),Description,TaxAmount,RegularSalesUnitPrice,ExtendedAmount,DiscountAmount,ExtendedDiscountAmount,Quantity FROM selectedData") val c = b.selectExpr("_c0 as lineid", "_c1 as transactionid", "RetailStoreID as retailstoreid","WorkstationID as workstationid" ,"SequenceNumber as sequencenumber","BusinessDayDate as businessdaydate","OperatorName as operatorname", "LineItemSequenceNumber as lineitemsequencenumber","POSIDType as posidtype","POSItemID as positemid","Description as description","TaxAmount as taxamount","RegularSalesUnitPrice as regularsalesunitprice","ExtendedAmount as extendedamount","DiscountAmount as discountamount","ExtendedDiscountAmount as extendeddiscountamount","Quantity as quantity") mykuduContext.upsertRows(c, "sales_lines_tenny") } -
tsusanto created this gist
Feb 17, 2017 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,35 @@ /** * Created by tsusanto on 2/15/2017. spark-submit --master yarn-client --class sparkXMLdemo SparkXmlKudu-assembly-1.0.jar spark-submit --master yarn-cluster --class sparkXMLdemo SparkXmlKudu-assembly-1.0.jar */ import org.apache.spark.SparkContext import org.apache.spark.SparkConf import org.apache.kudu.spark.kudu._ import org.apache.spark.sql.functions._ object sparkXMLdemo { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("spark-xml-kudu") val sc = new SparkContext(conf) val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext.implicits._ val newData = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "Transaction").option("samplingRatio","1").load("/user/tsusanto/POSLog-201409300635-21.xml") val flattened = newData.withColumn("LineItem", explode($"RetailTransaction.LineItem")) val selectedData = flattened.select($"RetailStoreID",$"WorkstationID", $"SequenceNumber",$"BusinessDayDate",$"OperatorID._OperatorName" as "OperatorName",$"OperatorID._VALUE" as "OperatorID",$"CurrencyCode",$"RetailTransaction.ReceiptDateTime",$"RetailTransaction.TransactionCount",$"LineItem.SequenceNumber" as "LineItemSequenceNumber",$"LineItem.Tax.TaxableAmount", $"LineItem.Tax.Amount" as "TaxAmount",$"LineItem.Tax.Percent" as "TaxPercent",$"LineItem.Sale.POSIdentity._POSIDType" as "POSIDType",$"LineItem.Sale.POSIdentity.POSItemID" as "POSItemID" ,$"LineItem.Sale.Description",$"LineItem.Sale.RegularSalesUnitPrice", $"LineItem.Sale.ExtendedAmount", $"LineItem.Sale.DiscountAmount", $"LineItem.Sale.ExtendedDiscountAmount", $"LineItem.Sale.Quantity") val kuduContext = new KuduContext("sjc-prd-hddn52.sc5.coupons.lan:7051") val df = sqlContext.read.options(Map("kudu.master" -> "sjc-prd-hddn52.sc5.coupons.lan:7051","kudu.table" -> "sales_lines_tenny")).kudu selectedData.registerTempTable("selectedData") val b = sqlContext.sql("SELECT CONCAT(BusinessDayDate, '-', cast(RetailStoreID as string), '-',cast(WorkstationID as string),'-', cast(SequenceNumber as string), '-', cast(LineItemSequenceNumber as string)), CONCAT(BusinessDayDate, '-', cast(RetailStoreID as string), '-',cast(WorkstationID as string),'-', cast(SequenceNumber as string)), RetailStoreID,WorkstationID,SequenceNumber,BusinessDayDate,OperatorName,LineItemSequenceNumber,POSIDType,cast(POSItemID as String),Description,TaxAmount,RegularSalesUnitPrice,ExtendedAmount,DiscountAmount,ExtendedDiscountAmount,Quantity FROM selectedData") val c = b.selectExpr("_c0 as lineid", "_c1 as transactionid", "RetailStoreID as retailstoreid","WorkstationID as workstationid" ,"SequenceNumber as sequencenumber","BusinessDayDate as businessdaydate","OperatorName as operatorname", "LineItemSequenceNumber as lineitemsequencenumber","POSIDType as posidtype","POSItemID as positemid","Description as description","TaxAmount as taxamount","RegularSalesUnitPrice as regularsalesunitprice","ExtendedAmount as extendedamount","DiscountAmount as discountamount","ExtendedDiscountAmount as extendeddiscountamount","Quantity as quantity") val mykuduContext = new KuduContext("sjc-prd-hddn52.sc5.coupons.lan:7051") mykuduContext.upsertRows(c, "sales_lines_tenny") } }