@sararob
Last active August 21, 2022 11:09

Revisions

  1. sararob revised this gist Sep 17, 2021. No changes.
  2. sararob revised this gist Jul 21, 2021. No changes.
  3. sararob revised this gist Jul 21, 2021. No changes.
  4. sararob revised this gist Jul 20, 2021. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion pipeline-runner.py
    @@ -80,7 +80,7 @@ def check_table_size(request):

             query_job = client.query(
                 """
    -            SELECT num_rows_last_retraining FROM `sara-vertex-demos.beans.count`
    +            SELECT num_rows_last_retraining FROM `your-project.your-dataset.count`
                 ORDER BY last_retrain_time DESC
                 LIMIT 1"""
             )
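
    Note: rather than hard-coding the fully qualified table name in the SQL, the query could be built from values already in scope in this function (client.project and the dataset parsed from the request). A minimal sketch reusing the gist's own names:

        # Derive the count-table reference instead of hard-coding it in the query.
        count_table_ref = f"{client.project}.{dataset}.count"
        query_job = client.query(
            f"""
            SELECT num_rows_last_retraining FROM `{count_table_ref}`
            ORDER BY last_retrain_time DESC
            LIMIT 1"""
        )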
  5. sararob revised this gist Jul 20, 2021. 1 changed file with 0 additions and 3 deletions.
    3 changes: 0 additions & 3 deletions pipeline-runner.py
    @@ -1,10 +1,8 @@
     # Copyright 2021 Google LLC.
     # SPDX-License-Identifier: Apache-2.0
    -
     import kfp
     import json
     import time
    -
     from google.cloud import bigquery
     from google.cloud.exceptions import NotFound
     from kfp.v2.google.client import AIPlatformClient
    @@ -28,7 +26,6 @@ def create_count_table(table_id, num_rows):
             bigquery.SchemaField("num_rows_last_retraining", "INTEGER", mode="REQUIRED"),
             bigquery.SchemaField("last_retrain_time", "TIMESTAMP", mode="REQUIRED")
         ]
    -
         table = bigquery.Table(table_id, schema=schema)
         table = client.create_table(table)
         print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")
  6. sararob revised this gist Jul 20, 2021. 1 changed file with 3 additions and 0 deletions.
    3 changes: 3 additions & 0 deletions pipeline-runner.py
    @@ -1,3 +1,6 @@
    +# Copyright 2021 Google LLC.
    +# SPDX-License-Identifier: Apache-2.0
    +
     import kfp
     import json
     import time
  7. sararob revised this gist Jul 20, 2021. 1 changed file with 18 additions and 19 deletions.
    37 changes: 18 additions & 19 deletions pipeline-runner.py
    @@ -74,27 +74,26 @@ def check_table_size(request):
                 count_table = client.get_table(f"{client.project}.{dataset}.count")
                 print("Count table exists, querying to see how many rows at last pipeline run")
    -
    -            query_job = client.query(
    -                """
    -                SELECT num_rows_last_retraining FROM `sara-vertex-demos.beans.count`
    -                ORDER BY last_retrain_time DESC
    -                LIMIT 1"""
    -            )
    -
    -            results = query_job.result()
    -            for i in results:
    -                last_retrain_count = i[0]
    -
    -            rows_added_since_last_pipeline_run = current_rows - last_retrain_count
    -            print(f"{rows_added_since_last_pipeline_run} rows have been added since we last ran the pipeline")
    -
    -            if (rows_added_since_last_pipeline_run >= RETRAIN_THRESHOLD):
    -                pipeline_result = create_pipeline_run()
    -                insert_bq_data(f"{client.project}.{dataset}.count", current_rows)
    -
             except NotFound:
                 print("No count table found, creating one...")
                 create_count_table(f"{client.project}.{dataset}.count", current_rows)
                 create_pipeline_run()
    +
    +        query_job = client.query(
    +            """
    +            SELECT num_rows_last_retraining FROM `sara-vertex-demos.beans.count`
    +            ORDER BY last_retrain_time DESC
    +            LIMIT 1"""
    +        )
    +
    +        results = query_job.result()
    +        for i in results:
    +            last_retrain_count = i[0]
    +
    +        rows_added_since_last_pipeline_run = current_rows - last_retrain_count
    +        print(f"{rows_added_since_last_pipeline_run} rows have been added since we last ran the pipeline")
    +
    +        if (rows_added_since_last_pipeline_run >= RETRAIN_THRESHOLD):
    +            pipeline_result = create_pipeline_run()
    +            insert_bq_data(f"{client.project}.{dataset}.count", current_rows)
         else:
             return f"No BigQuery data given"
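
    Note: this revision moves the row-count comparison out of the try block so it runs whether or not the count table already existed; on the first run, create_count_table seeds the table with the current row count, so the delta is 0 and no second pipeline run is triggered. The threshold arithmetic with illustrative numbers:

        RETRAIN_THRESHOLD = 1000
        current_rows = 6200        # rows in the data table now
        last_retrain_count = 5000  # count stored at the last pipeline run

        # 6200 - 5000 = 1200, and 1200 >= 1000, so a run would be kicked off
        print(current_rows - last_retrain_count >= RETRAIN_THRESHOLD)  # True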
  8. sararob revised this gist Jul 20, 2021. 1 changed file with 8 additions and 8 deletions.
    16 changes: 8 additions & 8 deletions pipeline-runner.py
    @@ -18,7 +18,7 @@ def insert_bq_data(table_id, num_rows):
         if errors == []:
             print("New rows have been added.")
         else:
    -        print("Encountered errors while inserting rows: {}".format(errors))
    +        print(f"Encountered errors while inserting rows: {errors}")

     def create_count_table(table_id, num_rows):
         schema = [
    @@ -28,7 +28,7 @@ def create_count_table(table_id, num_rows):

         table = bigquery.Table(table_id, schema=schema)
         table = client.create_table(table)
    -    print("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id))
    +    print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")

         insert_bq_data(table_id, num_rows)

    @@ -65,13 +65,13 @@ def check_table_size(request):
             dataset = request_json['bq_dataset']
             table = request_json['bq_table']

    -        data_table = client.get_table("{}.{}.{}".format(client.project, dataset, table))
    +        data_table = client.get_table(f"{client.project}.{dataset}.{table}")
             current_rows = data_table.num_rows
    -        print("{} table has {} rows".format(table, current_rows))
    +        print(f"{table} table has {current_rows} rows")

             # See if `count` table exists in dataset
             try:
    -            count_table = client.get_table("{}.{}.count".format(client.project, dataset))
    +            count_table = client.get_table(f"{client.project}.{dataset}.count")
                 print("Count table exists, querying to see how many rows at last pipeline run")

                 query_job = client.query(
    @@ -86,15 +86,15 @@ def check_table_size(request):
                     last_retrain_count = i[0]

                 rows_added_since_last_pipeline_run = current_rows - last_retrain_count
    -            print("{} rows have been added since we last ran the pipeline".format(rows_added_since_last_pipeline_run))
    +            print(f"{rows_added_since_last_pipeline_run} rows have been added since we last ran the pipeline")

                 if (rows_added_since_last_pipeline_run >= RETRAIN_THRESHOLD):
                     pipeline_result = create_pipeline_run()
    -                insert_bq_data("{}.{}.count".format(client.project, dataset), current_rows)
    +                insert_bq_data(f"{client.project}.{dataset}.count", current_rows)

             except NotFound:
                 print("No count table found, creating one...")
    -            create_count_table("{}.{}.count".format(client.project, dataset), current_rows)
    +            create_count_table(f"{client.project}.{dataset}.count", current_rows)
                 create_pipeline_run()
         else:
             return f"No BigQuery data given"
  9. sararob revised this gist Jul 20, 2021. 1 changed file with 1 addition and 10 deletions.
    11 changes: 1 addition & 10 deletions pipeline-runner.py
    @@ -4,11 +4,9 @@

     from google.cloud import bigquery
     from google.cloud.exceptions import NotFound
    -from google.oauth2 import service_account
     from kfp.v2.google.client import AIPlatformClient

     client = bigquery.Client()
    -
     RETRAIN_THRESHOLD = 1000 # Change this based on your use case

     def insert_bq_data(table_id, num_rows):
    @@ -62,14 +60,11 @@ def check_table_size(request):
         except ValueError as e:
             print(f"Error decoding JSON: {e}")
             return "JSON Error", 400
    -
    -    print(request_json)
    +
         if request_json and 'bq_dataset' in request_json:
    -
             dataset = request_json['bq_dataset']
             table = request_json['bq_table']

    -        # Get size of data table
             data_table = client.get_table("{}.{}.{}".format(client.project, dataset, table))
             current_rows = data_table.num_rows
             print("{} table has {} rows".format(table, current_rows))
    @@ -79,7 +74,6 @@ def check_table_size(request):
                 count_table = client.get_table("{}.{}.count".format(client.project, dataset))
                 print("Count table exists, querying to see how many rows at last pipeline run")

    -            # See how many rows have been added since the last retrain
                 query_job = client.query(
                     """
                     SELECT num_rows_last_retraining FROM `sara-vertex-demos.beans.count`
    @@ -96,14 +90,11 @@ def check_table_size(request):

                 if (rows_added_since_last_pipeline_run >= RETRAIN_THRESHOLD):
                     pipeline_result = create_pipeline_run()
    -
    -                # Update the BQ table
                     insert_bq_data("{}.{}.count".format(client.project, dataset), current_rows)

             except NotFound:
                 print("No count table found, creating one...")
                 create_count_table("{}.{}.count".format(client.project, dataset), current_rows)
                 create_pipeline_run()
    -
         else:
             return f"No BigQuery data given"
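
    Note: dropping the google.oauth2.service_account import works because bigquery.Client() with no arguments falls back to Application Default Credentials (in Cloud Functions, the runtime service account). Both styles for reference; the key-file path below is a placeholder:

        from google.cloud import bigquery

        # What this gist relies on: Application Default Credentials.
        client = bigquery.Client()

        # The explicit style the removed import supported:
        # from google.oauth2 import service_account
        # creds = service_account.Credentials.from_service_account_file("key.json")
        # client = bigquery.Client(credentials=creds, project=creds.project_id)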
  10. sararob revised this gist Jul 20, 2021. No changes.
  11. sararob revised this gist Jul 20, 2021. 1 changed file with 3 additions and 3 deletions.
    6 changes: 3 additions & 3 deletions pipeline-runner.py
    @@ -35,8 +35,8 @@ def create_count_table(table_id, num_rows):
         insert_bq_data(table_id, num_rows)

     def create_pipeline_run():
    -
         print('Kicking off a pipeline run...')
    +
         REGION = "us-central1" # Change this to the region you want to run in
         api_client = AIPlatformClient(
             project_id=client.project,
    @@ -53,8 +53,8 @@ def create_pipeline_run():
         print("Error trying to run the pipeline")
         raise

    +# This should be the entrypoint for your Cloud Function
     def check_table_size(request):
    -
         request = request.get_data()

         try:
    @@ -74,7 +74,7 @@ def check_table_size(request):
             current_rows = data_table.num_rows
             print("{} table has {} rows".format(table, current_rows))

    -        # See if `count` table exists
    +        # See if `count` table exists in dataset
             try:
                 count_table = client.get_table("{}.{}.count".format(client.project, dataset))
                 print("Count table exists, querying to see how many rows at last pipeline run")
  12. sararob revised this gist Jul 20, 2021. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion pipeline-runner.py
    @@ -16,7 +16,7 @@ def insert_bq_data(table_id, num_rows):
             {u"num_rows_last_retraining": num_rows, u"last_retrain_time": time.time()}
         ]

    -    errors = client.insert_rows_json(table_id, rows_to_insert).
    +    errors = client.insert_rows_json(table_id, rows_to_insert)
         if errors == []:
             print("New rows have been added.")
         else:
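
    Note: the stray trailing period after insert_rows_json was a syntax error. For reference, insert_rows_json returns a list of per-row error mappings, empty when every row was accepted, which is what the errors == [] check relies on; an equivalent condensed form:

        errors = client.insert_rows_json(table_id, rows_to_insert)
        if not errors:  # [] means all rows were accepted
            print("New rows have been added.")
        else:
            print(f"Encountered errors while inserting rows: {errors}")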
  13. sararob revised this gist Jul 20, 2021. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion pipeline-runner.py
    @@ -53,7 +53,7 @@ def create_pipeline_run():
         print("Error trying to run the pipeline")
         raise

    -def entrypoint_func(request):
    +def check_table_size(request):

         request = request.get_data()

    2 changes: 1 addition & 1 deletion pipeline-runner.py
    Original file line number Diff line number Diff line change
    @@ -53,7 +53,7 @@ def create_pipeline_run():
    print("Error trying to run the pipeline")
    raise

    def hello_world(request):
    def entrypoint_func(request):

    request = request.get_data()

  15. sararob revised this gist Jul 20, 2021. 1 changed file with 3 additions and 8 deletions.
    11 changes: 3 additions & 8 deletions pipeline-runner.py
    @@ -30,17 +30,13 @@ def create_count_table(table_id, num_rows):

         table = bigquery.Table(table_id, schema=schema)
         table = client.create_table(table)
    -    print(
    -        "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id)
    -    )
    +    print("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id))

         insert_bq_data(table_id, num_rows)
    -
    -
    +
     def create_pipeline_run():

    -    print('Kicking off a pipeline run')
    -
    +    print('Kicking off a pipeline run...')
         REGION = "us-central1" # Change this to the region you want to run in
         api_client = AIPlatformClient(
             project_id=client.project,
    @@ -61,7 +57,6 @@ def hello_world(request):

         request = request.get_data()

    -
         try:
             request_json = json.loads(request.decode())
         except ValueError as e:
  16. sararob created this gist Jul 20, 2021.
    114 changes: 114 additions & 0 deletions pipeline-runner.py
    @@ -0,0 +1,114 @@
    +import kfp
    +import json
    +import time
    +
    +from google.cloud import bigquery
    +from google.cloud.exceptions import NotFound
    +from google.oauth2 import service_account
    +from kfp.v2.google.client import AIPlatformClient
    +
    +client = bigquery.Client()
    +
    +RETRAIN_THRESHOLD = 1000 # Change this based on your use case
    +
    +def insert_bq_data(table_id, num_rows):
    +    rows_to_insert = [
    +        {u"num_rows_last_retraining": num_rows, u"last_retrain_time": time.time()}
    +    ]
    +
    +    errors = client.insert_rows_json(table_id, rows_to_insert).
    +    if errors == []:
    +        print("New rows have been added.")
    +    else:
    +        print("Encountered errors while inserting rows: {}".format(errors))
    +
    +def create_count_table(table_id, num_rows):
    +    schema = [
    +        bigquery.SchemaField("num_rows_last_retraining", "INTEGER", mode="REQUIRED"),
    +        bigquery.SchemaField("last_retrain_time", "TIMESTAMP", mode="REQUIRED")
    +    ]
    +
    +    table = bigquery.Table(table_id, schema=schema)
    +    table = client.create_table(table)
    +    print(
    +        "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id)
    +    )
    +
    +    insert_bq_data(table_id, num_rows)
    +
    +
    +def create_pipeline_run():
    +
    +    print('Kicking off a pipeline run')
    +
    +    REGION = "us-central1" # Change this to the region you want to run in
    +    api_client = AIPlatformClient(
    +        project_id=client.project,
    +        region=REGION,
    +    )
    +    try:
    +        response = api_client.create_run_from_job_spec(
    +            "compiled_pipeline.json",
    +            pipeline_root="gs://your-gcs-bucket/pipeline_root/",
    +            parameter_values={"project": client.project, "display_name": "pipeline_gcf_trigger"}
    +        )
    +        return response
    +    except:
    +        print("Error trying to run the pipeline")
    +        raise
    +
    +def hello_world(request):
    +
    +    request = request.get_data()
    +
    +
    +    try:
    +        request_json = json.loads(request.decode())
    +    except ValueError as e:
    +        print(f"Error decoding JSON: {e}")
    +        return "JSON Error", 400
    +
    +    print(request_json)
    +    if request_json and 'bq_dataset' in request_json:
    +
    +        dataset = request_json['bq_dataset']
    +        table = request_json['bq_table']
    +
    +        # Get size of data table
    +        data_table = client.get_table("{}.{}.{}".format(client.project, dataset, table))
    +        current_rows = data_table.num_rows
    +        print("{} table has {} rows".format(table, current_rows))
    +
    +        # See if `count` table exists
    +        try:
    +            count_table = client.get_table("{}.{}.count".format(client.project, dataset))
    +            print("Count table exists, querying to see how many rows at last pipeline run")
    +
    +            # See how many rows have been added since the last retrain
    +            query_job = client.query(
    +                """
    +                SELECT num_rows_last_retraining FROM `sara-vertex-demos.beans.count`
    +                ORDER BY last_retrain_time DESC
    +                LIMIT 1"""
    +            )
    +
    +            results = query_job.result()
    +            for i in results:
    +                last_retrain_count = i[0]
    +
    +            rows_added_since_last_pipeline_run = current_rows - last_retrain_count
    +            print("{} rows have been added since we last ran the pipeline".format(rows_added_since_last_pipeline_run))
    +
    +            if (rows_added_since_last_pipeline_run >= RETRAIN_THRESHOLD):
    +                pipeline_result = create_pipeline_run()
    +
    +                # Update the BQ table
    +                insert_bq_data("{}.{}.count".format(client.project, dataset), current_rows)
    +
    +        except NotFound:
    +            print("No count table found, creating one...")
    +            create_count_table("{}.{}.count".format(client.project, dataset), current_rows)
    +            create_pipeline_run()
    +
    +    else:
    +        return f"No BigQuery data given"
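
    Note: create_pipeline_run expects a compiled_pipeline.json job spec deployed alongside the function, which isn't part of this gist. A sketch of how such a spec is typically produced with the KFP v2 SDK (the pipeline and component here are placeholders; only the parameter names match the gist's parameter_values):

        from kfp.v2 import compiler, dsl

        @dsl.component
        def log_params(project: str, display_name: str):
            # Placeholder step; a real pipeline would do its training work here.
            print(project, display_name)

        @dsl.pipeline(name="pipeline-gcf-trigger")
        def my_pipeline(project: str, display_name: str):
            log_params(project=project, display_name=display_name)

        # Writes the job spec the Cloud Function passes to create_run_from_job_spec.
        compiler.Compiler().compile(
            pipeline_func=my_pipeline, package_path="compiled_pipeline.json"
        )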