Skip to content

Instantly share code, notes, and snippets.

@ashb
Forked from pingzh/test_ti_creation.py
Created January 5, 2022 13:09
Show Gist options
  • Save ashb/f43ab6300f0f84ed91a599923de3a673 to your computer and use it in GitHub Desktop.
Save ashb/f43ab6300f0f84ed91a599923de3a673 to your computer and use it in GitHub Desktop.

Revisions

  1. ashb revised this gist Jan 5, 2022. 1 changed file with 49 additions and 12 deletions.
    61 changes: 49 additions & 12 deletions test_ti_creation.py
    Original file line number Diff line number Diff line change
    @@ -3,8 +3,9 @@

    from airflow.utils.db import create_session
    from airflow.utils import timezone
    from airflow.models import TaskInstance
    from airflow.models import TaskInstance, DagRun
    from airflow.models.serialized_dag import SerializedDagModel
    from airflow.utils.types import DagRunType

    logger = logging.getLogger(__name__)
    out_hdlr = logging.FileHandler('./log.txt')
    @@ -13,41 +14,77 @@
    logger.addHandler(out_hdlr)
    logger.setLevel(logging.INFO)

    MODE='bulk_insert_mappings'

    def create_tis_in_new_dag_run(dag, execution_date, number_of_tis):

    def create_tis_in_new_dag_run(dag, run_id, number_of_tis):
    tasks = list(dag.task_dict.values())[0:number_of_tis]

    t1 = time.time()
    t1 = time.monotonic()
    success = True
    tis = []
    try:
    with create_session() as session:
    for task in tasks:
    ti = TaskInstance(task, execution_date)
    session.add(ti)
    if MODE == 'unit-of-work':
    for i, task in enumerate(tasks):
    ti = TaskInstance(task, run_id=run_id)
    session.add(ti)
    elif MODE == 'bulk_save_objects':
    session.bulk_save_objects(
    [
    TaskInstance(task, run_id=run_id)
    for task in tasks
    ]
    )
    elif MODE == 'bulk_insert_mappings':
    session.bulk_insert_mappings(
    TaskInstance,
    [
    {
    'dag_id': task.dag_id,
    'task_id': task.task_id,
    'run_id': run_id,
    'pool': task.pool,
    'queue': task.queue,
    'pool_slots': task.pool_slots,
    'priority_weight': task.priority_weight_total,
    'run_as_user': task.run_as_user,
    'max_tries': task.retries,
    'executor_config': task.executor_config,
    'operator': task.task_type
    }
    for task in tasks
    ]
    )
    session.flush()
    except:
    raise
    success = False

    t2 = time.time()
    logger.info('Created %s tis. success?: %s, perf: %s', number_of_tis, success, t2 - t1)
    t2 = time.monotonic()
    logger.info('Created %s tis. success?: %s, perf: %s', len(tasks), success, t2 - t1)

    return t2 - t1, success

    def perf_tis_creation(dag):
    perf = {}
    for number_of_tis in [1000, 3000, 5000, 10000, 15000, 20000, 25000]:
    next_run_date = timezone.utcnow()

    perf, success = create_tis_in_new_dag_run(dag, next_run_date, number_of_tis)
    with create_session() as session:
    dag_run = DagRun(dag_id=dag_id, run_type=DagRunType.MANUAL, run_id=DagRun.generate_run_id(DagRunType.MANUAL, timezone.utcnow()))
    session.add(dag_run)

    duration, success = create_tis_in_new_dag_run(dag, dag_run.run_id, number_of_tis)

    perf[number_of_tis] = (perf, success)
    perf[number_of_tis] = (duration, success)

    time.sleep(5)


    if __name__ == '__main__':
    dag_id = 'a-very-large-dag'
    dag_id = 'fake_dag'
    dm = SerializedDagModel.get(dag_id)
    dag = dm.dag
    logger.info('%s', MODE)

    perf_tis_creation(dag)
  2. @pingzh pingzh revised this gist Dec 21, 2021. No changes.
  3. @pingzh pingzh created this gist Dec 20, 2021.
    53 changes: 53 additions & 0 deletions test_ti_creation.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,53 @@
    import time
    import logging

    from airflow.utils.db import create_session
    from airflow.utils import timezone
    from airflow.models import TaskInstance
    from airflow.models.serialized_dag import SerializedDagModel

    logger = logging.getLogger(__name__)
    out_hdlr = logging.FileHandler('./log.txt')
    out_hdlr.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
    out_hdlr.setLevel(logging.INFO)
    logger.addHandler(out_hdlr)
    logger.setLevel(logging.INFO)


    def create_tis_in_new_dag_run(dag, execution_date, number_of_tis):
        """Time the creation of up to *number_of_tis* TaskInstances for *dag*.

        Takes the first ``number_of_tis`` tasks from ``dag.task_dict``, inserts
        one TaskInstance per task at *execution_date* in a single session, and
        returns a ``(duration_seconds, success)`` tuple.
        """
        tasks = list(dag.task_dict.values())[0:number_of_tis]

        # Monotonic clock: immune to wall-clock adjustments, so the measured
        # duration cannot go negative or jump (time.time() can).
        t1 = time.monotonic()
        success = True
        try:
            with create_session() as session:
                # The scraped original read "for task in :" — "tasks" is the
                # only iterable in scope, restored here.
                for task in tasks:
                    ti = TaskInstance(task, execution_date)
                    session.add(ti)
                session.flush()
        except Exception:
            # Narrowed from a bare except so KeyboardInterrupt/SystemExit still
            # propagate; log the failure instead of swallowing it silently.
            logger.exception('TaskInstance creation failed')
            success = False

        t2 = time.monotonic()
        # Log len(tasks), not number_of_tis: the DAG may have fewer tasks than
        # requested, and len(tasks) is what was actually inserted.
        logger.info('Created %s tis. success?: %s, perf: %s', len(tasks), success, t2 - t1)

        return t2 - t1, success

    def perf_tis_creation(dag):
        """Benchmark TaskInstance creation for *dag* across increasing batch sizes.

        Returns a dict mapping number_of_tis -> (duration_seconds, success).
        """
        perf = {}
        for number_of_tis in [1000, 3000, 5000, 10000, 15000, 20000, 25000]:
            next_run_date = timezone.utcnow()

            # BUG FIX: the original unpacked into "perf", clobbering the results
            # dict with a float and crashing on the item assignment below with
            # "TypeError: 'float' object does not support item assignment".
            duration, success = create_tis_in_new_dag_run(dag, next_run_date, number_of_tis)

            perf[number_of_tis] = (duration, success)

            # Pause between rounds so the database settles before the next batch.
            time.sleep(5)

        # Return the collected timings (previously they were built and dropped).
        return perf


    if __name__ == '__main__':
        # Entry point: load the serialized DAG from the metadata DB and run
        # the TaskInstance-creation benchmark against it.
        dag_id = 'a-very-large-dag'
        serialized = SerializedDagModel.get(dag_id)

        perf_tis_creation(serialized.dag)