Skip to content

Instantly share code, notes, and snippets.

@tsusanto
Created May 1, 2017 18:55
Show Gist options
  • Select an option

  • Save tsusanto/d9e9c9ff06cffa355710c00c9ca360f9 to your computer and use it in GitHub Desktop.

Select an option

Save tsusanto/d9e9c9ff06cffa355710c00c9ca360f9 to your computer and use it in GitHub Desktop.

Revisions

  1. tsusanto created this gist May 1, 2017.
    46 changes: 46 additions & 0 deletions SparkStreamingKafka010
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,46 @@
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    /**
    * Consumes messages from one or more topics in Kafka and does wordcount.
    * export SPARK_KAFKA_VERSION=0.10
    * spark2-submit --files jaas.conf --driver-java-options "-Djava.security.auth.login.config=./jaas.conf" --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" --class Spark2Kafka Spark2Kafka-assembly-1.0.jar
    */
    object Spark2Kafka {
    def main(args: Array[String]) {


    val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "broker1:9092,broker26:9092,broker3:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "security.protocol" -> "SASL_PLAINTEXT",
    "sasl.mechanism" -> "PLAIN",
    "value.deserializer" -> classOf[StringDeserializer],
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val sparkConf = new SparkConf().setAppName("Spark2_KafkaSASL")

    val streamingContext = new StreamingContext(sparkConf, Seconds(5))

    val topics = Array("poslog")
    val stream = KafkaUtils.createDirectStream[String, String](
    streamingContext,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
    )

    // Get the lines, split them into words, count the words and print
    stream.map(record => (record.key, record.value))
    stream.print()

    // Start the computation
    streamingContext.start()
    streamingContext.awaitTermination()
    }
    }