Skip to content

Instantly share code, notes, and snippets.

@sanealytics
Last active June 20, 2020 19:11
Show Gist options
  • Save sanealytics/39884f087e2046e88c8ffce19a2ef1ce to your computer and use it in GitHub Desktop.
Save sanealytics/39884f087e2046e88c8ffce19a2ef1ce to your computer and use it in GitHub Desktop.

Revisions

  1. sanealytics revised this gist Jun 20, 2020. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion sample_function_sql.py
    Original file line number Diff line number Diff line change
    @@ -35,7 +35,7 @@ def get_elt_queries():

    # Deploy a wrapper around this function to wake up on file drop of some file
    def sample_transform_table():
    input_data = get_input_df('input_data') # Returns pandas dataframe
    input_data = get_input_df('input_data', 'some_service') # Returns pandas dataframe
    r = service_trace_id('sample_transform_table', ...., input_data['tid'].unique().tolist())
    tid = json.loads(r).get('tid')
    # When writing data back,
  2. sanealytics revised this gist Jun 20, 2020. 1 changed file with 2 additions and 1 deletion.
    3 changes: 2 additions & 1 deletion sample_function_sql.py
    Original file line number Diff line number Diff line change
    @@ -13,11 +13,12 @@ def get_elt_queries():
    order by
    added_ts desc
    """,
    # Parameters, tid, prev_tid
    # Parameters: tid
    'output_merge': """
    merge into target t
    using (
    select *,
    @tid as tid,
    timestamp_utc as last_updated_ts,
    tid as last_tid
    from input_table
  3. sanealytics revised this gist Jun 20, 2020. 1 changed file with 22 additions and 4 deletions.
    26 changes: 22 additions & 4 deletions sample_function_sql.py
    Original file line number Diff line number Diff line change
    @@ -8,10 +8,27 @@ def get_elt_queries():
    added_ts
    from
    XXX.TABLE
    where
    service = @service
    where
    service = @service
    order by
    added_ts desc
    added_ts desc
    """,
    # Parameters, tid, prev_tid
    'output_merge': """
    merge into target t
    using (
    select *,
    timestamp_utc as last_updated_ts,
    tid as last_tid
    from input_table
    ) s
    on t.some_id = s.some_id
    when matched then
    update set
    t.last_updated_ts = s.last_updated_ts,
    t.last_tid = s.last_tid
    when not matched then
    insert row;
    """
    }

    @@ -23,4 +40,5 @@ def sample_transform_table():
    # When writing data back,
    # For new rows, insert this new tid into the tid column.
    # For updating rows, insert this new tid into prev_tid column.


    # Check out merge example to do all in one step (when possible)
  4. sanealytics revised this gist Jun 20, 2020. 1 changed file with 4 additions and 4 deletions.
    8 changes: 4 additions & 4 deletions sample_function_sql.py
    Original file line number Diff line number Diff line change
    @@ -17,10 +17,10 @@ def get_elt_queries():

    # Deploy a wrapper around this function to wake up on file drop of some file
    def sample_transform_table():
    input_data = get_input_df('input_data') # Returns pandas dataframe
    input_data = get_input_df('input_data') # Returns pandas dataframe
    r = service_trace_id('sample_transform_table', ...., input_data['tid'].unique().tolist())
    tid = json.loads(r).get('tid')
    # When writing data back,
    # For new rows, insert this new tid into the tid column.
    # For updating rows, insert this new tid into prev_tid column.
    # When writing data back,
    # For new rows, insert this new tid into the tid column.
    # For updating rows, insert this new tid into prev_tid column.

  5. sanealytics created this gist Jun 20, 2020.
    26 changes: 26 additions & 0 deletions sample_function_sql.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,26 @@
    def get_elt_queries():
        """Return the parameterised SQL queries used by the ELT step.

        Returns:
            dict: Maps a query name to its SQL text. The only entry,
            ``'input_data'``, selects ``data`` and ``added_ts`` from
            ``XXX.TABLE`` for a single service, ordered by ``added_ts``
            descending. It takes one named parameter, ``@service``, which the
            caller must bind when running the query.
        """
        return {
            # Parameters: service
            'input_data': """
                select
                    data,
                    added_ts
                from
                    XXX.TABLE
                where
                    service = @service
                order by
                    added_ts desc
            """,
        }

    # Deploy a wrapper around this function to wake up on file drop of some file
    # Sample transform step (illustrative sketch, not runnable as written):
    # loads the input rows, obtains a trace id (tid) from service_trace_id, and
    # describes how that tid should be recorded when results are written back.
    # Takes no arguments and, in the visible code, returns nothing.
    def sample_transform_table():
    # NOTE(review): the 'input_data' query in get_elt_queries selects only
    # `data` and `added_ts`; confirm that get_input_df really yields a 'tid'
    # column, since it is read on the next statement.
    input_data = get_input_df('input_data') # Returns pandas dataframe
    # The last argument is the list of distinct values in the 'tid' column.
    # NOTE(review): `....` is not valid Python; it appears to stand in for
    # arguments omitted from this sample - confirm against the real signature
    # of service_trace_id.
    r = service_trace_id('sample_transform_table', ...., input_data['tid'].unique().tolist())
    # r is parsed as JSON; presumably it decodes to an object, in which case
    # .get('tid') yields None when the key is absent - TODO confirm.
    tid = json.loads(r).get('tid')
    # `tid` is not used further in the visible code; the notes below describe
    # the intended write-back, which is left to the implementer.
    # When writing data back:
    # For new rows, insert this new tid into the tid column.
    # For updating rows, insert this new tid into the prev_tid column.