Skip to content

Instantly share code, notes, and snippets.

@andy51002000
Created February 15, 2021 03:35
Show Gist options
  • Select an option

  • Save andy51002000/aee3bba1a5e1a21da4c1bfca3998e6fb to your computer and use it in GitHub Desktop.

Select an option

Save andy51002000/aee3bba1a5e1a21da4c1bfca3998e6fb to your computer and use it in GitHub Desktop.

Revisions

  1. andy51002000 created this gist Feb 15, 2021.
    22 changes: 22 additions & 0 deletions dataflow example.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,22 @@
    class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
    parser.add_argument(
    '--input',
    default='gs://dataflow-samples/shakespeare/kinglear.txt',
    help='Path of the file to read from')
    parser.add_argument(
    '--output',
    required=True,
    help='Output file to write results to.')

    pipeline_options = PipelineOptions(['--output', './result.txt'])
    p = beam.Pipeline(options=pipeline_options,runner=InteractiveRunner())

    wordcount_options = pipeline_options.view_as(WordcountOptions)

    count = (p
    | 'ReadCollection' >> beam.io.ReadFromText(wordcount_options.input)
    | 'findWord' >> beam.FlatMap(lambda line: re.findall(r'[\w\']+', line.strip(), re.UNICODE))
    | "lower" >> beam.Map(lambda word: word.lower())
    | "lower_count" >> beam.combiners.Count.PerElement())