Skip to content

Instantly share code, notes, and snippets.

@abridgett
Last active April 29, 2021 22:03
Show Gist options
  • Select an option

  • Save abridgett/a0cc167d4d6ddcd1fef9 to your computer and use it in GitHub Desktop.

Select an option

Save abridgett/a0cc167d4d6ddcd1fef9 to your computer and use it in GitHub Desktop.

Revisions

  1. abridgett revised this gist Dec 21, 2015. 1 changed file with 4 additions and 2 deletions.
    6 changes: 4 additions & 2 deletions airflow_eg.py
    Original file line number Diff line number Diff line change
    @@ -43,7 +43,7 @@ def update_hdfs(ds, **kwargs):
    attachments=MAP_SLACK_ATTACHMENTS,
    trigger_rule='all_done',
    dag=dag)
    dag.set_dependency(speedmap, 'speedmap_slack')
    dag.set_dependency('speedmap', 'speedmap_slack')

    map_email = EmailOperator(
    task_id='speedmap_email',
    @@ -52,4 +52,6 @@ def update_hdfs(ds, **kwargs):
    subject=map + " {{ ds }} {{ task_instance.xcom_pull(task_ids=params.map, key='slack_status') }}",
    html_content=MAP_EMAIL_CONTENT,
    trigger_rule='all_done',
    dag=dag)
    dag=dag)
    # Just to show an alternative approach
    map_email.set_upstream(speedmap)
  2. abridgett created this gist Dec 21, 2015.
    55 changes: 55 additions & 0 deletions airflow_eg.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,55 @@
    MAP_SLACK_ATTACHMENTS = [
    {
    "fallback": "{{params.map}} {{ task_instance.xcom_pull(task_ids=params.map, key='slack_status') }}",
    "pretext": "{{params.map}} update {{ task_instance.xcom_pull(task_ids=params.map, key='slack_status') }}",
    "fields": [
    {
    "title": "Copied",
    "value": "{{ task_instance.xcom_pull(task_ids=params.map, key='copied') }}",
    "short": True
    }
    ]
    }
    ]

    MAP_EMAIL_CONTENT = """
    <b>Map:</b> {{ params.map }}<br>
    <b>Date:</b> {{ ds }}<br>
    <p>
    <b>Copied:</b> {{ task_instance.xcom_pull(task_ids=params.map, key='copied') }}<br>
    """

    def update_hdfs(ds, **kwargs):
    ....
    kwargs['ti'].xcom_push(key='slack_status', value='danger')
    if success:
    kwargs['ti'].xcom_push(key='slack_status', value='good')
    kwargs['ti'].xcom_push(key='copied', value=int(m.group(1)))

    speedmap = PythonOperator(
    task_id='speedmap',
    python_callable=update_hdfs,
    params={'map': 'speedmap'},
    provide_context=True,
    pool = 'speedmap', # no simultaneous runs
    dag=dag)

    map_slack = SlackAPIPostOperator(
    task_id='speedmap_slack',
    channel="#airflow-test",
    token=Variable.get('slack_token'),
    params={'map': speedmap},
    text='',
    attachments=MAP_SLACK_ATTACHMENTS,
    trigger_rule='all_done',
    dag=dag)
    dag.set_dependency(speedmap, 'speedmap_slack')

    map_email = EmailOperator(
    task_id='speedmap_email',
    to="[email protected]",
    params={'map': map},
    subject=map + " {{ ds }} {{ task_instance.xcom_pull(task_ids=params.map, key='slack_status') }}",
    html_content=MAP_EMAIL_CONTENT,
    trigger_rule='all_done',
    dag=dag)