import org.apache.kafka.clients.consumer.OffsetResetStrategy
import org.apache.spark.sql
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode
import org.slf4j.LoggerFactory

object KafkaMirror extends App {

  val logger = LoggerFactory.getLogger(getClass)

  /**
   * For testing output to a console.
   *
   * @param df A streaming DataFrame
   * @return A DataStreamWriter
   */
  private def streamToConsole(df: sql.DataFrame) = {
    df.writeStream.outputMode(OutputMode.Append()).format("console")
  }

  /**
   * Subscribes to one or more Kafka topics as a streaming DataFrame.
   *
   * @param spark               An active SparkSession
   * @param bootstrap           Kafka bootstrap servers, e.g. "host:9092"
   * @param topicPattern        Comma-separated list of topics to subscribe to
   * @param offsetResetStrategy Where to start reading when no checkpoint exists
   */
  private def getKafkaDf(spark: SparkSession, bootstrap: String, topicPattern: String,
                         offsetResetStrategy: OffsetResetStrategy = OffsetResetStrategy.EARLIEST) = {
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrap)
      .option("subscribe", topicPattern)
      .option("startingOffsets", offsetResetStrategy.toString.toLowerCase())
      .load()
  }

  /**
   * Builds a Kafka sink for the given DataFrame. The DataFrame must carry a
   * 'topic' column, since the sink routes each row to that topic.
   */
  private def streamToKafka(df: sql.DataFrame, bootstrap: String, checkpointLocation: String) = {
    val cols = df.columns
    if (!cols.contains("topic")) {
      val e = new IllegalArgumentException(
        s"""Dataframe columns must contain 'topic'. Existing cols=[${cols.mkString(", ")}]""")
      logger.error("Unable to stream dataframe to Kafka", e)
      throw e
    }
    // output topic comes from the dataframe's 'topic' column, not from options
    df.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrap)
      .option("checkpointLocation", checkpointLocation) // required option for a Kafka sink
  }

  val spark = SparkSession.builder()
    .appName("Kafka Test")
    .master("local[*]") // TODO: real cluster
    .getOrCreate()

  import spark.implicits._

  val kafkaBootstrap = "localhost:9092" // TODO: real cluster

  val df = getKafkaDf(spark, kafkaBootstrap, "input-topic")
  streamToKafka(
    // Stream topic values into the same partition from input-topic to output-topic
    df.select($"partition", $"value", regexp_replace($"topic", "^input", "output").as("topic")),
    kafkaBootstrap,
    checkpointLocation = "/tmp/spark-sql-kafka" // TODO: real persistence, and not local disk
  ).start().awaitTermination()
}
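
Note: the "kafka" source and sink used above are not bundled with spark-sql, and OffsetResetStrategy comes from kafka-clients. A minimal build.sbt sketch for compiling this gist (the version numbers are assumptions, not specified by the gist; align them with your cluster):

// build.sbt sketch -- Spark version is an assumption, not pinned by the gist
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "3.2.0",
  // provides the "kafka" format and transitively pulls in kafka-clients
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.2.0"
)

For local debugging, the otherwise-unused streamToConsole helper can stand in for the Kafka sink, printing the renamed records instead of mirroring them back to Kafka. A sketch, assuming the same SparkSession, broker, and topics as above:

// Hypothetical debug wiring: print the renamed records to the console
// instead of writing them to output-topic.
streamToConsole(
  df.select($"partition", $"value", regexp_replace($"topic", "^input", "output").as("topic"))
).start().awaitTermination()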