def test_par_do_with_multiple_outputs_and_using_yield(self):
    # Exercises a generator-style DoFn that emits every element to the main
    # output and additionally to an 'odd' or 'even' tagged side output.

    class SomeDoFn(beam.DoFn):
        """DoFn whose process() is a generator yielding main and tagged values."""

        def process(self, element):
            # Main output first, then exactly one tagged output per element.
            yield element
            tag = 'even' if element % 2 == 0 else 'odd'
            yield pvalue.SideOutputValue(tag, element)

    p = TestPipeline()
    numbers = p | 'Some Numbers' >> beam.Create([1, 2, 3, 4])

    classify = beam.ParDo(SomeDoFn()).with_outputs('odd', 'even', main='main')
    outputs = numbers | 'ClassifyNumbers' >> classify

    # Each assertion after the first passes an explicit label; the first uses
    # assert_that's default.
    assert_that(outputs.main, equal_to([1, 2, 3, 4]))
    assert_that(outputs.odd, equal_to([1, 3]), label='assert:odd')
    assert_that(outputs.even, equal_to([2, 4]), label='assert:even')

    p.run()