-
-
Save enspdf/5452c851a2521dc143e130bd3f7d9715 to your computer and use it in GitHub Desktop.
Revisions
-
bartosz25 revised this gist
May 8, 2019 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -15,7 +15,7 @@ services: - ./scripts:/tmp ports: - "8081:8080" command: webserver healthcheck: test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"] interval: 30s -
bartosz25 renamed this gist
May 8, 2019 . 1 changed file with 0 additions and 0 deletions.There are no files selected for viewing
File renamed without changes. -
bartosz25 created this gist
May 8, 2019 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,3 @@ curl -X POST http://localhost:8081/api/experimental/dags/hello_world_a/dag_runs -H 'Cache-Control: no-cache' -H 'Content-Type: application/json' -d '{"conf":"{\"task_payload\":\"payload1\"}"}' curl -X POST http://localhost:8081/api/experimental/dags/hello_world_a/dag_runs -H 'Cache-Control: no-cache' -H 'Content-Type: application/json' -d '{"conf":"{\"task_payload\":\"payload2\"}"}' curl -X POST http://localhost:8081/api/experimental/dags/hello_world_a/dag_runs -H 'Cache-Control: no-cache' -H 'Content-Type: application/json' -d '{"conf":"{\"task_payload\":\"payload3\"}"}' This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,40 @@ version: '2.1' services: webserver: image: puckel/docker-airflow:1.10.2 restart: always environment: # Loads DAG examples - LOAD_EX=y networks: airflow-external-trigger-network: ipv4_address: 111.18.0.20 volumes: - ./dags:/usr/local/airflow/dags - ./requirements.txt:/requirements.txt - ./scripts:/tmp ports: - "8081:8080" command: webserver & airflow scheduler healthcheck: test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"] interval: 30s timeout: 30s retries: 3 rabbitmq: image: rabbitmq:3.7-management restart: always networks: airflow-external-trigger-network: ipv4_address: 111.18.0.21 hostname: rabbitmqdocker ports: - "15672:15672" networks: airflow-external-trigger-network: driver: bridge ipam: driver: default config: - subnet: 111.18.0.0/16 This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,88 @@ import json import pika from airflow import utils from airflow.models import DAG from airflow.operators.dagrun_operator import TriggerDagRunOperator from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python_operator import BranchPythonOperator dag = DAG( dag_id='external_trigger', default_args={ "owner": "airflow", 'start_date': utils.dates.days_ago(1), }, schedule_interval='*/1 * * * *', ) def consume_message(**kwargs): connection = pika.BlockingConnection(pika.ConnectionParameters('111.18.0.21')) channel = connection.channel() channel.queue_declare(queue='external_airflow_triggers', durable=True) method_frame, header_frame, body = channel.basic_get(queue='external_airflow_triggers') if body: json_params = json.loads(body) kwargs['ti'].xcom_push(key='job_params', value=json.dumps(json_params['params'])) channel.basic_ack(delivery_tag=method_frame.delivery_tag) connection.close() print("Got message ? {}".format(body)) return json_params['task'] else: return 'task_trash' router = BranchPythonOperator( task_id='router', python_callable=consume_message, dag=dag, provide_context=True, depends_on_past=True ) def trigger_dag_with_context(context, dag_run_obj): ti = context['task_instance'] job_params = ti.xcom_pull(key='job_params', task_ids='router') dag_run_obj.payload = {'task_payload': job_params} return dag_run_obj task_a = trigger = TriggerDagRunOperator( task_id='hello_world_a', trigger_dag_id="hello_world_a", python_callable=trigger_dag_with_context, params={'condition_param': True, 'task_payload': '{}'}, dag=dag, provide_context=True, ) task_b = TriggerDagRunOperator( task_id='hello_world_b', trigger_dag_id="hello_world_b", python_callable=trigger_dag_with_context, params={'condition_param': True, 'task_payload': '{}'}, dag=dag, provide_context=True, ) task_c = TriggerDagRunOperator( task_id='hello_world_c', trigger_dag_id="hello_world_c", python_callable=trigger_dag_with_context, params={'task_payload': '{}'}, dag=dag, provide_context=True, ) task_trash = DummyOperator( task_id='task_trash', dag=dag ) router >> task_a router >> task_b router >> task_c router >> task_trash This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,23 @@ import airflow from airflow import DAG from airflow.operators.python_operator import PythonOperator dag = DAG( dag_id='hello_world_a', default_args={ "owner": "airflow", 'start_date': airflow.utils.dates.days_ago(1), }, schedule_interval=None ) def print_hello(**context): task_params = context['dag_run'].conf['task_payload'] print('Hello world a with {}'.format(task_params)) PythonOperator( task_id='hello_world_printer', python_callable=print_hello, provide_context=True, dag=dag) This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,23 @@ import airflow from airflow import DAG from airflow.operators.python_operator import PythonOperator dag = DAG( dag_id='hello_world_b', default_args={ "owner": "airflow", 'start_date': airflow.utils.dates.days_ago(2), }, schedule_interval=None ) def print_hello(**context): task_params = context['dag_run'].conf['task_payload'] print('Hello world b with {}'.format(task_params)) PythonOperator( task_id='hello_world_printer', python_callable=print_hello, provide_context=True, dag=dag) This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,23 @@ import airflow from airflow import DAG from airflow.operators.python_operator import PythonOperator dag = DAG( dag_id='hello_world_c', default_args={ "owner": "airflow", 'start_date': airflow.utils.dates.days_ago(2), }, schedule_interval=None ) def print_hello(**context): task_params = context['dag_run'].conf['task_payload'] print('Hello world c with {}'.format(task_params)) PythonOperator( task_id='hello_world_printer', python_callable=print_hello, provide_context=True, dag=dag) This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,27 @@ import json from datetime import datetime from random import randint, choice from time import sleep import pika connection = pika.BlockingConnection(pika.ConnectionParameters('111.18.0.21')) channel = connection.channel() channel.queue_declare(queue='external_airflow_triggers', durable=True) tasks = ['hello_world_a', 'hello_world_b', 'hello_world_c'] while True: print('Producing messages at {}'.format(datetime.utcnow())) task_to_trigger = choice(tasks) event_time = str(datetime.utcnow()) message = json.dumps( {'task': task_to_trigger, 'params': {'event_time': event_time, 'value': randint(0, 10000)}} ) channel.basic_publish(exchange='', routing_key='external_airflow_triggers', body=message) print(" [x] Sent {}".format(message)) sleep(2) connection.close() This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,3 @@ # for messages producer & local dev pika apache-airflow This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,2 @@ # only for Docker image pika