Skip to content

Instantly share code, notes, and snippets.

@sgireddy
Created February 10, 2020 17:19
Show Gist options
  • Save sgireddy/06780918382c535b8c60c333db174f5c to your computer and use it in GitHub Desktop.
Save sgireddy/06780918382c535b8c60c333db174f5c to your computer and use it in GitHub Desktop.

Revisions

  1. sgireddy created this gist Feb 10, 2020.
    42 changes: 42 additions & 0 deletions main_java_org_example_dataflow_Crypto.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,42 @@
    package org.example.dataflow;

    import javax.crypto.Mac;
    import javax.crypto.spec.SecretKeySpec;
    import java.io.Serializable;
    import java.security.InvalidKeyException;
    import java.security.NoSuchAlgorithmException;

    /**
     * Serializable helper that computes HMAC-SHA256 message authentication codes.
     *
     * <p>The underlying {@link Mac} is not serializable, so it is held in a transient field and
     * re-created on first use after deserialization.
     *
     * <p>A {@link Mac} carries internal state between {@code init} and {@code doFinal}, so each
     * computation runs while holding this instance's monitor; one instance may therefore be
     * shared between threads.
     */
    public class Crypto implements Serializable {

        /** JCA algorithm name used both for the {@link Mac} lookup and for the key spec. */
        private static final String ALGORITHM = "HmacSHA256";

        /** Lazily (re)built MAC engine; transient because {@link Mac} is not serializable. */
        private transient Mac hmacSHA256;

        /**
         * Creates a helper with an eagerly initialised MAC engine.
         *
         * @throws IllegalStateException if HmacSHA256 is not available in this JVM
         */
        public Crypto() {
            init();
        }

        /**
         * (Re)creates the underlying HmacSHA256 {@link Mac} instance.
         *
         * @throws IllegalStateException if no security provider supplies HmacSHA256; the
         *     original {@link NoSuchAlgorithmException} is preserved as the cause
         */
        public void init() {
            try {
                hmacSHA256 = Mac.getInstance(ALGORITHM);
            } catch (NoSuchAlgorithmException e) {
                // Fail here with the real cause instead of leaving the field null and
                // surfacing a NullPointerException on the first computation.
                throw new IllegalStateException(ALGORITHM + " is not available in this JVM", e);
            }
        }

        /**
         * Computes the HMAC-SHA256 of {@code input} under {@code key}.
         *
         * @param input message bytes to authenticate
         * @param key   raw secret key bytes; must be non-null and non-empty
         * @return the 32-byte MAC, never {@code null}
         * @throws IllegalArgumentException if {@code key} is null or empty (raised by
         *     {@link SecretKeySpec}) or is rejected by the MAC implementation
         * @throws IllegalStateException if the MAC engine cannot be created
         */
        public byte[] calculateHmacSHA256(byte[] input, byte[] key) {
            // Built outside the lock: validates the key and touches no shared state.
            SecretKeySpec keySpec = new SecretKeySpec(key, ALGORITHM);
            synchronized (this) {
                if (hmacSHA256 == null) {
                    // Transient field is null after deserialization; rebuild on demand.
                    init();
                }
                try {
                    hmacSHA256.init(keySpec);
                    return hmacSHA256.doFinal(input);
                } catch (InvalidKeyException e) {
                    // Report the problem to the caller rather than returning null.
                    throw new IllegalArgumentException("Key rejected by " + ALGORITHM, e);
                }
            }
        }
    }
    43 changes: 43 additions & 0 deletions main_java_org_example_dataflow_WordCount.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,43 @@
    package org.example.dataflow;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.transforms.Filter;
    import org.apache.beam.sdk.transforms.FlatMapElements;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.TypeDescriptors;

    import java.util.Arrays;
    import java.util.Base64;

    /**
     * Beam pipeline that reads a text file, splits it into words, replaces every word with the
     * Base64-encoded HMAC-SHA256 of that word, counts occurrences of each encoded value and
     * writes {@code "<encoded>: <count>"} lines to files prefixed {@code wordcounts}.
     */
    public class WordCount {

        /**
         * Builds and runs the pipeline, blocking until it finishes.
         *
         * @param args command-line arguments; currently unused, the pipeline options are
         *     created with their defaults
         */
        public static void main(String[] args) {

            Crypto crypto = new Crypto();
            // Encode the constant key once instead of once per element, and pin the charset so
            // the result does not depend on the default charset of the JVM running the code.
            byte[] hmacKey = "HelloWorld".getBytes(java.nio.charset.StandardCharsets.UTF_8);

            PipelineOptions options = PipelineOptionsFactory.create();
            Pipeline p = Pipeline.create(options);

            p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/kinglear.txt"))
                // Split each line on runs of non-letter characters.
                .apply(
                    FlatMapElements.into(TypeDescriptors.strings())
                        .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
                .apply(Filter.by((String word) -> !word.isEmpty()))
                // The tokenizer admits any Unicode letter, so words may be non-ASCII; encoding
                // them explicitly as UTF-8 keeps the HMAC of a word identical on every JVM.
                .apply(
                    MapElements.into(TypeDescriptors.strings())
                        .via(
                            (String word) ->
                                Base64.getEncoder()
                                    .encodeToString(
                                        crypto.calculateHmacSHA256(
                                            word.getBytes(java.nio.charset.StandardCharsets.UTF_8),
                                            hmacKey))))
                .apply(Count.perElement())
                .apply(
                    MapElements.into(TypeDescriptors.strings())
                        .via(
                            (KV<String, Long> wordCount) ->
                                wordCount.getKey() + ": " + wordCount.getValue()))
                .apply(TextIO.write().to("wordcounts"));

            p.run().waitUntilFinish();
        }
    }
    31 changes: 31 additions & 0 deletions main_scala_org_example_dataflow_ScioMinimalWordCount.scala
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,31 @@
    package org.example.dataflow

    import java.io.Serializable
    import java.util.Base64


    import com.spotify.scio._

    /**
     * Scio job that reads a text file, tokenizes it, replaces each token with the Base64-encoded
     * HMAC-SHA256 of that token, counts occurrences per encoded value and saves
     * "<encoded>: <count>" lines under `wordcounts`.
     */
    object ScioMinimalWordCount extends Serializable {

      // Transient and lazy: excluded from serialization and constructed on first access.
      @transient lazy val crypto = new Crypto()

      def main(cmdlineArgs: Array[String]): Unit = {
        // Options such as `--input=gs://[BUCKET]/[PATH]/input.txt` are exposed through `args`.
        val (sc, args) = ContextAndArgs(cmdlineArgs)
        val inputPath = args.getOrElse("input", "gs://apache-beam-samples/shakespeare/kinglear.txt")

        sc.textFile(inputPath)
          .transform("counter") { lines =>
            lines
              // Split on runs of anything other than ASCII letters/apostrophes; drop empty tokens.
              .flatMap(line => line.split("[^a-zA-Z']+").filter(token => token.nonEmpty))
              .map { token =>
                val digest = crypto.calculateHmacSHA256(token.getBytes, "HelloWorld".getBytes)
                new String(Base64.getEncoder.encode(digest))
              }
              .countByValue
          }
          .map { case (encoded, count) => encoded + ": " + count }
          .saveAsTextFile("wordcounts")

        sc.run()
        ()
      }
    }