@enspdf
Forked from bartosz25/Airflow external triggers
Created September 8, 2022 19:17

Revisions

  1. @bartosz25 bartosz25 revised this gist May 8, 2019. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion docker-compose.yaml
    @@ -15,7 +15,7 @@ services:
           - ./scripts:/tmp
         ports:
           - "8081:8080"
    -    command: webserver & airflow scheduler
    +    command: webserver
         healthcheck:
           test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
           interval: 30s
  2. @bartosz25 bartosz25 renamed this gist May 8, 2019. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  3. @bartosz25 bartosz25 created this gist May 8, 2019.
    3 changes: 3 additions & 0 deletions curl external triggers
    @@ -0,0 +1,3 @@
    curl -X POST http://localhost:8081/api/experimental/dags/hello_world_a/dag_runs -H 'Cache-Control: no-cache' -H 'Content-Type: application/json' -d '{"conf":"{\"task_payload\":\"payload1\"}"}'
    curl -X POST http://localhost:8081/api/experimental/dags/hello_world_a/dag_runs -H 'Cache-Control: no-cache' -H 'Content-Type: application/json' -d '{"conf":"{\"task_payload\":\"payload2\"}"}'
    curl -X POST http://localhost:8081/api/experimental/dags/hello_world_a/dag_runs -H 'Cache-Control: no-cache' -H 'Content-Type: application/json' -d '{"conf":"{\"task_payload\":\"payload3\"}"}'
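    A rough Python equivalent of these calls, as a sketch assuming the requests library is installed and the webserver is reachable on localhost:8081 as configured below:

    # Sketch: trigger hello_world_a through the experimental REST API (same payload as the curl calls above).
    import json

    import requests

    response = requests.post(
        'http://localhost:8081/api/experimental/dags/hello_world_a/dag_runs',
        headers={'Cache-Control': 'no-cache', 'Content-Type': 'application/json'},
        data=json.dumps({'conf': json.dumps({'task_payload': 'payload1'})}),
    )
    print(response.status_code, response.text)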
    40 changes: 40 additions & 0 deletions docker-compose.yaml
    @@ -0,0 +1,40 @@
    version: '2.1'
    services:
      webserver:
        image: puckel/docker-airflow:1.10.2
        restart: always
        environment:
          # Loads DAG examples
          - LOAD_EX=y
        networks:
          airflow-external-trigger-network:
            ipv4_address: 111.18.0.20
        volumes:
          - ./dags:/usr/local/airflow/dags
          - ./requirements.txt:/requirements.txt
          - ./scripts:/tmp
        ports:
          - "8081:8080"
        command: webserver & airflow scheduler
        healthcheck:
          test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
          interval: 30s
          timeout: 30s
          retries: 3
      rabbitmq:
        image: rabbitmq:3.7-management
        restart: always
        networks:
          airflow-external-trigger-network:
            ipv4_address: 111.18.0.21
        hostname: rabbitmqdocker
        ports:
          - "15672:15672"

    networks:
      airflow-external-trigger-network:
        driver: bridge
        ipam:
          driver: default
          config:
            - subnet: 111.18.0.0/16
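    Once the stack is running (for example via docker-compose up -d), a quick sanity check is to call the experimental API's test endpoint on the mapped port; a minimal sketch, assuming the requests library is installed:

    # Sketch: ping the Airflow 1.10 experimental API exposed by the webserver container.
    import requests

    response = requests.get('http://localhost:8081/api/experimental/test')
    print(response.status_code, response.text)  # 200 means the webserver answers on the mapped port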
    88 changes: 88 additions & 0 deletions external_trigger.py
    @@ -0,0 +1,88 @@
    import json

    import pika
    from airflow import utils
    from airflow.models import DAG
    from airflow.operators.dagrun_operator import TriggerDagRunOperator
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python_operator import BranchPythonOperator

    dag = DAG(
        dag_id='external_trigger',
        default_args={
            "owner": "airflow",
            'start_date': utils.dates.days_ago(1),
        },
        schedule_interval='*/1 * * * *',
    )


    def consume_message(**kwargs):
        # Pull at most one message from the RabbitMQ queue and return the task_id
        # of the branch (= the DAG to trigger) that should run.
        connection = pika.BlockingConnection(pika.ConnectionParameters('111.18.0.21'))
        channel = connection.channel()
        channel.queue_declare(queue='external_airflow_triggers', durable=True)

        method_frame, header_frame, body = channel.basic_get(queue='external_airflow_triggers')
        if body:
            json_params = json.loads(body)
            # Hand the message params to the triggering task through XCom.
            kwargs['ti'].xcom_push(key='job_params', value=json.dumps(json_params['params']))
            channel.basic_ack(delivery_tag=method_frame.delivery_tag)
            connection.close()
            print("Got message ? {}".format(body))
            return json_params['task']
        else:
            return 'task_trash'


    router = BranchPythonOperator(
        task_id='router',
        python_callable=consume_message,
        dag=dag,
        provide_context=True,
        depends_on_past=True
    )


    def trigger_dag_with_context(context, dag_run_obj):
        # Forward the params consumed by the router to the triggered DAG run.
        ti = context['task_instance']
        job_params = ti.xcom_pull(key='job_params', task_ids='router')
        dag_run_obj.payload = {'task_payload': job_params}
        return dag_run_obj


    task_a = TriggerDagRunOperator(
        task_id='hello_world_a',
        trigger_dag_id="hello_world_a",
        python_callable=trigger_dag_with_context,
        params={'condition_param': True, 'task_payload': '{}'},
        dag=dag,
        provide_context=True,
    )

    task_b = TriggerDagRunOperator(
        task_id='hello_world_b',
        trigger_dag_id="hello_world_b",
        python_callable=trigger_dag_with_context,
        params={'condition_param': True, 'task_payload': '{}'},
        dag=dag,
        provide_context=True,
    )

    task_c = TriggerDagRunOperator(
        task_id='hello_world_c',
        trigger_dag_id="hello_world_c",
        python_callable=trigger_dag_with_context,
        params={'task_payload': '{}'},
        dag=dag,
        provide_context=True,
    )

    task_trash = DummyOperator(
        task_id='task_trash',
        dag=dag
    )

    router >> task_a
    router >> task_b
    router >> task_c
    router >> task_trash
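    The router branches on messages shaped like the ones published by messages_producer.py further down. A minimal sketch for pushing a single test message by hand, assuming pika is installed and RabbitMQ is reachable at 111.18.0.21 as in the compose file:

    # Sketch: enqueue one message so the next external_trigger run branches to hello_world_a.
    import json

    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters('111.18.0.21'))
    channel = connection.channel()
    channel.queue_declare(queue='external_airflow_triggers', durable=True)
    channel.basic_publish(
        exchange='',
        routing_key='external_airflow_triggers',
        body=json.dumps({'task': 'hello_world_a',
                         'params': {'event_time': '2019-05-08 12:00:00', 'value': 42}}),
    )
    connection.close()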
    23 changes: 23 additions & 0 deletions hello_world_a.py
    @@ -0,0 +1,23 @@
    import airflow
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    dag = DAG(
        dag_id='hello_world_a',
        default_args={
            "owner": "airflow",
            'start_date': airflow.utils.dates.days_ago(1),
        },
        schedule_interval=None
    )


    def print_hello(**context):
        task_params = context['dag_run'].conf['task_payload']
        print('Hello world a with {}'.format(task_params))


    PythonOperator(
        task_id='hello_world_printer',
        python_callable=print_hello,
        provide_context=True,
        dag=dag)
    23 changes: 23 additions & 0 deletions hello_world_b.py
    @@ -0,0 +1,23 @@
    import airflow
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    dag = DAG(
        dag_id='hello_world_b',
        default_args={
            "owner": "airflow",
            'start_date': airflow.utils.dates.days_ago(2),
        },
        schedule_interval=None
    )


    def print_hello(**context):
        task_params = context['dag_run'].conf['task_payload']
        print('Hello world b with {}'.format(task_params))


    PythonOperator(
        task_id='hello_world_printer',
        python_callable=print_hello,
        provide_context=True,
        dag=dag)
    23 changes: 23 additions & 0 deletions hello_world_c.py
    @@ -0,0 +1,23 @@
    import airflow
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    dag = DAG(
        dag_id='hello_world_c',
        default_args={
            "owner": "airflow",
            'start_date': airflow.utils.dates.days_ago(2),
        },
        schedule_interval=None
    )


    def print_hello(**context):
        task_params = context['dag_run'].conf['task_payload']
        print('Hello world c with {}'.format(task_params))


    PythonOperator(
        task_id='hello_world_printer',
        python_callable=print_hello,
        provide_context=True,
        dag=dag)
    27 changes: 27 additions & 0 deletions messages_producer.py
    @@ -0,0 +1,27 @@
    import json
    from datetime import datetime
    from random import randint, choice
    from time import sleep

    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters('111.18.0.21'))
    channel = connection.channel()
    channel.queue_declare(queue='external_airflow_triggers', durable=True)

    tasks = ['hello_world_a', 'hello_world_b', 'hello_world_c']

    while True:
        print('Producing messages at {}'.format(datetime.utcnow()))
        task_to_trigger = choice(tasks)
        event_time = str(datetime.utcnow())

        message = json.dumps(
            {'task': task_to_trigger, 'params': {'event_time': event_time, 'value': randint(0, 10000)}}
        )
        channel.basic_publish(exchange='', routing_key='external_airflow_triggers',
                              body=message)
        print(" [x] Sent {}".format(message))
        sleep(2)

    connection.close()
    3 changes: 3 additions & 0 deletions requirements.txt
    @@ -0,0 +1,3 @@
    # for messages producer & local dev
    pika
    apache-airflow
    2 changes: 2 additions & 0 deletions requirements_docker.txt
    @@ -0,0 +1,2 @@
    # only for Docker image
    pika