Last active
July 15, 2017 22:29
-
-
Save juanmf/ef6445843c1ccad1ebce43df8567a66a to your computer and use it in GitHub Desktop.
Revisions
-
juanmf renamed this gist
Jul 15, 2017 . 1 changed file with 0 additions and 0 deletions.There are no files selected for viewing
File renamed without changes. -
juanmf created this gist
Jul 15, 2017 .There are no files selected for viewing
package com.mycompany;

/**
 * Created by juanmf on 15/07/17.
 */

import java.util.Arrays;
import java.util.stream.Collectors;
import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import scala.Tuple2;

/**
 * Spark job that counts the characters making up the frequent words of a text file.
 *
 * <p>The job reads a text file, counts how often each space-separated token occurs, keeps
 * the tokens that occur at least {@code threshold} times, and then counts how many of those
 * kept tokens contain each character (one contribution per character occurrence per distinct
 * token). The resulting (character, count) pairs are printed to standard output.
 *
 * <p>Command line: {@code WordCount <input-file> <threshold>}.
 */
public class WordCount {

    /** Message reported when the command line is incomplete. */
    private static final String USAGE = "Usage: WordCount <input-file> <threshold>";

    /**
     * Entry point.
     *
     * @param args {@code args[0]} is the path of the text file to read, {@code args[1]} is the
     *             minimum number of occurrences (an integer) a word needs in order to be kept
     * @throws IllegalArgumentException if fewer than two arguments are supplied or the
     *                                  threshold is not a valid integer
     */
    public static void main(String[] args) {
        // Validate the command line before creating the Spark context, so that a bad
        // invocation fails immediately with a readable message.
        if (args == null || args.length < 2) {
            throw new IllegalArgumentException(USAGE);
        }
        String filePath = args[0];
        int threshold = parseThreshold(args[1]);

        // create Spark context with Spark configuration
        try (JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count"))) {
            JavaPairRDD<String, Integer> filtered = filterWordsUnderThreshold(sc, threshold, filePath);

            // count characters: each kept word contributes one (char, 1) pair per UTF-16 char
            // NOTE(review): the flatMap lambda returns a List, exactly as the original did;
            // confirm this matches the flatMap signature of the Spark version in use.
            JavaPairRDD<Character, Integer> charCounts = filtered
                    .flatMap(s -> s._1.chars().mapToObj(i -> (char) i).collect(Collectors.toList()))
                    .mapToPair(c -> new Tuple2<>(c, 1))
                    .reduceByKey(Integer::sum);

            System.out.println(charCounts.collect());
        }
    }

    /**
     * Parses the threshold argument.
     *
     * @param raw the threshold as given on the command line
     * @return the parsed integer threshold
     * @throws IllegalArgumentException if {@code raw} is not a valid integer; the original
     *                                  {@link NumberFormatException} is kept as the cause
     */
    private static int parseThreshold(String raw) {
        try {
            return Integer.parseInt(raw);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Threshold must be an integer but was '" + raw + "'. " + USAGE, e);
        }
    }

    /**
     * Counts word occurrences in a text file and keeps the words whose count reaches the
     * threshold.
     *
     * @param sc        Spark context used to read the file
     * @param threshold minimum number of occurrences (inclusive) for a word to be kept
     * @param filePath  path of the text file to read
     * @return pairs of (word, occurrence count) for every word occurring at least
     *         {@code threshold} times
     */
    private static JavaPairRDD<String, Integer> filterWordsUnderThreshold(JavaSparkContext sc, int threshold,
                                                                          String filePath) {
        // read in text file and split each document into words
        // (splitting on a single space: consecutive spaces produce empty-string tokens,
        // which are counted like any other word)
        JavaRDD<String> tokenized = sc.textFile(filePath).flatMap(s -> Arrays.asList(s.split(" ")));

        // count the occurrence of each word
        JavaPairRDD<String, Integer> counts = tokenized
                .mapToPair(s -> new Tuple2<>(s, 1))
                .reduceByKey(Integer::sum);

        // keep only the words with at least threshold occurrences
        return counts.filter(tup -> tup._2 >= threshold);
    }
}