Skip to content

Instantly share code, notes, and snippets.

@sgireddy
Created February 10, 2020 17:19
Show Gist options
  • Save sgireddy/06780918382c535b8c60c333db174f5c to your computer and use it in GitHub Desktop.
Save sgireddy/06780918382c535b8c60c333db174f5c to your computer and use it in GitHub Desktop.
Scio ArrayIndexOutOfBoundsException
package org.example.dataflow;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.io.Serializable;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
/**
 * Computes HMAC-SHA256 message authentication codes.
 *
 * <p>{@link Mac} objects are stateful (a key is installed with {@code init}, then consumed by
 * {@code doFinal}) and are not documented as thread-safe. This class therefore keeps one
 * {@code Mac} per thread. A single {@code Crypto} instance may be shared freely across threads,
 * e.g. as a singleton used by several pipeline worker threads.
 *
 * <p>The class holds no per-instance state. Serializing it (for example when it is captured by a
 * Beam lambda) carries nothing, and each JVM creates its {@code Mac} instances lazily.
 */
public class Crypto implements Serializable {
    private static final long serialVersionUID = 1L;
    private static final String TAG = "Crypto";
    private static final String ALGORITHM = "HmacSHA256";

    // One Mac per thread, so concurrent calls never interleave init()/doFinal() on shared
    // digest state. The field is static, so it is never serialized.
    private static final ThreadLocal<Mac> HMAC_SHA256 = ThreadLocal.withInitial(Crypto::newMac);

    /**
     * Creates a calculator and eagerly creates the calling thread's {@link Mac}, so a missing
     * algorithm fails fast.
     *
     * @throws IllegalStateException if the JVM provides no HmacSHA256 implementation
     */
    public Crypto() {
        HMAC_SHA256.get();
    }

    /**
     * Ensures the calling thread's {@link Mac} has been created.
     *
     * @throws IllegalStateException if the JVM provides no HmacSHA256 implementation
     */
    public void init() {
        HMAC_SHA256.get();
    }

    /**
     * Computes HMAC-SHA256 of {@code input} under {@code key}.
     *
     * <p>Safe to call concurrently on a shared instance, because each thread uses its own
     * {@link Mac}.
     *
     * @param input message bytes to authenticate
     * @param key raw HMAC key bytes
     * @return the 32-byte HMAC-SHA256 value
     * @throws IllegalArgumentException if {@code key} is null or empty (raised by
     *     {@link SecretKeySpec}), or if the {@code Mac} rejects the key
     * @throws IllegalStateException if the JVM provides no HmacSHA256 implementation
     */
    public byte[] calculateHmacSHA256(byte[] input, byte[] key) {
        Mac mac = HMAC_SHA256.get();
        try {
            mac.init(new SecretKeySpec(key, ALGORITHM));
        } catch (InvalidKeyException e) {
            // Previously this returned null, which only surfaced later as an NPE in the caller.
            throw new IllegalArgumentException(TAG + ": key rejected by " + ALGORITHM, e);
        }
        return mac.doFinal(input);
    }

    /** Creates a fresh HmacSHA256 {@link Mac}; used as the per-thread initializer. */
    private static Mac newMac() {
        try {
            return Mac.getInstance(ALGORITHM);
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to support HmacSHA256, so this means a broken JVM.
            throw new IllegalStateException(TAG + ": " + ALGORITHM + " is not available", e);
        }
    }
}
package org.example.dataflow;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.regex.Pattern;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
/**
 * Beam pipeline that counts words of King Lear by their keyed HMAC-SHA256 digest rather than by
 * their plain text.
 */
public class WordCount {
    // Compiled once per JVM. Pattern.split gives the same result as String.split with this regex,
    // without recompiling the regex for every input line.
    private static final Pattern NON_LETTERS = Pattern.compile("[^\\p{L}]+");

