Skip to content

Instantly share code, notes, and snippets.

@lidalei
Created March 7, 2018 09:39
Show Gist options
  • Select an option

  • Save lidalei/493c72a795166a6ff7e08416f7f303e2 to your computer and use it in GitHub Desktop.

Select an option

Save lidalei/493c72a795166a6ff7e08416f7f303e2 to your computer and use it in GitHub Desktop.

Revisions

  1. lidalei created this gist Mar 7, 2018.
    76 changes: 76 additions & 0 deletions PipelineTest.java
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,76 @@
    import com.google.common.collect.Iterables;
    import com.google.common.primitives.Ints;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.GenerateSequence;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionView;
    import org.joda.time.Duration;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class PipelineTest {
    private static final Logger logger = LoggerFactory.getLogger(PipelineTest.class);

    public static void main(String[] args) {
    int[] shit = new int[1000];
    for (int i = 0; i < shit.length; i++) {
    shit[i] = i * i;
    }

    PipelineOptions options = PipelineOptionsFactory.create();

    Pipeline pipeline = Pipeline.create(options);

    PCollection<Iterable<Integer>> sideInput1 =
    pipeline.apply("Create1", Create.<Iterable<Integer>>of(Ints.asList(shit)));

    PCollectionView<Iterable<Integer>> view1 =
    sideInput1.apply("CreateSideInput1", View.asSingleton());

    PCollection<Iterable<Integer>> sideInput2 =
    pipeline.apply("Create2", Create.<Iterable<Integer>>of(Ints.asList(shit)));

    PCollectionView<Iterable<Integer>> view2 =
    sideInput2.apply("CreateSideInput2", View.asSingleton());

    PCollection<String> done =
    pipeline
    .apply(
    "FakeData",
    GenerateSequence.from(0).to(50_000).withRate(10, Duration.standardSeconds(1)))
    .apply(
    "Map1",
    ParDo.of(
    new DoFn<Long, String>() {

    @ProcessElement
    public void processElement(ProcessContext ctx) {
    Long element = ctx.element();

    Iterable<Integer> v1 = ctx.sideInput(view1);
    Iterable<Integer> v2 = ctx.sideInput(view2);

    String out =
    "element "
    + element
    + ", v1 size "
    + Iterables.size(v1)
    + ", v2 size "
    + Iterables.size(v2);

    logger.info("MAP1: " + out);

    ctx.output(out);
    }
    })
    .withSideInputs(view1, view2));

    pipeline.run();
    }
    }