Beam examples for quick reference
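Note: these snippets appear to be excerpts from the Apache Beam Python SDK's own tests and examples as they stood around 2017 (the Python 2 SDK), so imports and test-class scaffolding are mostly omitted, and some constructs, such as tuple-unpacking lambda parameters and pvalue.SideOutputValue, are specific to that era of the SDK.
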
# Key each line on its first 10 characters (value = characters 10-98),
# group by key, then emit one '<key><value>' string per grouped value.
output = (lines
          | 'split' >> beam.Map(
              lambda x: (x[:10], x[10:99]))
          .with_output_types(beam.typehints.KV[str, str])
          | 'group' >> beam.GroupByKey()
          | 'format' >> beam.FlatMap(
              lambda (key, vals): ['%s%s' % (key, val) for val in vals]))
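
The chain above assumes an existing lines PCollection of fixed-width text records. A minimal sketch of how such an input might be created (the pipeline, label, and sample strings below are illustrative assumptions, not part of the gist):

import apache_beam as beam

pipeline = beam.Pipeline()
# Hypothetical fixed-width records: the first 10 characters form the key.
lines = pipeline | 'create lines' >> beam.Create([
    'key0000001' + 'a' * 88,
    'key0000001' + 'b' * 88,
    'key0000002' + 'c' * 88,
])
# ...apply the split / group / format chain above to `lines`, then run.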

def test_as_dict_with_unique_labels(self):
  some_kvs = [('a', 1), ('b', 2)]
  pipeline = self.create_pipeline()
  main_input = pipeline | 'main input' >> beam.Create([1])
  side_kvs = pipeline | 'side kvs' >> beam.Create(some_kvs)
  # AsDict materializes side_kvs as the dict {'a': 1, 'b': 2} and passes it
  # as an extra argument for every main-input element; the explicit label
  # lets the same PCollection be attached twice as two distinct side inputs.
  results = main_input | beam.FlatMap(
      lambda x, dct1, dct2: [[x, dct1, dct2]],
      beam.pvalue.AsDict(side_kvs),
      beam.pvalue.AsDict(side_kvs, label='label'))

def test_par_do_with_multiple_outputs_and_using_yield(self):

  class SomeDoFn(beam.DoFn):
    """A custom DoFn using yield."""

    def process(self, element):
      # Untagged yields go to the main output; SideOutputValue routes the
      # element to the named additional output.
      yield element
      if element % 2 == 0:
        yield pvalue.SideOutputValue('even', element)
      else:
        yield pvalue.SideOutputValue('odd', element)

  pipeline = TestPipeline()
  nums = pipeline | 'Some Numbers' >> beam.Create([1, 2, 3, 4])
  results = nums | 'ClassifyNumbers' >> beam.ParDo(
      SomeDoFn()).with_outputs('odd', 'even', main='main')
  assert_that(results.main, equal_to([1, 2, 3, 4]))
  assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
  assert_that(results.even, equal_to([2, 4]), label='assert:even')
  pipeline.run()

class _MeanCombineFn(beam.CombineFn):

  def create_accumulator(self):
    return (0, 0)

  def add_input(self, (sum_, count), element):
    return sum_ + element, count + 1

  def merge_accumulators(self, accumulators):
    sums, counts = zip(*accumulators)
    return sum(sums), sum(counts)

  def extract_output(self, (sum_, count)):
    if not count:
      return float('nan')
    return sum_ / float(count)
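
A usage sketch for the CombineFn above, applied per key and globally; the pipeline, labels, and sample data are illustrative assumptions, not from the gist:

import apache_beam as beam

pipeline = beam.Pipeline()
nums = pipeline | 'nums' >> beam.Create([('a', 1), ('a', 3), ('b', 10)])
# Per-key mean: (sum, count) accumulators are merged across bundles.
mean_per_key = nums | 'mean per key' >> beam.CombinePerKey(_MeanCombineFn())
# Global mean over the values alone.
global_mean = (nums
               | 'values' >> beam.Values()
               | 'mean' >> beam.CombineGlobally(_MeanCombineFn()))
pipeline.run()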

def test_co_group_by_key_on_list(self):
  pipeline = TestPipeline()
  pcoll_1 = pipeline | 'Start 1' >> beam.Create(
      [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
  pcoll_2 = pipeline | 'Start 2' >> beam.Create(
      [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
  result = (pcoll_1, pcoll_2) | beam.CoGroupByKey()
  assert_that(result, equal_to([('a', ([1, 2], [5, 6])),
                                ('b', ([3], [])),
                                ('c', ([4], [7, 8]))]))
  pipeline.run()

def test_keys_and_values(self):
  pipeline = TestPipeline()
  pcoll = pipeline | 'Start' >> beam.Create(
      [(3, 1), (2, 1), (1, 1), (3, 2), (2, 2), (3, 3)])
  keys = pcoll.apply(beam.Keys('keys'))
  vals = pcoll.apply(beam.Values('vals'))
  assert_that(keys, equal_to([1, 2, 2, 3, 3, 3]), label='assert:keys')
  assert_that(vals, equal_to([1, 1, 1, 2, 2, 3]), label='assert:vals')
  # Run the pipeline so the assert_that checks above actually execute.
  pipeline.run()

def test_group_by_key_only_output_type_deduction(self):
  d = (self.p
       | 'Str' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str)
       | ('Pair' >> beam.Map(lambda x: (x, ord(x)))
          .with_output_types(typehints.KV[str, str]))
       | beam.GroupByKeyOnly())
  # Output type should correctly be deduced.
  # GBK-only should deduce that KV[A, B] is turned into KV[A, Iterable[B]].
  self.assertCompatible(typehints.KV[str, typehints.Iterable[str]],
                        d.element_type)

import logging

from apache_beam.internal.gcp import auth
from apache_beam.io.gcp.internal.clients import bigquery
from apitools.base.py.exceptions import HttpError


def delete(project_id, dataset_id, table_id):
  client = bigquery.BigqueryV2(
      credentials=auth.get_service_credentials())
  request = bigquery.BigqueryTablesDeleteRequest(
      projectId=project_id, datasetId=dataset_id, tableId=table_id)
  try:
    client.tables.Delete(request)
  except HttpError as exn:
    if exn.status_code == 404:
      logging.warning('Table %s:%s.%s does not exist',
                      project_id, dataset_id, table_id)
      return
    else:
      raise
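
A usage sketch for the delete helper above; the project, dataset, and table names are placeholders:

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  # Placeholder identifiers, shown only to illustrate the call.
  delete('my-gcp-project', 'my_dataset', 'obsolete_table')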