    /**
     * Runs the pipeline:
     * <ol>
     *   <li>read the sample text;</li>
     *   <li>split it on runs of non-letters and drop empty tokens;</li>
     *   <li>replace each word with the Base64 HMAC-SHA256 of its UTF-8 bytes;</li>
     *   <li>count each distinct digest;</li>
     *   <li>write {@code digest: count} lines with TextIO under the output prefix
     *       {@code wordcounts}.</li>
     * </ol>
     *
     * @param args Beam pipeline options (for example {@code --runner=...}); may be empty
     */
    public static void main(String[] args) {
        Crypto crypto = new Crypto();
        // NOTE(review): hard-coded demo key; load it from a secret store for real use.
        // Encoded once here and captured by the lambda, instead of once per element.
        byte[] hmacKey = "HelloWorld".getBytes(StandardCharsets.UTF_8);

        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);

        p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/kinglear.txt"))
                .apply(
                        FlatMapElements.into(TypeDescriptors.strings())
                                .via((String line) -> Arrays.asList(NON_LETTERS.split(line))))
                .apply(Filter.by((String word) -> !word.isEmpty()))
                .apply(
                        MapElements.into(TypeDescriptors.strings())
                                // Explicit UTF-8: the words may contain non-ASCII letters, and the
                                // platform default charset can differ between machines.
                                .via((String word) -> Base64.getEncoder().encodeToString(
                                        crypto.calculateHmacSHA256(
                                                word.getBytes(StandardCharsets.UTF_8), hmacKey))))
                .apply(Count.perElement())
                .apply(
                        MapElements.into(TypeDescriptors.strings())
                                .via(
                                        (KV<String, Long> wordCount) ->
                                                wordCount.getKey() + ": " + wordCount.getValue()))
                .apply(TextIO.write().to("wordcounts"));

        p.run().waitUntilFinish();
    }
}
package org.example.dataflow
import java.io.Serializable
import java.util.Base64
import com.spotify.scio._
/** Scio pipeline that counts words by their keyed HMAC-SHA256 digest rather than by plain text. */
object ScioMinimalWordCount extends Serializable {
  import java.nio.charset.StandardCharsets

  // Crypto wraps a javax.crypto.Mac, which is stateful and not documented as thread-safe.
  // This object is a per-JVM singleton reached by every worker thread. Sharing one Crypto
  // across those threads is a likely cause of ArrayIndexOutOfBoundsException from interleaved
  // doFinal calls, so each thread gets its own instance.
  @transient private lazy val threadCrypto: ThreadLocal[Crypto] = new ThreadLocal[Crypto] {
    override def initialValue(): Crypto = new Crypto()
  }

  /** Returns the calling thread's own Crypto instance. */
  def crypto: Crypto = threadCrypto.get()

  // NOTE(review): hard-coded demo key; load it from a secret store for real use.
  // Encoded once instead of once per element.
  private val HmacKey: Array[Byte] = "HelloWorld".getBytes(StandardCharsets.UTF_8)

  /**
   * Reads the input text and splits it into words. Each word becomes the Base64 HMAC-SHA256 of
   * its UTF-8 bytes. Each distinct digest is counted, and `digest: count` lines are written with
   * the output prefix `wordcounts`.
   */
  def main(cmdlineArgs: Array[String]): Unit = {
    // Options such as `--input=gs://[BUCKET]/[PATH]/input.txt` are read via `Args`.
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    sc.textFile(args.getOrElse("input", "gs://apache-beam-samples/shakespeare/kinglear.txt"))
      .transform("counter") {
        _.flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty))
          .map { word =>
            // Explicit UTF-8 so the digest does not depend on the worker's default charset.
            val mac = crypto.calculateHmacSHA256(word.getBytes(StandardCharsets.UTF_8), HmacKey)
            Base64.getEncoder.encodeToString(mac)
          }
          .countByValue
      }
      .map { case (digest, count) => digest + ": " + count }
      .saveAsTextFile("wordcounts")
    sc.run()
    ()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment