Skip to content

Instantly share code, notes, and snippets.

@OneCricketeer
Created November 9, 2021 18:31
Show Gist options
  • Select an option

  • Save OneCricketeer/ec5c95023799a808c72a06a375eb439b to your computer and use it in GitHub Desktop.

Select an option

Save OneCricketeer/ec5c95023799a808c72a06a375eb439b to your computer and use it in GitHub Desktop.

Revisions

  1. OneCricketeer created this gist Nov 9, 2021.
    60 changes: 60 additions & 0 deletions KafkaMirror.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,60 @@
    import org.apache.kafka.clients.consumer.OffsetResetStrategy
    import org.apache.spark.sql
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.streaming.OutputMode
    import org.slf4j.LoggerFactory

    object KafkaMirror {

      private val logger = LoggerFactory.getLogger(getClass)

      /**
       * For testing output to a console.
       *
       * @param df A streaming DataFrame
       * @return A DataStreamWriter that prints appended rows to the console
       */
      private def streamToConsole(df: sql.DataFrame) = {
        df.writeStream.outputMode(OutputMode.Append()).format("console")
      }

      /**
       * Builds a streaming DataFrame that reads from Kafka.
       *
       * @param spark               the active SparkSession
       * @param bootstrap           Kafka bootstrap servers, e.g. "host:9092"
       * @param topicPattern        topic (or comma-separated topic list) to subscribe to
       * @param offsetResetStrategy where to start when no committed offsets exist;
       *                            defaults to EARLIEST
       * @return an unbounded DataFrame with Kafka's key/value/topic/partition/offset columns
       */
      private def getKafkaDf(spark: SparkSession,
                             bootstrap: String,
                             topicPattern: String,
                             offsetResetStrategy: OffsetResetStrategy = OffsetResetStrategy.EARLIEST) = {
        spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", bootstrap)
          // NOTE(review): "subscribe" takes an exact topic list; if a regex pattern is
          // intended (as the parameter name suggests), this should be "subscribePattern".
          .option("subscribe", topicPattern)
          .option("startingOffsets", offsetResetStrategy.toString.toLowerCase())
          .load()
      }

      /**
       * Prepares a writer that mirrors the DataFrame back into Kafka.
       * The destination topic is taken from the DataFrame's "topic" column,
       * not from a writer option.
       *
       * @param df                 streaming DataFrame; must contain a "topic" column
       * @param bootstrap          Kafka bootstrap servers
       * @param checkpointLocation checkpoint directory (required by the Kafka sink)
       * @return a DataStreamWriter configured for the Kafka sink
       * @throws IllegalArgumentException if the DataFrame has no "topic" column
       */
      private def streamToKafka(df: sql.DataFrame, bootstrap: String, checkpointLocation: String) = {
        val cols = df.columns
        if (!cols.contains("topic")) {
          val e = new IllegalArgumentException(
            s"""Dataframe columns must contain 'topic'. Existing cols=[${cols.mkString(", ")}]""")
          // Pass the Throwable directly so SLF4J uses the error(String, Throwable)
          // overload and logs the stack trace. The original call cast the exception
          // to Any, which selected the varargs overload: the message had no {}
          // placeholders, so `cols` was ignored and the stack trace was dropped.
          logger.error("Unable to stream dataframe to Kafka", e)
          throw e
        }
        // output topic comes from dataframe, not from options
        df.writeStream
          .format("kafka")
          .option("kafka.bootstrap.servers", bootstrap)
          .option("checkpointLocation", checkpointLocation) // required option
      }

      /**
       * Entry point. An explicit main method avoids the DelayedInit
       * initialization-order pitfalls of the App trait (fields like `logger`
       * are initialized eagerly, before the stream starts).
       */
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("Kafka Test")
          .master("local[*]") // TODO: real cluster
          .getOrCreate()
        import spark.implicits._

        val kafkaBootstrap = "localhost:9092" // TODO: real cluster

        val df = getKafkaDf(spark, kafkaBootstrap, "input-topic")
        streamToKafka(
          // Stream topic values into the same partition from input-topic to output-topic
          df.select($"partition", $"value", regexp_replace($"topic", "^input", "output").as("topic")),
          kafkaBootstrap,
          checkpointLocation = "/tmp/spark-sql-kafka" // TODO: real persistence, and not local disk
        ).start().awaitTermination()
      }
    }