Last active
November 11, 2017 03:23
-
-
Save stdatalabs/d55a27202756114e75eab7921769a82d to your computer and use it in GitHub Desktop.
Revisions
-
stdatalabs revised this gist
Oct 24, 2016 — No changes. There are no files selected for viewing
-
stdatalabs revised this gist
Oct 24, 2016 — 1 changed file with 1 addition and 1 deletion. There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -13,7 +13,7 @@ import org.apache.spark.streaming.flume._ * * Arguments: <comsumerKey> <consumerSecret> <accessToken> <accessTokenSecret> <keyword_1> ... <keyword_n> * <comsumerKey> - Twitter consumer key * <consumerSecret> - Twitter consumer secret * <accessToken> - Twitter access token * <accessTokenSecret> - Twitter access token secret * <keyword_1> - The keyword to filter tweets -
stdatalabs revised this gist
Oct 24, 2016 . 1 changed file with 6 additions and 6 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -12,12 +12,12 @@ import org.apache.spark.streaming.flume._ * keywords from twitter datasource and find the popular hashtags * * Arguments: <comsumerKey> <consumerSecret> <accessToken> <accessTokenSecret> <keyword_1> ... <keyword_n> * <comsumerKey> - Twitter consumer key * <consumerSecret> - Twitter consumer secret * <accessToken> - Twitter access token * <accessTokenSecret> - Twitter access token secret * <keyword_1> - The keyword to filter tweets * <keyword_n> - Any number of keywords to filter tweets * * More discussion at stdatalabs.blogspot.com * -
stdatalabs revised this gist
Oct 24, 2016 . 1 changed file with 6 additions and 6 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -12,12 +12,12 @@ import org.apache.spark.streaming.flume._ * keywords from twitter datasource and find the popular hashtags * * Arguments: <comsumerKey> <consumerSecret> <accessToken> <accessTokenSecret> <keyword_1> ... <keyword_n> * <comsumerKey> - Twitter consumer key * <consumerSecret> - Twitter consumer secret * <accessToken> - Twitter access token * <accessTokenSecret> - Twitter access token secret * <keyword_1> - The keyword to filter tweets * <keyword_n> - Any number of keywords to filter tweets * * More discussion at stdatalabs.blogspot.com * -
stdatalabs revised this gist
Oct 24, 2016 . 1 changed file with 17 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -7,6 +7,23 @@ import org.apache.spark.{ SparkContext, SparkConf } import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.flume._ /** * A Spark Streaming application that receives tweets on certain * keywords from twitter datasource and find the popular hashtags * * Arguments: <comsumerKey> <consumerSecret> <accessToken> <accessTokenSecret> <keyword_1> ... <keyword_n> * <comsumerKey> - Twitter consumer key * <consumerSecret> - Twitter consumer secret * <accessToken> - Twitter access token * <accessTokenSecret> - Twitter access token secret * <keyword_1> - The keyword to filter tweets * <keyword_n> - Any number of keywords to filter tweets * * More discussion at stdatalabs.blogspot.com * * @author Sachin Thirumala */ object SparkPopularHashTags { val conf = new SparkConf().setMaster("local[4]").setAppName("Spark Streaming - PopularHashTags") val sc = new SparkContext(conf) -
stdatalabs renamed this gist
Oct 4, 2016 . 1 changed file with 2 additions and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -7,7 +7,7 @@ import org.apache.spark.{ SparkContext, SparkConf } import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.flume._ object SparkPopularHashTags { val conf = new SparkConf().setMaster("local[4]").setAppName("Spark Streaming - PopularHashTags") val sc = new SparkContext(conf) @@ -63,4 +63,4 @@ object PopularHashTags { ssc.start() ssc.awaitTermination() } } -
stdatalabs revised this gist
Sep 29, 2016 . 2 changed files with 66 additions and 57 deletions.There are no files selected for viewing
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.{ SparkContext, SparkConf }
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume._

/**
 * A Spark Streaming application that receives tweets matching given
 * keywords from the Twitter datasource and reports the popular hashtags
 * over sliding 60-second and 10-second windows.
 *
 * Arguments: <consumerKey> <consumerSecret> <accessToken> <accessTokenSecret> <keyword_1> ... <keyword_n>
 *   - the first four arguments are the Twitter OAuth credentials
 *   - every remaining argument is a keyword used to filter the tweet stream
 */
object PopularHashTags {

  // local[4] master: runs on the driver JVM only — development/demo setup,
  // not suitable for cluster deployment.
  val conf = new SparkConf().setMaster("local[4]").setAppName("Spark Streaming - PopularHashTags")
  val sc = new SparkContext(conf)

  def main(args: Array[String]) {
    sc.setLogLevel("WARN")

    // First four args are the OAuth credentials; the rest are filter keywords.
    // drop(4) is the idiomatic equivalent of takeRight(args.length - 4).
    val Array(consumerKey, consumerSecret, accessToken, accessTokenSecret) = args.take(4)
    val filters = args.drop(4)

    // Set the system properties so that the Twitter4j library used by the
    // twitter stream can use them to generate OAuth credentials.
    System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)

    // Set the Spark StreamingContext to create a DStream for every 5 seconds
    val ssc = new StreamingContext(sc, Seconds(5))

    // Pass the filter keywords as arguments
    // val stream = FlumeUtils.createStream(ssc, args(0), args(1).toInt)
    val stream = TwitterUtils.createStream(ssc, None, filters)

    // Split each tweet's text on space and keep only hashtag tokens
    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

    // Get the top hashtags over the previous 60 sec window,
    // sorted by descending count (swap to (count, topic) so sortByKey works).
    val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    // Get the top hashtags over the previous 10 sec window
    val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    // Print tweets in the current DStream
    stream.print()

    // Print popular hashtags
    topCounts60.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    topCounts10.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
stdatalabs renamed this gist
Sep 28, 2016 . 1 changed file with 0 additions and 0 deletions.There are no files selected for viewing
File renamed without changes. -
stdatalabs created this gist
Sep 28, 2016 .There are no files selected for viewing
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.{ SparkContext, SparkConf }
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume._

/**
 * A Spark Streaming application that receives tweets matching the given
 * keyword arguments and reports the popular hashtags over sliding
 * 60-second and 10-second windows.
 *
 * Twitter OAuth credentials are hard-coded below (placeholders); every
 * command-line argument is treated as a filter keyword.
 */
object PopularHashTags {

  // local[4] master: development/demo setup, not for cluster deployment.
  val conf = new SparkConf().setMaster("local[4]").setAppName("PopularHashTags")
  val sc = new SparkContext(conf)

  def main(args: Array[String]) {
    sc.setLogLevel("WARN")

    // NOTE(review): credentials are hard-coded placeholders — replace before
    // running; never commit real credentials.
    System.setProperty("twitter4j.oauth.consumerKey", "<-- paste consumer key here -->")
    System.setProperty("twitter4j.oauth.consumerSecret", "<-- paste consumer secret here -->")
    System.setProperty("twitter4j.oauth.accessToken", "<-- paste access token here -->")
    // Placeholder corrected: this property takes the access token SECRET.
    System.setProperty("twitter4j.oauth.accessTokenSecret", "<-- paste access token secret here -->")

    // Set the Spark StreamingContext to create a DStream for every 5 seconds
    val ssc = new StreamingContext(sc, Seconds(5))

    // Pass the filter keywords as arguments.
    // args.takeRight(args.length) is just args — use it directly.
    val filter = args
    // val stream = FlumeUtils.createStream(ssc, args(0), args(1).toInt)
    val stream = TwitterUtils.createStream(ssc, None, filter)

    // Split each tweet's text on space and keep only hashtag tokens
    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

    // Get the top hashtags over the previous 60 sec window,
    // sorted by descending count (swap to (count, topic) so sortByKey works).
    val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    // Get the top hashtags over the previous 10 sec window
    val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    // Print tweets in the current DStream
    stream.print()

    // Print popular hashtags
    topCounts60.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    topCounts10.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}