package com.mycompany;

/**
 * Created by juanmf on 15/07/17.
 */

import java.util.Arrays;
import java.util.stream.Collectors;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;

import scala.Tuple2;

public class WordCount {

    public static void main(String[] args) {
        // create Spark context with Spark configuration
        try (JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count"))) {
            int threshold = Integer.parseInt(args[1]);
            String filePath = args[0];
            JavaPairRDD<String, Integer> filtered = filterWordsUnderThreshold(sc, threshold, filePath);

            // count the characters of the words that passed the threshold
            JavaPairRDD<Character, Integer> charCounts = filtered
                    .flatMap(s -> s._1.chars()
                            .mapToObj(i -> (char) i)
                            .collect(Collectors.toList())
                            .iterator()) // Spark 2.x FlatMapFunction expects an Iterator
                    .mapToPair(c -> new Tuple2<>(c, 1))
                    .reduceByKey((i1, i2) -> i1 + i2);
            System.out.println(charCounts.collect());
        }
    }

    private static JavaPairRDD<String, Integer> filterWordsUnderThreshold(
            JavaSparkContext sc, int threshold, String filePath) {
        // read in the text file and split each line into words
        JavaRDD<String> tokenized = sc.textFile(filePath)
                .flatMap(s -> Arrays.asList(s.split(" ")).iterator());

        // count the occurrences of each word
        JavaPairRDD<String, Integer> counts = tokenized
                .mapToPair(s -> new Tuple2<>(s, 1))
                .reduceByKey((i1, i2) -> i1 + i2);

        // filter out words with fewer than threshold occurrences
        return counts.filter(tup -> tup._2 >= threshold);
    }
}
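
/*
 * A minimal usage sketch, assuming Spark 2.x and that the class above is
 * packaged into word-count.jar; the jar name, master, input path, and
 * threshold below are illustrative, not taken from the original:
 *
 *   spark-submit \
 *     --class com.mycompany.WordCount \
 *     --master yarn \
 *     word-count.jar \
 *     hdfs:///path/to/input.txt 3
 *
 * args[0] is the input file path; args[1] is the minimum number of
 * occurrences a word needs before its characters are counted.
 */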