@hardkap
Created January 17, 2017 06:15

Revisions

  1. hardkap created this gist Jan 17, 2017.
    KafkaIndexed.scala
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD
    import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD._
    import org.apache.log4j.Logger
    import org.apache.log4j.Level

    /** KafkaIndexed - Find bigrams in log data arriving through a Kafka broker */
    object KafkaIndexed {

      def main(args: Array[String]): Unit = {

        // Suppress some log messages so the test output is easier to read
        Logger.getLogger("org").setLevel(Level.ERROR)
        Logger.getLogger("akka").setLevel(Level.ERROR)

        // Set up the Spark streaming context
        val sparkConf = new SparkConf().setAppName("KafkaIndexed").setMaster("local[*]")
        sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
        val sc = new SparkContext(sparkConf)
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        val ssc = new StreamingContext(sc, Seconds(1))

        // Read the Solr data and cache it as an IndexedRDD (https://github.com/amplab/spark-indexedrdd)
        val options = Map("zkHost" -> "localhost:2181", "collection" -> "biwords", "fields" -> "biwords,id")
        val rdd_biwords = sqlContext.read.format("solr").options(options).load().rdd.map(r => (r.getString(0), r.getString(1)))
        val indexed = IndexedRDD(rdd_biwords).cache()

        // Set up the Kafka streaming receiver
        val topicMap = "test".split(",").map((_, 1)).toMap // hard-coding the Kafka topic for testing
        val lines = KafkaUtils.createStream(ssc, "localhost:2182", "group1", topicMap).map(_._2)

        // Process each batch received from the input stream
        // (a standalone sketch of this bigram pipeline follows the object below)
        lines.foreachRDD(rdd => if (!rdd.partitions.isEmpty) {
          val nrdd = rdd.map {
            // Split each line into substrings by periods
            _.split('.').map { substrings =>
              // Trim substrings and then tokenize on spaces
              substrings.trim.split(' ').
                // Remove non-alphanumeric characters and convert to lowercase
                map { _.replaceAll("""\W""", "").toLowerCase() }.
                // Find bigrams
                sliding(2)
            }.
              // Flatten, and map each bigram to a concatenated string
              flatMap { identity }.map { _.mkString(" ") }.
              // Group the bigrams and count their frequency within the line
              groupBy { identity }.mapValues { _.size }
          }.
            // Reduce to get a global count per bigram across the batch
            flatMap { identity }.reduceByKey(_ + _)

          // Keep only the bigrams that also exist in the Solr index, and print them
          nrdd.join(indexed).map { case (a, (b, c)) => (a, b) }.foreach(println)

        })

        // Start the streaming context
        ssc.start()
        ssc.awaitTermination()

      }

    }
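
    For reference, here is a minimal, Spark-free sketch of the bigram pipeline used above, applied to one hypothetical input line (the object name and sample text are made up for illustration); running it prints each bigram of the sample with a count of 1:

    object BigramPipelineDemo {
      def main(args: Array[String]): Unit = {
        val line = "Hello there. How are you doing today."
        val counts = line
          .split('.')                                       // split into sentences by periods
          .map { sentence =>
            sentence.trim.split(' ')                        // tokenize on spaces
              .map(_.replaceAll("""\W""", "").toLowerCase)  // strip non-word chars, lowercase
              .sliding(2)                                   // bigrams per sentence
          }
          .flatMap(identity)                                // flatten the per-sentence iterators
          .map(_.mkString(" "))                             // join each bigram into one string
          .groupBy(identity)                                // group identical bigrams
          .mapValues(_.size)                                // ... and count them
        counts.foreach(println)                             // e.g. (hello there,1), (how are,1), ...
      }
    }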

    /**
    To Run a test:
    ==============
    1 - Download and install dependencies:
    a) Download the following jar
    https://github.com/ankurdave/maven-repo/blob/master/com/ankurdave/part_2.10/0.2/part_2.10-0.2.jar
    b) Download the IndexedRdd jar
    http://dl.bintray.com/spark-packages/maven/amplab/spark-indexedrdd/0.4.0/spark-indexedrdd-0.4.0.jar
    c) Download and build spark-solr to create the shaded jar (needed to work around dependency issues)
    https://github.com/lucidworks/spark-solr
    2 - Build the program using the build.sbt provided
    sbt package
    3 - In a shell, run the following (note that the --jars list must be a single comma-separated value with no spaces):
    spark-submit --class KafkaIndexed \
      --jars library/spark-solr-3.0.1-SNAPSHOT-shaded.jar,library/amplab_spark-indexedrdd-0.4.0.jar,library/spark-streaming-kafka-0-8-assembly_2.11-2.0.1.jar,library/part_2.10-0.2.jar \
      target/scala-2.11/kafkatest_2.11-1.0.jar
    4 - In another shell, start a Kafka console producer (assuming Kafka is installed, $KAFKA_HOME is set, and the topic 'test' has already been created):
    $KAFKA_HOME/bin/kafka-console-producer.sh --topic=test --broker-list=172.17.7.7:32771,172.17.7.7:32772
    5 - In the Kafka producer prompt, type text like below:
    hello there how are you doing today
    write something that does not match any biwords in the solr db
    6 - View the outputs in the first shell where the Spark streaming application is running
    Assumptions:
    ============
    1 - SolrCloud is set up and running, connected to ZooKeeper on port 2181
    2 - Solr has a collection called 'biwords' with 2 fields (id and biwords); a sketch for populating it follows this comment block.
    3 - In the Solr collection, 'multiValued' should be set to false for the biwords field.
    4 - Kafka brokers are running, connected to a separate ZooKeeper on port 2182
    5 - The Kafka topic 'test' has been created
    */
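
    The 'biwords' collection in assumption 2 has to be populated before the streaming job can match anything. Below is a hypothetical loader sketch; the object name, the sample documents, and the use of spark-solr's DataFrame writer with the same zkHost/collection options are assumptions, and exact write options and commit behaviour differ between spark-solr versions:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.{SQLContext, SaveMode}

    // Hypothetical one-off loader for the 'biwords' collection (illustrative values only)
    object LoadBiwords {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("LoadBiwords").setMaster("local[*]"))
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        // (biwords, id) documents to index -- sample data only
        val docs = Seq(("hello there", "1"), ("how are", "2")).toDF("biwords", "id")

        docs.write
          .format("solr")
          .options(Map("zkHost" -> "localhost:2181", "collection" -> "biwords"))
          .mode(SaveMode.Overwrite)
          .save()
        // Depending on the spark-solr version, an explicit commit on the collection
        // may still be needed before the documents become searchable.

        sc.stop()
      }
    }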
    build.sbt
    name := "kafkatest"

    version := "1.0"

    scalaVersion := "2.11.8"

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % "2.0.1" % "provided",
      "org.apache.spark" %% "spark-streaming" % "2.0.1" % "provided",
      "org.apache.spark" %% "spark-sql" % "2.0.1",
      "org.apache.spark" %% "spark-mllib" % "2.0.1",
      "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.1"
    )

    // META-INF discarding
    mergeStrategy in assembly <<= (mergeStrategy in assembly) { (old) =>
      {
        case PathList("META-INF", xs @ _*) => MergeStrategy.discard
        case x => MergeStrategy.first
      }
    }
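
    Note: the merge-strategy block above only takes effect when building with the sbt-assembly plugin; the plain sbt package used in the test instructions ignores it. If an assembly build is wanted, the plugin also has to be declared, for example (the plugin version here is an assumption):

    // project/plugins.sbt (hypothetical; only needed when running `sbt assembly`)
    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")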