Last active
August 21, 2022 11:09
-
-
Save sararob/1eea7ae2b08e85851855ec2eff8c2d8b to your computer and use it in GitHub Desktop.
Revisions
-
sararob revised this gist
Sep 17, 2021 — No changes. There are no files selected for viewing.
-
sararob revised this gist
Jul 21, 2021 . No changes.There are no files selected for viewing
-
sararob revised this gist
Jul 21, 2021 . No changes.There are no files selected for viewing
-
sararob revised this gist
Jul 20, 2021 — 1 changed file with 1 addition and 1 deletion. There are no files selected for viewing.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -80,7 +80,7 @@ def check_table_size(request): query_job = client.query( """ SELECT num_rows_last_retraining FROM `your-project.your-dataset.count` ORDER BY last_retrain_time DESC LIMIT 1""" ) -
sararob revised this gist
Jul 20, 2021 . 1 changed file with 0 additions and 3 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,10 +1,8 @@ # Copyright 2021 Google LLC. # SPDX-License-Identifier: Apache-2.0 import kfp import json import time from google.cloud import bigquery from google.cloud.exceptions import NotFound from kfp.v2.google.client import AIPlatformClient @@ -28,7 +26,6 @@ def create_count_table(table_id, num_rows): bigquery.SchemaField("num_rows_last_retraining", "INTEGER", mode="REQUIRED"), bigquery.SchemaField("last_retrain_time", "TIMESTAMP", mode="REQUIRED") ] table = bigquery.Table(table_id, schema=schema) table = client.create_table(table) print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}") -
sararob revised this gist
Jul 20, 2021 . 1 changed file with 3 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,3 +1,6 @@ # Copyright 2021 Google LLC. # SPDX-License-Identifier: Apache-2.0 import kfp import json import time -
sararob revised this gist
Jul 20, 2021 . 1 changed file with 18 additions and 19 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -74,27 +74,26 @@ def check_table_size(request): count_table = client.get_table(f"{client.project}.{dataset}.count") print("Count table exists, querying to see how many rows at last pipeline run") except NotFound: print("No count table found, creating one...") create_count_table(f"{client.project}.{dataset}.count", current_rows) query_job = client.query( """ SELECT num_rows_last_retraining FROM `sara-vertex-demos.beans.count` ORDER BY last_retrain_time DESC LIMIT 1""" ) results = query_job.result() for i in results: last_retrain_count = i[0] rows_added_since_last_pipeline_run = current_rows - last_retrain_count print(f"{rows_added_since_last_pipeline_run} rows have been added since we last ran the pipeline") if (rows_added_since_last_pipeline_run >= RETRAIN_THRESHOLD): pipeline_result = create_pipeline_run() insert_bq_data(f"{client.project}.{dataset}.count", current_rows) else: return f"No BigQuery data given" -
sararob revised this gist
Jul 20, 2021 . 1 changed file with 8 additions and 8 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -18,7 +18,7 @@ def insert_bq_data(table_id, num_rows): if errors == []: print("New rows have been added.") else: print(f"Encountered errors while inserting rows: {errors}") def create_count_table(table_id, num_rows): schema = [ @@ -28,7 +28,7 @@ def create_count_table(table_id, num_rows): table = bigquery.Table(table_id, schema=schema) table = client.create_table(table) print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}") insert_bq_data(table_id, num_rows) @@ -65,13 +65,13 @@ def check_table_size(request): dataset = request_json['bq_dataset'] table = request_json['bq_table'] data_table = client.get_table(f"{client.project}.{dataset}.{table}") current_rows = data_table.num_rows print(f"{table} table has {current_rows} rows") # See if `count` table exists in dataset try: count_table = client.get_table(f"{client.project}.{dataset}.count") print("Count table exists, querying to see how many rows at last pipeline run") query_job = client.query( @@ -86,15 +86,15 @@ def check_table_size(request): last_retrain_count = i[0] rows_added_since_last_pipeline_run = current_rows - last_retrain_count print(f"{rows_added_since_last_pipeline_run} rows have been added since we last ran the pipeline") if (rows_added_since_last_pipeline_run >= RETRAIN_THRESHOLD): pipeline_result = create_pipeline_run() insert_bq_data(f"{client.project}.{dataset}.count", current_rows) except NotFound: print("No count table found, creating one...") create_count_table(f"{client.project}.{dataset}.count", current_rows) create_pipeline_run() else: return f"No BigQuery data given" -
sararob revised this gist
Jul 20, 2021 . 1 changed file with 1 addition and 10 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -4,11 +4,9 @@ from google.cloud import bigquery from google.cloud.exceptions import NotFound from kfp.v2.google.client import AIPlatformClient client = bigquery.Client() RETRAIN_THRESHOLD = 1000 # Change this based on your use case def insert_bq_data(table_id, num_rows): @@ -62,14 +60,11 @@ def check_table_size(request): except ValueError as e: print(f"Error decoding JSON: {e}") return "JSON Error", 400 if request_json and 'bq_dataset' in request_json: dataset = request_json['bq_dataset'] table = request_json['bq_table'] data_table = client.get_table("{}.{}.{}".format(client.project, dataset, table)) current_rows = data_table.num_rows print("{} table has {} rows".format(table, current_rows)) @@ -79,7 +74,6 @@ def check_table_size(request): count_table = client.get_table("{}.{}.count".format(client.project, dataset)) print("Count table exists, querying to see how many rows at last pipeline run") query_job = client.query( """ SELECT num_rows_last_retraining FROM `sara-vertex-demos.beans.count` @@ -96,14 +90,11 @@ def check_table_size(request): if (rows_added_since_last_pipeline_run >= RETRAIN_THRESHOLD): pipeline_result = create_pipeline_run() insert_bq_data("{}.{}.count".format(client.project, dataset), current_rows) except NotFound: print("No count table found, creating one...") create_count_table("{}.{}.count".format(client.project, dataset), current_rows) create_pipeline_run() else: return f"No BigQuery data given" -
sararob revised this gist
Jul 20, 2021 . No changes.There are no files selected for viewing
-
sararob revised this gist
Jul 20, 2021 . 1 changed file with 3 additions and 3 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -35,8 +35,8 @@ def create_count_table(table_id, num_rows): insert_bq_data(table_id, num_rows) def create_pipeline_run(): print('Kicking off a pipeline run...') REGION = "us-central1" # Change this to the region you want to run in api_client = AIPlatformClient( project_id=client.project, @@ -53,8 +53,8 @@ def create_pipeline_run(): print("Error trying to run the pipeline") raise # This should be the entrypoint for your Cloud Function def check_table_size(request): request = request.get_data() try: @@ -74,7 +74,7 @@ def check_table_size(request): current_rows = data_table.num_rows print("{} table has {} rows".format(table, current_rows)) # See if `count` table exists in dataset try: count_table = client.get_table("{}.{}.count".format(client.project, dataset)) print("Count table exists, querying to see how many rows at last pipeline run") -
sararob revised this gist
Jul 20, 2021 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -16,7 +16,7 @@ def insert_bq_data(table_id, num_rows): {u"num_rows_last_retraining": num_rows, u"last_retrain_time": time.time()} ] errors = client.insert_rows_json(table_id, rows_to_insert) if errors == []: print("New rows have been added.") else: -
sararob revised this gist
Jul 20, 2021 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -53,7 +53,7 @@ def create_pipeline_run(): print("Error trying to run the pipeline") raise def check_table_size(request): request = request.get_data() -
sararob revised this gist
Jul 20, 2021 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -53,7 +53,7 @@ def create_pipeline_run(): print("Error trying to run the pipeline") raise def entrypoint_func(request): request = request.get_data() -
sararob revised this gist
Jul 20, 2021 . 1 changed file with 3 additions and 8 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -30,17 +30,13 @@ def create_count_table(table_id, num_rows): table = bigquery.Table(table_id, schema=schema) table = client.create_table(table) print("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id)) insert_bq_data(table_id, num_rows) def create_pipeline_run(): print('Kicking off a pipeline run...') REGION = "us-central1" # Change this to the region you want to run in api_client = AIPlatformClient( project_id=client.project, @@ -61,7 +57,6 @@ def hello_world(request): request = request.get_data() try: request_json = json.loads(request.decode()) except ValueError as e: -
sararob created this gist
Jul 20, 2021. There are no files selected for viewing.
# Copyright 2021 Google LLC.
# SPDX-License-Identifier: Apache-2.0
#
# Cloud Function that watches a BigQuery table and kicks off a Vertex AI
# (Kubeflow) pipeline run whenever enough new rows have accumulated since
# the last retraining. Bookkeeping lives in a `count` table in the same
# dataset, holding (num_rows_last_retraining, last_retrain_time) records.

import kfp
import json
import time
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.oauth2 import service_account
from kfp.v2.google.client import AIPlatformClient

client = bigquery.Client()

RETRAIN_THRESHOLD = 1000  # Change this based on your use case


def insert_bq_data(table_id, num_rows):
    """Append one bookkeeping row (current row count + timestamp) to table_id.

    Args:
        table_id: Fully qualified table id, "project.dataset.table".
        num_rows: Row count of the data table at this retraining.
    """
    rows_to_insert = [
        # Epoch seconds (float) is an accepted JSON encoding for a
        # BigQuery TIMESTAMP column.
        {u"num_rows_last_retraining": num_rows, u"last_retrain_time": time.time()}
    ]
    # BUG FIX: the original line ended with a stray "." after the call,
    # which is a syntax error (attribute access with no name).
    errors = client.insert_rows_json(table_id, rows_to_insert)
    if errors == []:
        print("New rows have been added.")
    else:
        print("Encountered errors while inserting rows: {}".format(errors))


def create_count_table(table_id, num_rows):
    """Create the bookkeeping `count` table and seed it with the current count.

    Args:
        table_id: Fully qualified table id, "project.dataset.count".
        num_rows: Initial row count to record.
    """
    schema = [
        bigquery.SchemaField("num_rows_last_retraining", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("last_retrain_time", "TIMESTAMP", mode="REQUIRED"),
    ]
    table = bigquery.Table(table_id, schema=schema)
    table = client.create_table(table)
    print(
        "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id)
    )
    insert_bq_data(table_id, num_rows)


def create_pipeline_run():
    """Submit a Vertex AI pipeline run from the bundled compiled spec.

    Returns:
        The AI Platform client's run-creation response.

    Raises:
        Exception: re-raises whatever create_run_from_job_spec raised,
        after logging, so the Cloud Function invocation surfaces the failure.
    """
    print('Kicking off a pipeline run')
    REGION = "us-central1"  # Change this to the region you want to run in
    api_client = AIPlatformClient(
        project_id=client.project,
        region=REGION,
    )
    try:
        response = api_client.create_run_from_job_spec(
            "compiled_pipeline.json",
            pipeline_root="gs://your-gcs-bucket/pipeline_root/",
            parameter_values={"project": client.project, "display_name": "pipeline_gcf_trigger"}
        )
        return response
    except Exception:
        # Narrowed from a bare `except:` (which would also swallow
        # KeyboardInterrupt/SystemExit); still re-raised unchanged.
        print("Error trying to run the pipeline")
        raise


def hello_world(request):
    """HTTP Cloud Function entrypoint.

    Expects a JSON body with `bq_dataset` and `bq_table`. Compares the data
    table's current row count to the count recorded at the last retraining;
    if at least RETRAIN_THRESHOLD rows were added, triggers a pipeline run
    and records the new count. Creates the `count` table on first invocation.

    Args:
        request: flask.Request with a JSON payload.

    Returns:
        ("JSON Error", 400) on malformed JSON, a message when no BigQuery
        data is given, otherwise None (implicit 200).
    """
    request = request.get_data()
    try:
        request_json = json.loads(request.decode())
    except ValueError as e:
        print(f"Error decoding JSON: {e}")
        return "JSON Error", 400
    print(request_json)

    if request_json and 'bq_dataset' in request_json:
        dataset = request_json['bq_dataset']
        table = request_json['bq_table']

        # Get size of data table
        data_table = client.get_table("{}.{}.{}".format(client.project, dataset, table))
        current_rows = data_table.num_rows
        print("{} table has {} rows".format(table, current_rows))

        # See if `count` table exists
        try:
            count_table = client.get_table("{}.{}.count".format(client.project, dataset))
            print("Count table exists, querying to see how many rows at last pipeline run")

            # See how many rows have been added since the last retrain
            query_job = client.query(
                """
                SELECT num_rows_last_retraining FROM `sara-vertex-demos.beans.count`
                ORDER BY last_retrain_time DESC LIMIT 1"""
            )
            results = query_job.result()
            # LIMIT 1 guarantees at most one row; the loop just unpacks it.
            for i in results:
                last_retrain_count = i[0]

            rows_added_since_last_pipeline_run = current_rows - last_retrain_count
            print("{} rows have been added since we last ran the pipeline".format(
                rows_added_since_last_pipeline_run))

            if (rows_added_since_last_pipeline_run >= RETRAIN_THRESHOLD):
                pipeline_result = create_pipeline_run()
                # Update the BQ table
                insert_bq_data("{}.{}.count".format(client.project, dataset), current_rows)
        except NotFound:
            # First run for this dataset: bootstrap the count table and
            # retrain unconditionally.
            print("No count table found, creating one...")
            create_count_table("{}.{}.count".format(client.project, dataset), current_rows)
            create_pipeline_run()
    else:
        return f"No BigQuery data given"