import com.google.common.collect.Iterables; import com.google.common.primitives.Ints; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class PipelineTest { private static final Logger logger = LoggerFactory.getLogger(PipelineTest.class); public static void main(String[] args) { int[] shit = new int[1000]; for (int i = 0; i < shit.length; i++) { shit[i] = i * i; } PipelineOptions options = PipelineOptionsFactory.create(); Pipeline pipeline = Pipeline.create(options); PCollection> sideInput1 = pipeline.apply("Create1", Create.>of(Ints.asList(shit))); PCollectionView> view1 = sideInput1.apply("CreateSideInput1", View.asSingleton()); PCollection> sideInput2 = pipeline.apply("Create2", Create.>of(Ints.asList(shit))); PCollectionView> view2 = sideInput2.apply("CreateSideInput2", View.asSingleton()); PCollection done = pipeline .apply( "FakeData", GenerateSequence.from(0).to(50_000).withRate(10, Duration.standardSeconds(1))) .apply( "Map1", ParDo.of( new DoFn() { @ProcessElement public void processElement(ProcessContext ctx) { Long element = ctx.element(); Iterable v1 = ctx.sideInput(view1); Iterable v2 = ctx.sideInput(view2); String out = "element " + element + ", v1 size " + Iterables.size(v1) + ", v2 size " + Iterables.size(v2); logger.info("MAP1: " + out); ctx.output(out); } }) .withSideInputs(view1, view2)); pipeline.run(); } }