"""DAG ``dag_with_trigger``: run a Python task, then conditionally trigger ``target_dag``.

The DAG has two tasks:

* ``first``   -- calls ``default_test`` (imported from the scratch workspace).
* ``trigger`` -- a ``TriggerDagRunOperator`` that asks Airflow to start
  ``target_dag`` when ``params['condition_param']`` is truthy.
"""
import sys
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators import PythonOperator, TriggerDagRunOperator

# ``default_test`` lives outside the DAG folder, so its directory has to be
# put on the import path before the import below can succeed.
sys.path.append('/home/pablo/workspace/scratch/')
from default_test import default_test

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    # NOTE(review): this start_date is re-evaluated every time the file is
    # parsed. Airflow's documentation recommends a fixed start_date; kept
    # as-is here to preserve existing behaviour -- confirm it is intentional.
    'start_date': datetime.now() - timedelta(minutes=15),
    'email': ['airflow@airflow.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}


def conditionally_trigger(context, dag_run_obj):
    """Decide whether the target DAG run should be created.

    Used as the ``python_callable`` of the ``trigger`` task below.

    :param context: task context; ``context['params']['condition_param']``
        is read to make the decision.
    :param dag_run_obj: the candidate DAG-run object handed in by the
        operator.
    :return: ``dag_run_obj`` when the condition is truthy, otherwise
        ``None`` (which, per the operator's contract, is expected to mean
        "do not trigger").
    """
    if context['params']['condition_param']:
        return dag_run_obj
    return None


dag = DAG(
    'dag_with_trigger',
    default_args=default_args,
    schedule_interval=timedelta(minutes=15),
)

first = PythonOperator(
    task_id='first',
    python_callable=default_test,
    dag=dag,
)

# The callback has its own name (``conditionally_trigger``) so that binding
# the operator to ``trigger`` no longer overwrites the function it uses.
trigger = TriggerDagRunOperator(
    task_id='trigger',
    trigger_dag_id="target_dag",
    python_callable=conditionally_trigger,
    params={'condition_param': True, 'message': 'Running'},
    trigger_rule='all_success',
    dag=dag,
)

# ``trigger`` runs only after ``first`` has completed.
trigger.set_upstream(first)