@stdatalabs / SparkPopularHashTags.scala (gist d55a27202756114e75eab7921769a82d)
Last active November 11, 2017 03:23
Revisions

  1. stdatalabs revised this gist Oct 24, 2016. No changes.
  2. stdatalabs revised this gist Oct 24, 2016. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion SparkPopularHashTags.scala
    @@ -13,7 +13,7 @@ import org.apache.spark.streaming.flume._
     *
     * Arguments: <consumerKey> <consumerSecret> <accessToken> <accessTokenSecret> <keyword_1> ... <keyword_n>
     * <consumerKey> - Twitter consumer key
    -* <consumerSecret> - Twitter consumer secret
    +* <consumerSecret> - Twitter consumer secret
     * <accessToken> - Twitter access token
     * <accessTokenSecret> - Twitter access token secret
     * <keyword_1> - The keyword to filter tweets
  3. stdatalabs revised this gist Oct 24, 2016. 1 changed file with 6 additions and 6 deletions.
    12 changes: 6 additions & 6 deletions SparkPopularHashTags.scala
    @@ -12,12 +12,12 @@ import org.apache.spark.streaming.flume._
     * keywords from twitter datasource and finds the popular hashtags
     *
     * Arguments: <consumerKey> <consumerSecret> <accessToken> <accessTokenSecret> <keyword_1> ... <keyword_n>
    -* <consumerKey> - Twitter consumer key
    -* <consumerSecret> - Twitter consumer secret
    -* <accessToken> - Twitter access token
    -* <accessTokenSecret> - Twitter access token secret
    -* <keyword_1> - The keyword to filter tweets
    -* <keyword_n> - Any number of keywords to filter tweets
    +* <consumerKey> - Twitter consumer key
    +* <consumerSecret> - Twitter consumer secret
    +* <accessToken> - Twitter access token
    +* <accessTokenSecret> - Twitter access token secret
    +* <keyword_1> - The keyword to filter tweets
    +* <keyword_n> - Any number of keywords to filter tweets
     *
     * More discussion at stdatalabs.blogspot.com
     *
  4. stdatalabs revised this gist Oct 24, 2016. 1 changed file with 6 additions and 6 deletions.
    12 changes: 6 additions & 6 deletions SparkPopularHashTags.scala
    @@ -12,12 +12,12 @@ import org.apache.spark.streaming.flume._
     * keywords from twitter datasource and finds the popular hashtags
     *
     * Arguments: <consumerKey> <consumerSecret> <accessToken> <accessTokenSecret> <keyword_1> ... <keyword_n>
    -* <consumerKey> - Twitter consumer key
    -* <consumerSecret> - Twitter consumer secret
    -* <accessToken> - Twitter access token
    -* <accessTokenSecret> - Twitter access token secret
    -* <keyword_1> - The keyword to filter tweets
    -* <keyword_n> - Any number of keywords to filter tweets
    +* <consumerKey> - Twitter consumer key
    +* <consumerSecret> - Twitter consumer secret
    +* <accessToken> - Twitter access token
    +* <accessTokenSecret> - Twitter access token secret
    +* <keyword_1> - The keyword to filter tweets
    +* <keyword_n> - Any number of keywords to filter tweets
     *
     * More discussion at stdatalabs.blogspot.com
     *
  5. stdatalabs revised this gist Oct 24, 2016. 1 changed file with 17 additions and 0 deletions.
    17 changes: 17 additions & 0 deletions SparkPopularHashTags.scala
    @@ -7,6 +7,23 @@ import org.apache.spark.{ SparkContext, SparkConf }
     import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.flume._

    +/**
    + * A Spark Streaming application that receives tweets on certain
    + * keywords from twitter datasource and finds the popular hashtags
    + *
    + * Arguments: <consumerKey> <consumerSecret> <accessToken> <accessTokenSecret> <keyword_1> ... <keyword_n>
    + * <consumerKey> - Twitter consumer key
    + * <consumerSecret> - Twitter consumer secret
    + * <accessToken> - Twitter access token
    + * <accessTokenSecret> - Twitter access token secret
    + * <keyword_1> - The keyword to filter tweets
    + * <keyword_n> - Any number of keywords to filter tweets
    + *
    + * More discussion at stdatalabs.blogspot.com
    + *
    + * @author Sachin Thirumala
    + */
    +
     object SparkPopularHashTags {
       val conf = new SparkConf().setMaster("local[4]").setAppName("Spark Streaming - PopularHashTags")
       val sc = new SparkContext(conf)
  6. stdatalabs renamed this gist Oct 4, 2016. 1 changed file with 2 additions and 2 deletions.
    4 changes: 2 additions & 2 deletions PopularHashTags.scala → SparkPopularHashTags.scala
    @@ -7,7 +7,7 @@ import org.apache.spark.{ SparkContext, SparkConf }
     import org.apache.spark.storage.StorageLevel
     import org.apache.spark.streaming.flume._

    -object PopularHashTags {
    +object SparkPopularHashTags {
       val conf = new SparkConf().setMaster("local[4]").setAppName("Spark Streaming - PopularHashTags")
       val sc = new SparkContext(conf)

    @@ -63,4 +63,4 @@ object PopularHashTags {
       ssc.start()
       ssc.awaitTermination()
      }
    -}
    +}
  7. stdatalabs revised this gist Sep 29, 2016. 2 changed files with 66 additions and 57 deletions.
    66 changes: 66 additions & 0 deletions PopularHashTags.scala
    @@ -0,0 +1,66 @@
    import org.apache.spark.streaming.{ Seconds, StreamingContext }
    import org.apache.spark.SparkContext._
    import org.apache.spark.streaming.twitter._
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.{ SparkContext, SparkConf }
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.flume._

    object PopularHashTags {
      val conf = new SparkConf().setMaster("local[4]").setAppName("Spark Streaming - PopularHashTags")
      val sc = new SparkContext(conf)

      def main(args: Array[String]) {

        sc.setLogLevel("WARN")

        val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
        val filters = args.takeRight(args.length - 4)

        // Set the system properties so that the Twitter4j library used by the twitter stream
        // can use them to generate OAuth credentials
        System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
        System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
        System.setProperty("twitter4j.oauth.accessToken", accessToken)
        System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

        // Set the Spark StreamingContext to create a DStream for every 5 seconds
        val ssc = new StreamingContext(sc, Seconds(5))

        // Pass the filter keywords as arguments
        // val stream = FlumeUtils.createStream(ssc, args(0), args(1).toInt)
        val stream = TwitterUtils.createStream(ssc, None, filters)

        // Split the stream on space and extract hashtags
        val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

        // Get the top hashtags over the previous 60 sec window
        val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
          .map { case (topic, count) => (count, topic) }
          .transform(_.sortByKey(false))

        // Get the top hashtags over the previous 10 sec window
        val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
          .map { case (topic, count) => (count, topic) }
          .transform(_.sortByKey(false))

        // Print tweets in the current DStream
        stream.print()

        // Print popular hashtags
        topCounts60.foreachRDD(rdd => {
          val topList = rdd.take(10)
          println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
          topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
        })
        topCounts10.foreachRDD(rdd => {
          val topList = rdd.take(10)
          println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
          topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
        })

        ssc.start()
        ssc.awaitTermination()
      }
    }
    57 changes: 0 additions & 57 deletions TwitterPopularHashTags
    @@ -1,57 +0,0 @@
    import org.apache.spark.streaming.{ Seconds, StreamingContext }
    import org.apache.spark.SparkContext._
    import org.apache.spark.streaming.twitter._
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.{ SparkContext, SparkConf }
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.flume._

    object PopularHashTags {
      val conf = new SparkConf().setMaster("local[4]").setAppName("PopularHashTags")
      val sc = new SparkContext(conf)

      def main(args: Array[String]) {
        sc.setLogLevel("WARN")
        System.setProperty("twitter4j.oauth.consumerKey", "<-- paste consumer key here -->")
        System.setProperty("twitter4j.oauth.consumerSecret", "<-- paste consumer secret here -->")
        System.setProperty("twitter4j.oauth.accessToken", "<-- paste access token here -->")
        System.setProperty("twitter4j.oauth.accessTokenSecret", "<-- paste access token secret here -->")

        // Set the Spark StreamingContext to create a DStream for every 5 seconds
        val ssc = new StreamingContext(sc, Seconds(5))
        // Pass the filter keywords as arguments
        val filter = args.takeRight(args.length) // equivalent to args: every argument is a keyword in this version
        // val stream = FlumeUtils.createStream(ssc, args(0), args(1).toInt)
        val stream = TwitterUtils.createStream(ssc, None, filter)
        // Split the stream on space and extract hashtags
        val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

        // Get the top hashtags over the previous 60 sec window
        val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
          .map { case (topic, count) => (count, topic) }
          .transform(_.sortByKey(false))

        // Get the top hashtags over the previous 10 sec window
        val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
          .map { case (topic, count) => (count, topic) }
          .transform(_.sortByKey(false))

        // Print tweets in the current DStream
        stream.print()

        // Print popular hashtags
        topCounts60.foreachRDD(rdd => {
          val topList = rdd.take(10)
          println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
          topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
        })
        topCounts10.foreachRDD(rdd => {
          val topList = rdd.take(10)
          println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
          topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
        })

        ssc.start()
        ssc.awaitTermination()
      }
    }
    (Hedged sketches that build on the final version of this code follow the revision list.)
  8. stdatalabs renamed this gist Sep 28, 2016. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  9. stdatalabs created this gist Sep 28, 2016.
    57 changes: 57 additions & 0 deletions gistfile1.txt
    @@ -0,0 +1,57 @@
    import org.apache.spark.streaming.{ Seconds, StreamingContext }
    import org.apache.spark.SparkContext._
    import org.apache.spark.streaming.twitter._
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.{ SparkContext, SparkConf }
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.flume._

    object PopularHashTags {
      val conf = new SparkConf().setMaster("local[4]").setAppName("PopularHashTags")
      val sc = new SparkContext(conf)

      def main(args: Array[String]) {
        sc.setLogLevel("WARN")
        System.setProperty("twitter4j.oauth.consumerKey", "<-- paste consumer key here -->")
        System.setProperty("twitter4j.oauth.consumerSecret", "<-- paste consumer secret here -->")
        System.setProperty("twitter4j.oauth.accessToken", "<-- paste access token here -->")
        System.setProperty("twitter4j.oauth.accessTokenSecret", "<-- paste access token secret here -->")

        // Set the Spark StreamingContext to create a DStream for every 5 seconds
        val ssc = new StreamingContext(sc, Seconds(5))
        // Pass the filter keywords as arguments
        val filter = args.takeRight(args.length) // equivalent to args: every argument is a keyword in this version
        // val stream = FlumeUtils.createStream(ssc, args(0), args(1).toInt)
        val stream = TwitterUtils.createStream(ssc, None, filter)
        // Split the stream on space and extract hashtags
        val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

        // Get the top hashtags over the previous 60 sec window
        val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
          .map { case (topic, count) => (count, topic) }
          .transform(_.sortByKey(false))

        // Get the top hashtags over the previous 10 sec window
        val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
          .map { case (topic, count) => (count, topic) }
          .transform(_.sortByKey(false))

        // Print tweets in the current DStream
        stream.print()

        // Print popular hashtags
        topCounts60.foreachRDD(rdd => {
          val topList = rdd.take(10)
          println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
          topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
        })
        topCounts10.foreachRDD(rdd => {
          val topList = rdd.take(10)
          println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
          topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
        })

        ssc.start()
        ssc.awaitTermination()
      }
    }
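
The sketches below expand on the final listing above (revision 7). They are illustrative, not part of the gist.

The gist destructures the first four arguments with an Array extractor and never checks args.length, so a missing credential surfaces as a MatchError at startup. A hypothetical fail-fast guard for the top of main() (the usage string is mine):

    // Hypothetical guard; not part of the gist.
    if (args.length < 4) {
      System.err.println(
        "Usage: SparkPopularHashTags <consumerKey> <consumerSecret> " +
          "<accessToken> <accessTokenSecret> [<keyword_1> ... <keyword_n>]")
      System.exit(1)
    }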
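
Setting twitter4j.oauth.* system properties works in this local[4] setup, but it mutates JVM-wide state. TwitterUtils.createStream also accepts an explicit Option[twitter4j.auth.Authorization], so the credentials can be passed directly. A sketch, assuming twitter4j-core is on the classpath and reusing consumerKey, consumerSecret, accessToken, accessTokenSecret, ssc, and filters from the gist's main():

    import org.apache.spark.streaming.twitter.TwitterUtils
    import twitter4j.auth.OAuthAuthorization
    import twitter4j.conf.ConfigurationBuilder

    // Build an explicit OAuth authorization instead of mutating system properties.
    val auth = new OAuthAuthorization(
      new ConfigurationBuilder()
        .setOAuthConsumerKey(consumerKey)
        .setOAuthConsumerSecret(consumerSecret)
        .setOAuthAccessToken(accessToken)
        .setOAuthAccessTokenSecret(accessTokenSecret)
        .build())

    val stream = TwitterUtils.createStream(ssc, Some(auth), filters)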
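
The top-N logic chains three steps: per-hashtag counts via reduceByKeyAndWindow (both the 60 s and 10 s windows are multiples of the 5 s batch interval, as Spark Streaming requires), a swap to (count, tag) so the count becomes the sort key, and a descending sortByKey. A minimal batch-mode sketch of the same idiom, runnable with spark-core alone (object name and sample data are illustrative):

    import org.apache.spark.{ SparkConf, SparkContext }

    object TopHashTagsBatchSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext(
          new SparkConf().setMaster("local[2]").setAppName("TopHashTagsBatchSketch"))

        // Toy stand-in for one batch of tweet texts
        val tweets = sc.parallelize(Seq(
          "#spark makes streaming simple",
          "#spark and #scala together",
          "learning #scala today"))

        val top = tweets
          .flatMap(_.split(" ").filter(_.startsWith("#"))) // extract hashtags
          .map((_, 1))
          .reduceByKey(_ + _)                              // like reduceByKeyAndWindow, minus the window
          .map { case (tag, count) => (count, tag) }       // count first, so it becomes the sort key
          .sortByKey(ascending = false)                    // highest counts first
          .take(2)

        top.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
        sc.stop()
      }
    }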
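
reduceByKeyAndWindow as used in the gist recomputes each window from scratch. The overload that also takes an inverse "subtract" function updates the previous window incrementally, which is cheaper for long windows, but it keeps state across batches and therefore requires a checkpoint directory. A hedged sketch of that variant (the checkpoint path is illustrative):

    // Incremental variant: add counts from batches entering the window,
    // subtract counts from batches leaving it. Needs checkpointing.
    ssc.checkpoint("/tmp/spark-hashtags-checkpoint") // illustrative path

    val topCounts60 = hashTags.map((_, 1))
      .reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(5))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))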