Created May 1, 2017 18:55

Revisions

tsusanto created this gist on May 1, 2017.
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

/**
 * Consumes messages from one or more Kafka topics over SASL_PLAINTEXT and prints
 * each record's key and value (a word-count variant is sketched below).
 *
 * Run with:
 *   export SPARK_KAFKA_VERSION=0.10
 *   spark2-submit --files jaas.conf \
 *     --driver-java-options "-Djava.security.auth.login.config=./jaas.conf" \
 *     --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./jaas.conf" \
 *     --class Spark2Kafka Spark2Kafka-assembly-1.0.jar
 */
object Spark2Kafka {
  def main(args: Array[String]): Unit = {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092,broker2:9092,broker3:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "security.protocol" -> "SASL_PLAINTEXT",
      "sasl.mechanism" -> "PLAIN",
      "group.id" -> "spark2-kafka-sasl", // required by the new consumer API; any stable, unique name works
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val sparkConf = new SparkConf().setAppName("Spark2_KafkaSASL")
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))

    val topics = Array("poslog")
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // Print the (key, value) pairs of each 5-second batch
    stream.map(record => (record.key, record.value)).print()

    // Start the computation and block until it is terminated
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
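As referenced in the header, the original gist's description mentions word counting, but the code only prints key/value pairs. A minimal sketch of the word-count variant, continuing from the stream defined above; it would replace the single map/print line:

// Split each message body into words, count them within each
// 5-second batch, and print the per-batch tallies
val words = stream.flatMap(_.value.split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print()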
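The spark2-submit line ships a jaas.conf alongside the driver and executors so the Kafka client can authenticate. A minimal sketch of that file for the PLAIN mechanism; myuser and mypassword are placeholders, substitute your cluster's SASL account:

// jaas.conf, with hypothetical placeholder credentials
KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="myuser"
  password="mypassword";
};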
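The jar name Spark2Kafka-assembly-1.0.jar implies the project is packaged with sbt-assembly. A build.sbt sketch that could produce it; the Scala and Spark versions here are assumptions and should be aligned with the cluster:

// build.sbt, a sketch; versions are assumptions, match them to your cluster
name := "Spark2Kafka"
version := "1.0"
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % "2.1.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0"
)

spark-streaming is marked provided because the cluster supplies it at runtime; the sbt-assembly plugin itself is enabled separately in project/plugins.sbt.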