Skip to content

Instantly share code, notes, and snippets.

@theone4ever
Last active June 27, 2020 12:10
Show Gist options
  • Select an option

  • Save theone4ever/e850d673c3204894356475be2f23f6a4 to your computer and use it in GitHub Desktop.

Select an option

Save theone4ever/e850d673c3204894356475be2f23f6a4 to your computer and use it in GitHub Desktop.
from datetime import timedelta

import pandas as pd
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
def prep_data(input_param):
    """Load the input dataset for the training pipeline.

    Args:
        input_param: Mapping that holds the Parquet file location under
            the ``'input_data'`` key.

    Returns:
        The ``pandas.DataFrame`` produced by ``pd.read_parquet``.

    Raises:
        KeyError: If ``input_param`` has no ``'input_data'`` entry.
    """
    df = pd.read_parquet(input_param['input_data'])
    return df
def train_model(input_param):
    """Train the model (placeholder, not implemented yet).

    Args:
        input_param: Pipeline parameters; currently unused.

    Raises:
        NotImplementedError: Always, until the training logic is written.
            Raising keeps the stub from being mistaken for a successful
            training run.
    """
    # TO BE IMPLEMENTED
    raise NotImplementedError("train_model is not implemented yet")
# Defaults handed to the DAG through ``default_args``.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

training_dag = DAG(
    'my_training_dag',
    default_args=default_args,
    description='A simple tutorial DAG',
    schedule_interval=timedelta(days=1),
)

# Parameters shared by both tasks; each callable receives this dict as its
# single ``input_param`` argument.
input_param = {'input_data': '/tmp/1.parquet'}

with training_dag as dag:
    # First data preparation task.
    # The operator gets its own name so it does not shadow the prep_data
    # function that it wraps.
    prep_data_task = PythonOperator(
        task_id="prep_data",
        python_callable=prep_data,
        # The callable's signature is (input_param), so the dict is passed
        # under that keyword instead of being spread into keyword arguments.
        op_kwargs={'input_param': input_param},
        executor_config={"KubernetesExecutor": {"image": "my_app/py_ml_image:latest"}},
    )

    # Second model training task.
    # NOTE(review): the image name "py_ml_imagee" differs from the prep
    # task's "py_ml_image" -- confirm this is intentional and not a typo.
    training_model = PythonOperator(
        task_id="train_model",  # task ids must be unique within a DAG
        python_callable=train_model,
        op_kwargs={'input_param': input_param},
        executor_config={"KubernetesExecutor": {"image": "my_app/py_ml_imagee:latest"}},
    )

    # Run data preparation before model training. A bare list expression
    # would not register any dependency between the tasks.
    prep_data_task >> training_model
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment