Last active
          May 30, 2017 11:48 
        
      - 
      
- 
        Save prabeesh/157c8d4f11d81660249c37111be481bb to your computer and use it in GitHub Desktop. 
    Beam examples for quick reference
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  # Re-key each line by its first 10 characters, group lines sharing a key,
  # then emit one '<key><value>' string per grouped value.
  output = (
      lines
      # x[:10] is the key; x[10:99] is the value (anything past index 98 is
      # dropped).
      | 'split' >> beam.Map(
          lambda x: (x[:10], x[10:99])
      ).with_output_types(beam.typehints.KV[str, str])
      | 'group' >> beam.GroupByKey()
      # Take a single (key, values) argument and index into it: tuple
      # parameters in a lambda signature are a SyntaxError on Python 3, while
      # this form behaves the same on Python 2 and 3.
      | 'format' >> beam.FlatMap(
          lambda kv: ['%s%s' % (kv[0], val) for val in kv[1]]
      )
  )
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  def test_as_dict_with_unique_labels(self):
      """Pass the same key/value PCollection to a FlatMap as two AsDict views.

      The first view is built without an explicit label, the second with
      label='label'.

      NOTE(review): the visible snippet stops after building ``results``;
      nothing here asserts on it or runs the pipeline -- confirm whether the
      remainder was trimmed.
      """
      kv_pairs = [('a', 1), ('b', 2)]
      p = self.create_pipeline()

      main_pcoll = p | 'main input' >> beam.Create([1])
      side_pcoll = p | 'side kvs' >> beam.Create(kv_pairs)

      # The lambda receives the main element plus both side-input views and
      # emits them together as a single list-valued element.
      results = main_pcoll | beam.FlatMap(
          lambda x, dct1, dct2: [[x, dct1, dct2]],
          beam.pvalue.AsDict(side_pcoll),
          beam.pvalue.AsDict(side_pcoll, label='label'),
      )
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  def test_par_do_with_multiple_outputs_and_using_yield(self):
      """Check a generator-style DoFn that writes to main and tagged outputs.

      Every number is yielded to the main output and additionally to the
      'even' or 'odd' tagged output according to its parity.
      """

      class SomeDoFn(beam.DoFn):
          """DoFn that yields each element, then a parity-tagged copy of it."""

          def process(self, element):
              # Main output first, then the tagged side output.
              yield element
              tag = 'even' if element % 2 == 0 else 'odd'
              yield pvalue.SideOutputValue(tag, element)

      p = TestPipeline()
      numbers = p | 'Some Numbers' >> beam.Create([1, 2, 3, 4])
      classifier = beam.ParDo(SomeDoFn()).with_outputs(
          'odd', 'even', main='main')
      results = numbers | 'ClassifyNumbers' >> classifier

      assert_that(results.main, equal_to([1, 2, 3, 4]))
      assert_that(results.odd, equal_to([1, 3]), label='assert:odd')
      assert_that(results.even, equal_to([2, 4]), label='assert:even')
      p.run()
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  class _MeanCombineFn(beam.CombineFn):
      """CombineFn computing the arithmetic mean of its inputs.

      The accumulator is a ``(sum, count)`` pair.
      """

      def create_accumulator(self):
          """Return the empty accumulator: zero sum, zero count."""
          return (0, 0)

      def add_input(self, accumulator, element):
          """Fold one element into ``accumulator`` and return the new pair.

          The pair is unpacked in the body rather than in the signature:
          tuple parameters are a SyntaxError on Python 3.
          """
          sum_, count = accumulator
          return sum_ + element, count + 1

      def merge_accumulators(self, accumulators):
          """Combine several ``(sum, count)`` pairs into one.

          Folds in a single pass so an empty iterable yields ``(0, 0)``
          instead of failing to unpack the result of ``zip(*[])``.
          """
          total, total_count = 0, 0
          for sum_, count in accumulators:
              total += sum_
              total_count += count
          return total, total_count

      def extract_output(self, accumulator):
          """Return the mean as a float, or NaN when no element was added."""
          sum_, count = accumulator
          if not count:
              return float('nan')
          return sum_ / float(count)
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  def test_co_group_by_key_on_list(self):
      """Apply CoGroupByKey to a tuple of two keyed PCollections.

      Each expected element is (key, (values from first, values from second));
      a key missing from one input gets an empty list on that side.
      """
      p = TestPipeline()
      first = p | 'Start 1' >> beam.Create(
          [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
      second = p | 'Start 2' >> beam.Create(
          [('a', 5), ('a', 6), ('c', 7), ('c', 8)])

      joined = (first, second) | beam.CoGroupByKey()

      expected = [
          ('a', ([1, 2], [5, 6])),
          ('b', ([3], [])),
          ('c', ([4], [7, 8])),
      ]
      assert_that(joined, equal_to(expected))
      p.run()
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  def test_keys_and_values(self):
      """Check that Keys and Values extract each half of key/value pairs."""
      pipeline = TestPipeline()
      pcoll = pipeline | 'Start' >> beam.Create(
          [(3, 1), (2, 1), (1, 1), (3, 2), (2, 2), (3, 3)])
      keys = pcoll.apply(beam.Keys('keys'))
      vals = pcoll.apply(beam.Values('vals'))
      assert_that(keys, equal_to([1, 2, 2, 3, 3, 3]), label='assert:keys')
      assert_that(vals, equal_to([1, 1, 1, 2, 2, 3]), label='assert:vals')
      # The sibling tests finish with pipeline.run(); without it the
      # assert_that checks registered above would never be evaluated.
      pipeline.run()
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  def test_group_by_key_only_output_type_deduction(self):
      """Check the element type deduced for the output of GroupByKeyOnly.

      NOTE(review): the 'Pair' step is declared as KV[str, str] although its
      lambda pairs each character with ord(x). This snippet only inspects
      ``element_type`` and does not run the pipeline -- confirm the hint is
      intentional.
      """
      chars = (
          self.p
          | 'Str' >> beam.Create(['t', 'e', 's', 't']).with_output_types(str))
      pairs = chars | (
          'Pair' >> beam.Map(lambda x: (x, ord(x)))
          .with_output_types(typehints.KV[str, str]))
      grouped = pairs | beam.GroupByKeyOnly()

      # GroupByKeyOnly should turn a declared KV[A, B] input into
      # KV[A, Iterable[B]] on its output.
      expected_type = typehints.KV[str, typehints.Iterable[str]]
      self.assertCompatible(expected_type, grouped.element_type)
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | import logging | |
| from apache_beam.internal.gcp import auth | |
| from apitools.base.py.exceptions import HttpError | |
| from apache_beam.io.gcp.internal.clients import bigquery | |
| def delete(project_id, dataset_id, table_id): | |
| client = bigquery.BigqueryV2( | |
| credentials=auth.get_service_credentials()) | |
| request = bigquery.BigqueryTablesDeleteRequest(projectId=project_id, datasetId=dataset_id, tableId=table_id) | |
| try: | |
| client.tables.Delete(request) | |
| except HttpError as exn: | |
| if exn.status_code == 404: | |
| logging.warning('Table %s:%s.%s does not exist', project_id, dataset_id, table_id) | |
| return | |
| else: | |
| raise | 
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment