/*
 * Created by tsusanto on 2/15/2017.
 *
 * Submit with either of:
 *   spark-submit --master yarn-client  --class sparkXMLdemo SparkXmlKudu-assembly-1.0.jar
 *   spark-submit --master yarn-cluster --class sparkXMLdemo SparkXmlKudu-assembly-1.0.jar
 */
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.kudu.spark.kudu._
import org.apache.spark.sql.functions._

/**
 * Spark driver that loads a POSLog XML file, flattens it to one row per
 * `RetailTransaction.LineItem`, and upserts the result into a Kudu table.
 *
 * Steps performed by [[sparkXMLdemo.main]]:
 *  1. read the XML with the `com.databricks.spark.xml` data source, one row per `Transaction` tag;
 *  2. explode the `RetailTransaction.LineItem` array so every line item gets its own row;
 *  3. project the transaction-level and line-item-level fields of interest;
 *  4. build two string keys by joining identifying fields with `-`
 *     (`lineid` includes the line-item sequence number, `transactionid` does not);
 *  5. rename every column to lower case and upsert the rows into Kudu.
 */
object sparkXMLdemo {

  /**
   * Job entry point.
   *
   * @param args command-line arguments; not read by this job
   */
  def main(args: Array[String]): Unit = {
    val kuduMaster = "your-kudu-master-server:7051"
    val kuduTable  = "sales_lines_tenny"
    val inputPath  = "/user/tsusanto/POSLog-201409300635-21.xml"

    val conf = new SparkConf().setAppName("spark-xml-kudu")
    val sc = new SparkContext(conf)

    try {
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      import sqlContext.implicits._

      // One DataFrame row per <Transaction> element.
      val newData = sqlContext.read
        .format("com.databricks.spark.xml")
        .option("rowTag", "Transaction")
        .option("samplingRatio", "1")
        .load(inputPath)

      // One row per line item; the exploded struct replaces/creates column "LineItem".
      val flattened = newData.withColumn("LineItem", explode($"RetailTransaction.LineItem"))

      val selectedData = flattened.select(
        $"RetailStoreID",
        $"WorkstationID",
        $"SequenceNumber",
        $"BusinessDayDate",
        $"OperatorID._OperatorName".as("OperatorName"),
        $"OperatorID._VALUE".as("OperatorID"),
        $"CurrencyCode",
        $"RetailTransaction.ReceiptDateTime",
        $"RetailTransaction.TransactionCount",
        $"LineItem.SequenceNumber".as("LineItemSequenceNumber"),
        $"LineItem.Tax.TaxableAmount",
        $"LineItem.Tax.Amount".as("TaxAmount"),
        $"LineItem.Tax.Percent".as("TaxPercent"),
        $"LineItem.Sale.POSIdentity._POSIDType".as("POSIDType"),
        $"LineItem.Sale.POSIdentity.POSItemID".as("POSItemID"),
        $"LineItem.Sale.Description",
        $"LineItem.Sale.RegularSalesUnitPrice",
        $"LineItem.Sale.ExtendedAmount",
        $"LineItem.Sale.DiscountAmount",
        $"LineItem.Sale.ExtendedDiscountAmount",
        $"LineItem.Sale.Quantity"
      )

      val kuduContext = new KuduContext(kuduMaster)

      selectedData.registerTempTable("selectedData")

      // The two CONCAT keys are aliased explicitly so later steps do not rely on
      // engine-generated column names for unaliased expressions.
      val keyed = sqlContext.sql(
        """SELECT
          |  CONCAT(BusinessDayDate, '-', cast(RetailStoreID as string), '-',
          |         cast(WorkstationID as string), '-', cast(SequenceNumber as string), '-',
          |         cast(LineItemSequenceNumber as string)) AS lineid,
          |  CONCAT(BusinessDayDate, '-', cast(RetailStoreID as string), '-',
          |         cast(WorkstationID as string), '-', cast(SequenceNumber as string)) AS transactionid,
          |  RetailStoreID, WorkstationID, SequenceNumber, BusinessDayDate, OperatorName,
          |  LineItemSequenceNumber, POSIDType, cast(POSItemID as String) AS POSItemID,
          |  Description, TaxAmount, RegularSalesUnitPrice, ExtendedAmount, DiscountAmount,
          |  ExtendedDiscountAmount, Quantity
          |FROM selectedData""".stripMargin)

      // Rename every column to its lower-case form before writing.
      // NOTE(review): presumably these match the Kudu table's column names — confirm against the table schema.
      val kuduRows = keyed.selectExpr(
        "lineid",
        "transactionid",
        "RetailStoreID as retailstoreid",
        "WorkstationID as workstationid",
        "SequenceNumber as sequencenumber",
        "BusinessDayDate as businessdaydate",
        "OperatorName as operatorname",
        "LineItemSequenceNumber as lineitemsequencenumber",
        "POSIDType as posidtype",
        "POSItemID as positemid",
        "Description as description",
        "TaxAmount as taxamount",
        "RegularSalesUnitPrice as regularsalesunitprice",
        "ExtendedAmount as extendedamount",
        "DiscountAmount as discountamount",
        "ExtendedDiscountAmount as extendeddiscountamount",
        "Quantity as quantity"
      )

      kuduContext.upsertRows(kuduRows, kuduTable)
    } finally {
      // Always release the SparkContext, including when the job fails part-way.
      sc.stop()
    }
  }
}