Created
February 10, 2020 17:19
-
-
Save sgireddy/06780918382c535b8c60c333db174f5c to your computer and use it in GitHub Desktop.
Revisions
-
sgireddy created this gist
Feb 10, 2020 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,42 @@ package org.example.dataflow; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; import java.io.Serializable; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; public class Crypto implements Serializable { private static final String TAG = "Crypto"; transient private Mac hmacSHA256; public Crypto() { init(); } public void init() { try { hmacSHA256 = Mac.getInstance("HmacSHA256"); } catch (SecurityException e) { e.printStackTrace(); } catch (NoSuchAlgorithmException e) { e.printStackTrace(); } } public byte[] calculateHmacSHA256(byte[] input, byte[] key) { byte[] output = null; try { if(hmacSHA256 == null) { init(); } hmacSHA256.init(new SecretKeySpec(key, "HmacSHA256")); output = hmacSHA256.doFinal(input); } catch (InvalidKeyException e) { e.printStackTrace(); } return output; } } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,43 @@ package org.example.dataflow; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Filter; import org.apache.beam.sdk.transforms.FlatMapElements; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TypeDescriptors; import java.util.Arrays; import java.util.Base64; public class WordCount { public static void main(String[] args) { Crypto crypto = new Crypto(); PipelineOptions options = PipelineOptionsFactory.create(); Pipeline p = Pipeline.create(options); p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/kinglear.txt")) .apply( FlatMapElements.into(TypeDescriptors.strings()) .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+")))) .apply(Filter.by((String word) -> !word.isEmpty())) .apply(MapElements.into(TypeDescriptors.strings()) .via((String word) -> new String(Base64.getEncoder().encode(crypto.calculateHmacSHA256(word.getBytes(), "HelloWorld".getBytes()))))) .apply(Count.perElement()) .apply( MapElements.into(TypeDescriptors.strings()) .via( (KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())) .apply(TextIO.write().to("wordcounts")); p.run().waitUntilFinish(); } } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,31 @@ package org.example.dataflow import java.io.Serializable import java.util.Base64 import com.spotify.scio._ object ScioMinimalWordCount extends Serializable { @transient lazy val crypto = new Crypto() def main(cmdlineArgs: Array[String]): Unit = { // `--input=gs://[BUCKET]/[PATH]/input.txt`, are accessed via `Args`. val (sc, args) = ContextAndArgs(cmdlineArgs) sc.textFile(args.getOrElse("input", "gs://apache-beam-samples/shakespeare/kinglear.txt")) .transform("counter") { _.flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty)) .map(x => { new String(Base64.getEncoder.encode(crypto.calculateHmacSHA256(x.getBytes, "HelloWorld".getBytes))) }) .countByValue } .map(t => t._1 + ": " + t._2) .saveAsTextFile("wordcounts") sc.run() () } }