@theone4ever
Last active June 27, 2020 12:10
Revisions

  1. theone4ever revised this gist Jun 27, 2020. 2 changed files with 5 additions and 2 deletions.
    5 changes: 4 additions & 1 deletion tasks.py
    @@ -2,5 +2,8 @@

     def prep_data(input_param):
         df = pd.read_parquet(input_param['input_data'])
    -    return df
    +    return df

    +def train_model(input_param):
    +    pass  # TO BE IMPLEMENTED

    2 changes: 1 addition & 1 deletion training_dag.py
    @@ -32,7 +32,7 @@
         #Second model training task
         training_model = PythonOperator(
             task_id="prep_data",
    -        python_callable=tasks.prep_data,
    +        python_callable=tasks.train_model,
             op_kwargs=input_param,
             provide_context=True,
             executor_config={"KubernetesExecutor": {"image": "my_app/py_ml_imagee:latest"}}
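
A note on the `tasks.py` functions touched in this revision: as far as I know, `PythonOperator` calls its callable as `python_callable(*op_args, **op_kwargs)`, so with `op_kwargs={'input_data': '/tmp/1.parquet'}` the function receives a keyword named `input_data` rather than a dict named `input_param`. The sketch below shows one possible shape under that assumption; it is not the author's code. The `reader` parameter is hypothetical and exists only so the function can be exercised without pandas or a real parquet file, and `**context` absorbs the extra keywords that `provide_context=True` passes in.

```python
def prep_data(input_data, reader=None, **context):
    """Load the file named by the `input_data` keyword and return the result."""
    if reader is None:
        # Imported lazily so this module can be imported without pandas.
        import pandas as pd
        reader = pd.read_parquet
    return reader(input_data)


def train_model(input_data, **context):
    """Placeholder: the gist leaves training unimplemented."""
    raise NotImplementedError("train_model is still a stub")


# Simulate the operator's keyword-style call with a stand-in reader (hypothetical).
op_kwargs = {"input_data": "/tmp/1.parquet"}
loaded = prep_data(reader=lambda path: f"loaded {path}", ds="2020-06-27", **op_kwargs)
```

Keeping the signature keyword-based means the same `input_param` dict can be handed to both tasks unchanged.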
  2. theone4ever revised this gist Jun 27, 2020. 1 changed file with 2 additions and 2 deletions.
    4 changes: 2 additions & 2 deletions training_dag.py
    @@ -26,7 +26,7 @@
             python_callable=prep_data.prep_data,
             op_kwargs=training_config,
             provide_context=True,
    -        executor_config=k8s_executor_config
    +        executor_config={"KubernetesExecutor": {"image": "my_app/py_ml_image:latest"}}
         )

         #Second model training task
    @@ -35,7 +35,7 @@
             python_callable=tasks.prep_data,
             op_kwargs=input_param,
             provide_context=True,
    -        executor_config=k8s_executor_config
    +        executor_config={"KubernetesExecutor": {"image": "my_app/py_ml_imagee:latest"}}
         )

         [prep_data, training_model]
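
This revision swaps the `k8s_executor_config` name for two inline dict literals, and the two image strings differ by one character (`py_ml_image` vs `py_ml_imagee`). The gist does not say whether that difference is intended. A minimal sketch, assuming both tasks are meant to run in the same image, builds the override from a single constant so the two cannot drift apart. The helper name and the `ML_IMAGE` constant are hypothetical; only the dict shape comes from the gist.

```python
ML_IMAGE = "my_app/py_ml_image:latest"  # assumed: one image shared by both tasks


def k8s_executor_config(image):
    """Return a per-task override in the dict shape the gist passes to `executor_config`."""
    return {"KubernetesExecutor": {"image": image}}


# Each task gets its own dict, so a later per-task tweak does not leak across tasks.
prep_config = k8s_executor_config(ML_IMAGE)
train_config = k8s_executor_config(ML_IMAGE)
```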
  3. theone4ever revised this gist Jun 27, 2020. No changes.
  4. theone4ever revised this gist Jun 27, 2020. 1 changed file with 4 additions and 2 deletions.
    6 changes: 4 additions & 2 deletions training_dag.py
    @@ -25,15 +25,17 @@
             task_id="prep_data",
             python_callable=prep_data.prep_data,
             op_kwargs=training_config,
    -        provide_context=True
    +        provide_context=True,
    +        executor_config=k8s_executor_config
         )

         #Second model training task
         training_model = PythonOperator(
             task_id="prep_data",
             python_callable=tasks.prep_data,
             op_kwargs=input_param,
    -        provide_context=True
    +        provide_context=True,
    +        executor_config=k8s_executor_config
         )

         [prep_data, training_model]
  5. theone4ever created this gist Jun 27, 2020.
    6 changes: 6 additions & 0 deletions tasks.py
    @@ -0,0 +1,6 @@
    +import pandas as pd
    +
    +def prep_data(input_param):
    +    df = pd.read_parquet(input_param['input_data'])
    +    return df

    39 changes: 39 additions & 0 deletions training_dag.py
    @@ -0,0 +1,39 @@

    +default_args = {
    +    'owner': 'airflow',
    +    'depends_on_past': False,
    +    'start_date': days_ago(2),
    +    'email': ['[email protected]'],
    +    'email_on_failure': False,
    +    'email_on_retry': False,
    +    'retries': 1,
    +    'retry_delay': timedelta(minutes=5),
    +}
    +
    +training_dag = DAG(
    +    'my_training_dag',
    +    default_args=default_args,
    +    description='A simple tutorial DAG',
    +    schedule_interval=timedelta(days=1),
    +)
    +
    +input_param = {'input_data':'/tmp/1.parquet'}
    +
    +with training_dag as dag:
    +    # First data preparation task
    +    prep_data = PythonOperator(
    +        task_id="prep_data",
    +        python_callable=prep_data.prep_data,
    +        op_kwargs=training_config,
    +        provide_context=True
    +    )
    +
    +    #Second model training task
    +    training_model = PythonOperator(
    +        task_id="prep_data",
    +        python_callable=tasks.prep_data,
    +        op_kwargs=input_param,
    +        provide_context=True
    +    )
    +
    +    [prep_data, training_model]
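
Read as a whole, `training_dag.py` as shown has a few loose ends: no import lines appear in the diff, both operators use `task_id="prep_data"` although task ids must be unique within a DAG, the first operator references `prep_data.prep_data` and `training_config`, neither of which is defined in the shown file, and the closing `[prep_data, training_model]` is a bare list that sets no dependency between the tasks. The sketch below is one way the wiring might look once those are resolved. It assumes Airflow 1.10-style import paths (consistent with the gist's date and its use of `provide_context`), a single shared image, a `train_model` task id, and that training should run after preparation. None of that is confirmed by the gist.

```python
from datetime import timedelta

# (task_id, function name in tasks.py). Task ids must be unique within a DAG.
TASKS = [("prep_data", "prep_data"), ("train_model", "train_model")]
input_param = {"input_data": "/tmp/1.parquet"}
executor_config = {"KubernetesExecutor": {"image": "my_app/py_ml_image:latest"}}


def build_dag():
    # Airflow 1.10-style imports (assumed version), kept local so the
    # constants above can be inspected without Airflow installed.
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    from airflow.utils.dates import days_ago
    import tasks  # the gist's tasks.py, assumed to be importable

    dag = DAG(
        "my_training_dag",
        default_args={
            "owner": "airflow",
            "start_date": days_ago(2),
            "retries": 1,
            "retry_delay": timedelta(minutes=5),
        },
        schedule_interval=timedelta(days=1),
    )
    operators = [
        PythonOperator(
            task_id=task_id,
            python_callable=getattr(tasks, func_name),
            op_kwargs=input_param,
            provide_context=True,
            executor_config=executor_config,
            dag=dag,
        )
        for task_id, func_name in TASKS
    ]
    # Run training only after data preparation has finished.
    operators[0] >> operators[1]
    return dag
```

In an actual DAG file the result would need to be bound at module level, for example `training_dag = build_dag()`, so the scheduler can discover it.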