hardkap created this gist on Jan 17, 2017.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD
import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD._
import org.apache.log4j.Logger
import org.apache.log4j.Level

/** KafkaIndexed - Find the bigrams from log data coming through a Kafka broker */
object KafkaIndexed {

  def main(args: Array[String]): Unit = {

    // Suppress some of the log messages so that test results are easier to see
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("akka").setLevel(Level.ERROR)

    // Set up the Spark streaming context
    val sparkConf = new SparkConf().setAppName("KafkaIndexed").setMaster("local[*]")
    sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val ssc = new StreamingContext(sc, Seconds(1))

    // Read the Solr data and cache it as an IndexedRDD (https://github.com/amplab/spark-indexedrdd)
    val options = Map("zkHost" -> "localhost:2181", "collection" -> "biwords", "fields" -> "biwords,id")
    val rdd_biwords = sqlContext.read.format("solr").options(options).load()
      .rdd.map(r => (r.getString(0), r.getString(1)))
    val indexed = IndexedRDD(rdd_biwords).cache()

    // Set up the Kafka streaming receiver
    val topicMap = "test".split(",").map((_, 1)).toMap // hard-coding the Kafka topic for testing
    val lines = KafkaUtils.createStream(ssc, "localhost:2182", "group1", topicMap).map(_._2)

    // Process each batch of data received from the input stream
    lines.foreachRDD(rdd => if (!rdd.partitions.isEmpty) {
      val nrdd = rdd.map{
        // Split each line into substrings by periods
        _.split('.').map{ substrings =>
          // Trim substrings and then tokenize on spaces
          substrings.trim.split(' ').
            // Remove non-alphanumeric characters and convert to lowercase
            map{_.replaceAll("""\W""", "").toLowerCase()}.
            // Find bigrams
            sliding(2)
        }.
        // Flatten, and map the bigrams to concatenated strings
        flatMap{identity}.map{_.mkString(" ")}.
        // Group the bigrams and count their frequency
        groupBy{identity}.mapValues{_.size}
      }.
      // Reduce to get a global count of each bigram across the batch
      flatMap{identity}.reduceByKey(_ + _)
      // Keep only the bigrams present in the Solr-backed index, and print them
      nrdd.join(indexed).map{ case (a, (b, c)) => (a, b) }.foreach(println)
    })

    // Start the streaming context
    ssc.start()
    ssc.awaitTermination()
  }
}

/** To Run a test:
    ==============
    1 - Download and install dependencies:
        a) Download the following jar:
           https://github.com/ankurdave/maven-repo/blob/master/com/ankurdave/part_2.10/0.2/part_2.10-0.2.jar
        b) Download the IndexedRDD jar:
           http://dl.bintray.com/spark-packages/maven/amplab/spark-indexedrdd/0.4.0/spark-indexedrdd-0.4.0.jar
        c) Download and build spark-solr to create its jar (due to dependency issues):
           https://github.com/lucidworks/spark-solr
    2 - Build the program using the build.sbt provided:
        sbt package
    3 - In a shell, run:
        spark-submit --class KafkaIndexed \
          --jars library/spark-solr-3.0.1-SNAPSHOT-shaded.jar,library/amplab_spark-indexedrdd-0.4.0.jar,library/spark-streaming-kafka-0-8-assembly_2.11-2.0.1.jar,library/part_2.10-0.2.jar \
          target/scala-2.11/kafkatest_2.11-1.0.jar
    4 - In another shell, run the following (assuming Kafka is installed, $KAFKA_HOME is set,
        and a topic 'test' has already been created):
        $KAFKA_HOME/bin/kafka-console-producer.sh --topic=test --broker-list=172.17.7.7:32771,172.17.7.7:32772
    5 - At the Kafka producer prompt, type text like:
        hello there how are you doing today
        write something that does not match any biwords in the solr db
    6 - View the output in the first shell, where the Spark streaming application is running.

    Assumptions:
    ============
    1 - Solr Cloud is set up and running, connected to ZooKeeper at port 2181.
    2 - Solr has a collection called 'biwords' with 2 fields (id and biwords).
    3 - In the Solr collection, 'multiValued' should be set to false for the biwords field.
    4 - Kafka nodes are running, connected to another ZooKeeper at port 2182.
    5 - The Kafka topic 'test' has been created.
*/
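For a quick sanity check of the tokenization used above, here is a standalone sketch of the same bigram pipeline in plain Scala (no Spark, Kafka, or Solr required). The input line and the index map are made-up stand-ins for a Kafka message and the Solr 'biwords' collection; the final filter mirrors what nrdd.join(indexed) does in the streaming job:

// BigramCheck.scala - standalone sketch of the bigram pipeline (hypothetical data)
object BigramCheck extends App {
  val line  = "Hello there. How are you doing today."        // stand-in Kafka message
  val index = Map("hello there" -> "1", "you doing" -> "2")  // stand-in Solr index (biwords -> id)

  val counts = line.split('.').map { sentence =>
    sentence.trim.split(' ').
      // Strip non-word characters and lowercase, exactly as in the job
      map(_.replaceAll("""\W""", "").toLowerCase()).
      // Adjacent word pairs
      sliding(2)
  }.
  flatMap(identity).map(_.mkString(" ")).  // "hello there", "how are", ...
  groupBy(identity).mapValues(_.size)      // frequency of each bigram

  // Equivalent of nrdd.join(indexed): keep only bigrams present in the index
  counts.filter { case (bigram, _) => index.contains(bigram) }.foreach(println)
  // Prints (hello there,1) and (you doing,1)
}

Note that this only checks the per-line logic; the streaming job additionally sums the counts across the whole batch with reduceByKey before joining.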
build.sbt:

name := "kafkatest"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.0.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "2.0.1" % "provided",
  "org.apache.spark" %% "spark-sql" % "2.0.1",
  "org.apache.spark" %% "spark-mllib" % "2.0.1",
  "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.1"
)

// META-INF discarding
mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
  {
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
  }
}
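One note on the build: the mergeStrategy in assembly block above only compiles when the sbt-assembly plugin is on the build classpath, and it is not needed for the plain sbt package step used in the test instructions. If you do want to build a fat jar instead of passing --jars, a minimal project/plugins.sbt sketch would be the following (0.14.3 is an assumed version; any release that still supports the old <<= key syntax should work):

// project/plugins.sbt (the sbt-assembly version here is an assumption)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")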