package igorprivate;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.concat_ws;
import static org.apache.spark.sql.functions.date_format;
import static org.apache.spark.sql.functions.from_json;
import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.struct;
import static org.apache.spark.sql.functions.sum;
import static org.apache.spark.sql.functions.to_json;
import static org.apache.spark.sql.functions.when;
import static org.apache.spark.sql.functions.window;

import java.util.concurrent.TimeUnit;

import org.apache.log4j.Level;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.StructType;

public class SparkStructuredStreamingMainExample {

    public static void main(String[] args) throws Exception {
        org.apache.log4j.Logger.getRootLogger().setLevel(Level.INFO);

        SparkConf properties = new SparkConf();
        properties.set("spark.sql.session.timeZone", "UTC");
        properties.set("spark.default.parallelism", String.valueOf(8));
        properties.set("spark.sql.shuffle.partitions", String.valueOf(8));

        String master = System.getProperty("spark.master",
                "local[" + Runtime.getRuntime().availableProcessors() + "]");

        try (SparkSession spark = SparkSession.builder()
                .config(properties)
                .master(master)
                .appName("my-awesome-spark")
                .getOrCreate()) {
            spark.sparkContext().setLogLevel("INFO");

            // Source: Kafka topic with raw SLA reports.
            Dataset<Row> df = spark.readStream().format("kafka")
                    .option("subscribe", "raw_sla_reports")
                    .option("kafka.bootstrap.servers", "0.0.0.0:29092")
                    .option("groupIdPrefix", "my-awesome-group-spark-streaming-dev")
                    .load();

            Dataset<Row> rowDataset = df.selectExpr("CAST(value AS STRING)");

            // Message schema: logicalTime/processingTime are epoch milliseconds.
            StructType schema = StructType.fromDDL(
                    "msgId STRING, "
                  + "sourceLabels MAP<STRING, STRING>, "
                  + "data ARRAY<STRUCT<logicalTime: BIGINT, processingTime: BIGINT, "
                  + "labels: MAP<STRING, STRING>, value: INT>>");

            Dataset<Row> jsonDF = rowDataset
                    .select(from_json(rowDataset.col("value"), schema).as("r"))
                    .select("r.*");

            Dataset<Row> raw_sla_reports_exploded_stream =
                    jsonDF.selectExpr("msgId", "sourceLabels", "explode(data) AS data");

            Dataset<Row> d = raw_sla_reports_exploded_stream.selectExpr(
                    "msgId",
                    "sourceLabels['sourceGroup'] AS sourceGroup",
                    "CAST(data.logicalTime / 1000 AS TIMESTAMP) AS logicalTime",
                    "CAST(data.processingTime / 1000 AS TIMESTAMP) AS processingTime",
                    "data.processingTime - data.logicalTime AS delay", // milliseconds
                    "data.value AS value",
                    "data.labels['msgtype'] AS msgtype")
                    .withWatermark("logicalTime", "24 hours");

            // Important: the dedup key must include the watermark column (logicalTime)
            // so Spark can eventually evict the deduplication state.
            d = d.dropDuplicates("msgId", "logicalTime");

            // Delay histogram per 1-minute event-time window; thresholds are milliseconds.
            Dataset<Row> histo = d.groupBy(
                    window(d.col("logicalTime"), "60 seconds"),
                    d.col("sourceGroup"),
                    d.col("msgtype")
            ).agg(
                    sum(when(d.col("delay").leq(lit(60_000L)), d.col("value"))
                            .otherwise(lit(0))).alias("zero_to_one_min"),
                    sum(when(d.col("delay").gt(lit(60_000L))
                            .and(d.col("delay").leq(lit(300_000L))), d.col("value"))
                            .otherwise(lit(0))).alias("one_to_five_mins"),
                    sum(when(d.col("delay").gt(lit(300_000L))
                            .and(d.col("delay").leq(lit(600_000L))), d.col("value"))
                            .otherwise(lit(0))).alias("five_to_ten_mins"),
                    sum(when(d.col("delay").gt(lit(600_000L))
                            .and(d.col("delay").leq(lit(1_800_000L))), d.col("value"))
                            .otherwise(lit(0))).alias("ten_to_thirty_mins"),
                    sum(when(d.col("delay").gt(lit(1_800_000L)), d.col("value"))
                            .otherwise(lit(0))).alias("more_than_thirty_mins"),
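                    // Overall total; the buckets above partition all delays, so this
                    // equals the sum of the five bucket columns.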
sum("value").alias("total_protos") ); Dataset histo_projected = histo.selectExpr( "sourceGroup", "msgtype", "window", "zero_to_one_min", "one_to_five_mins", "five_to_ten_mins", "ten_to_thirty_mins", "more_than_thirty_mins", "total_protos" ).withColumn("window_start", date_format(histo.col("window.start"), "yyyy-MM-dd-HH-mm-ss")).drop("window"); //histo_projected.printSchema(); Dataset histo_projected_with_agg_key = histo_projected.withColumn("aggKey", concat_ws("_", histo_projected.col("sourceGroup"), histo_projected.col("msgtype"), histo_projected.col("window_start"))); //histo_projected_with_agg_key.printSchema(); Dataset forOutputTopic = histo_projected_with_agg_key.select( to_json(struct(histo_projected_with_agg_key.col("*"))).alias("value") ); //value must be for kafka output //forOutputTopic.printSchema(); StreamingQuery streamingQuery = forOutputTopic.writeStream().format("kafka") .option("kafka.bootstrap.servers", "0.0.0.0:29092") .option("topic", "delays_per_group_histo_table") .option("checkpointLocation", "/tmp/checkpoint") .outputMode(OutputMode.Update()) .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS)) .start(); streamingQuery.awaitTermination(); } } }