Created
February 10, 2020 17:19
-
-
Save sgireddy/06780918382c535b8c60c333db174f5c to your computer and use it in GitHub Desktop.
scio ArrayIndexOutOfBoundsException
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package org.example.dataflow; | |
| import javax.crypto.Mac; | |
| import javax.crypto.spec.SecretKeySpec; | |
| import java.io.Serializable; | |
| import java.security.InvalidKeyException; | |
| import java.security.NoSuchAlgorithmException; | |
/**
 * Computes HMAC-SHA256 message authentication codes.
 *
 * <p>A {@link Mac} carries internal state between {@code init} and {@code doFinal}, so using one
 * instance from several threads at once can corrupt the computation. This class therefore keeps
 * one {@code Mac} per thread instead of one per {@code Crypto} object, which makes a single
 * {@code Crypto} safe to share between threads.
 *
 * <p>The class has no instance state. It stays {@link Serializable} so that it can still be
 * captured by closures that get serialized.
 */
public class Crypto implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Algorithm name used both to look up the {@link Mac} and to tag the key. */
    private static final String ALGORITHM = "HmacSHA256";

    /**
     * One lazily created {@link Mac} per thread. The field is static, so it is never part of the
     * serialized form and is simply rebuilt on demand in whichever JVM the object ends up in.
     */
    private static final ThreadLocal<Mac> THREAD_MAC = ThreadLocal.withInitial(Crypto::newMac);

    /**
     * Creates a helper and eagerly prepares the calling thread's {@link Mac}.
     *
     * @throws IllegalStateException if the HmacSHA256 algorithm is not available
     */
    public Crypto() {
        init();
    }

    /**
     * Ensures the calling thread has a {@link Mac} ready. Calling this is optional:
     * {@link #calculateHmacSHA256(byte[], byte[])} creates the per-thread instance on first use.
     *
     * @throws IllegalStateException if the HmacSHA256 algorithm is not available
     */
    public void init() {
        THREAD_MAC.get();
    }

    /**
     * Computes the HMAC-SHA256 of {@code input} under {@code key}.
     *
     * @param input the message bytes to authenticate
     * @param key the raw secret key bytes
     * @return the 32-byte MAC
     * @throws IllegalArgumentException if {@code key} is {@code null} or empty (rejected by
     *     {@link SecretKeySpec}), or if the {@link Mac} refuses the key
     * @throws IllegalStateException if the HmacSHA256 algorithm is not available
     */
    public byte[] calculateHmacSHA256(byte[] input, byte[] key) {
        Mac mac = THREAD_MAC.get();
        try {
            // init() discards any state left over from a previous, possibly failed, computation.
            mac.init(new SecretKeySpec(key, ALGORITHM));
            return mac.doFinal(input);
        } catch (InvalidKeyException e) {
            // Fail here with the cause attached instead of handing the caller a null result.
            throw new IllegalArgumentException("Key is not usable for " + ALGORITHM, e);
        }
    }

    /** Looks up a fresh {@link Mac}, turning the checked lookup failure into an unchecked one. */
    private static Mac newMac() {
        try {
            return Mac.getInstance(ALGORITHM);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(ALGORITHM + " is not available in this JVM", e);
        }
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package org.example.dataflow; | |
| import org.apache.beam.sdk.Pipeline; | |
| import org.apache.beam.sdk.io.TextIO; | |
| import org.apache.beam.sdk.options.PipelineOptions; | |
| import org.apache.beam.sdk.options.PipelineOptionsFactory; | |
| import org.apache.beam.sdk.transforms.Count; | |
| import org.apache.beam.sdk.transforms.Filter; | |
| import org.apache.beam.sdk.transforms.FlatMapElements; | |
| import org.apache.beam.sdk.transforms.MapElements; | |
| import org.apache.beam.sdk.values.KV; | |
| import org.apache.beam.sdk.values.TypeDescriptors; | |
| import java.util.Arrays; | |
| import java.util.Base64; | |
| public class WordCount { | |
| public static void main(String[] args) { | |
| Crypto crypto = new Crypto(); | |
| PipelineOptions options = PipelineOptionsFactory.create(); | |
| Pipeline p = Pipeline.create(options); | |
| p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/kinglear.txt")) | |
| .apply( | |
| FlatMapElements.into(TypeDescriptors.strings()) | |
| .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+")))) | |
| .apply(Filter.by((String word) -> !word.isEmpty())) | |
| .apply(MapElements.into(TypeDescriptors.strings()) | |
| .via((String word) -> new String(Base64.getEncoder().encode(crypto.calculateHmacSHA256(word.getBytes(), "HelloWorld".getBytes()))))) | |
| .apply(Count.perElement()) | |
| .apply( | |
| MapElements.into(TypeDescriptors.strings()) | |
| .via( | |
| (KV<String, Long> wordCount) -> | |
| wordCount.getKey() + ": " + wordCount.getValue())) | |
| .apply(TextIO.write().to("wordcounts")); | |
| p.run().waitUntilFinish(); | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package org.example.dataflow | |
| import java.io.Serializable | |
| import java.util.Base64 | |
| import com.spotify.scio._ | |
/**
 * Scio pipeline that reads a text file, splits it into words, replaces every word with the
 * Base64-encoded HMAC-SHA256 of that word, and writes how often each encoded value occurs.
 */
object ScioMinimalWordCount extends Serializable {
  // Transient and lazy: the instance is not serialized with the object but created on first use.
  // NOTE(review): as a member of a singleton object this one instance is presumably shared by
  // every thread that runs the closures below in the same JVM, so Crypto must tolerate
  // concurrent calls; confirm.
  @transient lazy val crypto = new Crypto()

  def main(cmdlineArgs: Array[String]): Unit = {
    import java.nio.charset.StandardCharsets

    // Command-line arguments such as `--input=gs://[BUCKET]/[PATH]/input.txt` are exposed
    // through `args`.
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // NOTE(review): the HMAC key is a literal in source; confirm this is only sample data.
    // Converted once here rather than once per element.
    val hmacKey: Array[Byte] = "HelloWorld".getBytes(StandardCharsets.UTF_8)

    sc.textFile(args.getOrElse("input", "gs://apache-beam-samples/shakespeare/kinglear.txt"))
      .transform("counter") {
        _.flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty))
          .map { word =>
            // Explicit charset: the hash of a word must not depend on the default encoding
            // of the machine that processes it.
            Base64.getEncoder.encodeToString(
              crypto.calculateHmacSHA256(word.getBytes(StandardCharsets.UTF_8), hmacKey))
          }
          .countByValue
      }
      .map(t => t._1 + ": " + t._2)
      .saveAsTextFile("wordcounts")

    sc.run()
    ()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment