@fillipo
Created July 1, 2019 19:54
Cloud Composer (Apache Airflow) Example
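A minimal Cloud Composer (Airflow 1.x) DAG: it downloads a CSV from Cloud Storage, rewrites one cell with pandas, uploads the result back to Cloud Storage, and appends it to a BigQuery table with the bq CLI.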
#!/usr/bin/python
# -*- coding: utf-8 -*-
from datetime import datetime, timedelta

import pandas as pd

from airflow import DAG
from airflow.contrib.operators.gcs_download_operator import GoogleCloudStorageDownloadOperator
from airflow.contrib.operators.file_to_gcs import FileToGoogleCloudStorageOperator
from airflow.contrib.operators.mlengine_operator import MLEngineBatchPredictionOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators import bash_operator
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 6, 30),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

# schedule_interval must be passed to the DAG itself; inside default_args
# it is silently ignored.
dag = DAG(
    'composer-demo1-dag',
    catchup=False,
    schedule_interval='*/1 * * * *',
    default_args=default_args,
)
local_file = '/tmp/cdv_input.csv'
bucket_name = 'composer-demo1'
bucket_file = 'input_data/cdv_input.csv'
bq_dataset = 'composer_demo1_dataset'
bq_table = 'composer_demo1_table'
output_bucket_file = 'output_data/cdv_output.csv'
# t1 through t4 are tasks created by instantiating operators.
# Download the input CSV from GCS to local disk on the worker.
t1 = GoogleCloudStorageDownloadOperator(
    task_id='gcs_download',
    dag=dag,
    bucket=bucket_name,
    object=bucket_file,
    filename=local_file,
    google_cloud_storage_conn_id='google_cloud_storage_default',
)
def transform_csv():
    """Read the CSV, overwrite one cell, and write the file back in place."""
    df = pd.read_csv(local_file)
    # DataFrame.set_value() is deprecated in pandas; .at is the scalar setter.
    df.at[1, 'velocidade'] = 800
    df.to_csv(local_file, index=False, header=True)
t2 = PythonOperator(
    task_id='transform_data',
    python_callable=transform_csv,
    dag=dag,
)
# Upload the transformed CSV back to GCS.
t3 = FileToGoogleCloudStorageOperator(
    task_id='gcs_upload',
    src=local_file,
    bucket=bucket_name,
    dst=output_bucket_file,
    google_cloud_storage_conn_id='google_cloud_storage_default',
    dag=dag,
)
# Append the transformed CSV from GCS to the BigQuery table with the bq CLI
# (the dataset and table must already exist).
t4 = bash_operator.BashOperator(
    task_id='append_data_to_bigquery',
    bash_command="""\
bq --location=US load --autodetect --noreplace \
--source_format=CSV {}.{} gs://{}/{}""".format(
        bq_dataset, bq_table, bucket_name, output_bucket_file),
    dag=dag,
)
t1 >> t2 >> t3 >> t4
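
The gist imports MLEngineBatchPredictionOperator but never wires it into the DAG. Below is a minimal sketch of how a batch-prediction step could follow the upload; the project ID, region, and model name are placeholder assumptions, not values from the original.

# Hypothetical extension: run an AI Platform batch-prediction job over the
# uploaded CSV. 'my-gcp-project' and 'my_model' are placeholder assumptions.
t5 = MLEngineBatchPredictionOperator(
    task_id='ml_batch_prediction',
    project_id='my-gcp-project',
    job_id='composer_demo1_predict_{{ ds_nodash }}',
    region='us-central1',
    data_format='TEXT',
    input_paths=['gs://{}/{}'.format(bucket_name, output_bucket_file)],
    output_path='gs://{}/predictions/'.format(bucket_name),
    model_name='my_model',
    dag=dag,
)
t3 >> t5

To deploy, copy the file into the Composer environment's DAG bucket, for example with: gcloud composer environments storage dags import --environment <ENV> --location <LOCATION> --source composer_demo1_dag.py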