Spark WordCount in Java8

@juanmf, last active July 15, 2017

package com.mycompany;

/**
 * Created by juanmf on 15/07/17.
 */
import java.util.Arrays;
import java.util.stream.Collectors;

import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import scala.Tuple2;

public class WordCount {

    public static void main(String[] args) {
        // create Spark context with Spark configuration
        try (JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count"))) {
            int threshold = Integer.parseInt(args[1]);
            String filePath = args[0];
            JavaPairRDD<String, Integer> filtered = filterWordsUnderThreshold(sc, threshold, filePath);

            // count characters
            JavaPairRDD<Character, Integer> charCounts = filtered
                    .flatMap(s -> s._1.chars().mapToObj(i -> (char) i).collect(Collectors.toList()))
                    .mapToPair(c -> new Tuple2<>(c, 1))
                    .reduceByKey((i1, i2) -> i1 + i2);

            System.out.println(charCounts.collect());
        }
    }

    private static JavaPairRDD<String, Integer> filterWordsUnderThreshold(
            JavaSparkContext sc, int threshold, String filePath) {
        // read in text file and split each document into words
        JavaRDD<String> tokenized = sc.textFile(filePath).flatMap(s -> Arrays.asList(s.split(" ")));

        // count the occurrence of each word
        JavaPairRDD<String, Integer> counts = tokenized
                .mapToPair(s -> new Tuple2<>(s, 1))
                .reduceByKey((i1, i2) -> i1 + i2);

        // filter out words with fewer than threshold occurrences
        return counts.filter(tup -> tup._2 >= threshold);
    }
}
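
A note on Spark versions: the two flatMap lambdas above return collections (a List of words and a List of characters), which matches the Spark 1.x Java API, where FlatMapFunction.call returns an Iterable. Spark 2.x changed that signature to return an Iterator, so against a 2.x cluster both calls need a trailing .iterator(). A minimal sketch of the adjusted calls, reusing the sc, filePath, and filtered variables from the class above:

// Spark 2.x+ variant: FlatMapFunction.call(...) must return an Iterator,
// so the same lambdas simply append .iterator() to the collected lists.
JavaRDD<String> tokenized = sc.textFile(filePath)
        .flatMap(s -> Arrays.asList(s.split(" ")).iterator());

JavaRDD<Character> chars = filtered
        .flatMap(t -> t._1.chars()
                .mapToObj(i -> (char) i)
                .collect(Collectors.toList())
                .iterator());

To run the job, package the class into a jar and submit it with spark-submit, for example: spark-submit --class com.mycompany.WordCount wordcount.jar <input-path> <threshold> (the jar name here is just a placeholder). The input file path is read from args[0] and the occurrence threshold from args[1].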