Gist by @prabeesh, created July 31, 2017 07:18
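
```python
import apache_beam as beam


class LeftJoinerFn(beam.DoFn):
    """Left outer join over the grouped values emitted by CoGroupByKey.

    Each input element is the dict {'left': [...], 'right': [...]} for one
    key (the key itself was dropped by beam.Values()). Values are assumed to
    support `+` (e.g. tuples), so a left row and a right row concatenate
    into one joined row.
    """

    def process(self, row):
        # Materialize the grouped iterables so they can be tested and re-iterated.
        left = list(row['left'])
        right = list(row['right'])
        for each in left:
            if right:
                # Pair the left row with every matching right row.
                for match in right:
                    yield each + match
            else:
                # No match on the right: emit the left row unchanged.
                yield each


# pcoll1 and pcoll2 are PCollections of (key, value) pairs;
# how they are built is elided in the original.
pcoll1 = ...
pcoll2 = ...

left_joined = (
    {'left': pcoll1, 'right': pcoll2}
    | 'LeftJoiner: Combine' >> beam.CoGroupByKey()
    | 'LeftJoiner: ExtractValues' >> beam.Values()
    | 'LeftJoiner: JoinValues' >> beam.ParDo(LeftJoinerFn())
)
```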
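To see what this pipeline computes without running Beam, here is a plain-Python sketch that imitates CoGroupByKey's grouping and the left-join step over in-memory lists. It is an approximation, not Beam code: the helpers `co_group_by_key`, `left_join_values` and `left_join` are hypothetical, values are assumed to be tuples so `+` concatenates them, and each left row is paired with every matching right row (standard left outer join semantics).

```python
from collections import defaultdict


def co_group_by_key(left_pairs, right_pairs):
    """Hypothetical stand-in for beam.CoGroupByKey on {'left': ..., 'right': ...}.

    Returns {key: {'left': [values], 'right': [values]}} for every key
    that appears on either side.
    """
    grouped = defaultdict(lambda: {'left': [], 'right': []})
    for key, value in left_pairs:
        grouped[key]['left'].append(value)
    for key, value in right_pairs:
        grouped[key]['right'].append(value)
    return dict(grouped)


def left_join_values(row):
    """Left-join step: one output per (left row, right match) pair,
    or the bare left row when the right side is empty."""
    left, right = row['left'], row['right']
    for each in left:
        if right:
            for match in right:
                yield each + match
        else:
            yield each


def left_join(left_pairs, right_pairs):
    grouped = co_group_by_key(left_pairs, right_pairs)
    # Like beam.Values(): drop the key and keep only the grouped dict.
    return [out for row in grouped.values() for out in left_join_values(row)]


orders = [('u1', ('order-1',)), ('u2', ('order-2',)), ('u3', ('order-3',))]
users = [('u1', ('alice',)), ('u2', ('bob',)), ('u4', ('dave',))]
print(left_join(orders, users))
# [('order-1', 'alice'), ('order-2', 'bob'), ('order-3',)]
```

Key `u3` has no user, so its order passes through unchanged, while `u4` exists only on the right and is dropped, as a left join requires. Beam does not guarantee element order in a PCollection; the list order here only reflects Python dicts preserving insertion order.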