""" # cloud_task_delete_old_appengine_versions.py This is a python file designed to operate as a Cloud Function. On trigger, it will delete all but the number of non-active versions equal to `RETAIN_LAST_N_VERSIONS` below (default 5). Note: failure to delete will not raise an exception. If you're running into bugs, see the commented line below to help debug if the delete call is throwing errors - entrypoint: cloud_build_complete - recommended trigger: Cloud Pub/Sub cloud builds topic - runtime: Python 3.7 ## requirements.txt requests>=2.23.0 """ import base64 import requests import json RETAIN_LAST_N_VERSIONS = 5 def list_versions(access_token): url = 'https://appengine.googleapis.com/v1/apps/trophy-gg/services/default/versions' headers = { 'Authorization': 'Bearer {}'.format(access_token) } r = requests.get(url, headers=headers) r.raise_for_status() return r.json()['versions'] def version_delete(name, access_token): url = 'https://appengine.googleapis.com/v1/{}' headers = { 'Authorization': 'Bearer {}'.format(access_token) } r = requests.delete(url.format(name), headers=headers) # Uncomment below if debugging # r.raise_for_status() return r.json() def cloud_build_complete(event, context): """Triggered from a message on a Cloud Pub/Sub topic. Args: event (dict): Event payload. context (google.cloud.functions.Context): Metadata for the event. """ pubsub_message = base64.b64decode(event['data']).decode('utf-8') data = {} if isinstance(pubsub_message, str): try: data = json.loads(pubsub_message) except json.JSONDecodeError: return if data.get('status') != 'SUCCESS' or data.get('buildTriggerId') != '9683f557-71da-4daf-a00a-85924bb34afb': return print('Deleting old backend versions...') scope = 'https://www.googleapis.com/auth/appengine.admin,https://www.googleapis.com/auth/cloud-platform' r = requests.get("http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token", params={ 'scopes': scope }, headers={ 'Metadata-Flavor': 'Google' }) r.raise_for_status() access_token = r.json()['access_token'] versions = list_versions(access_token) if len(versions) <= RETAIN_LAST_N_VERSIONS+1: return for version in versions[RETAIN_LAST_N_VERSIONS+1:]: version_delete(version['name'], access_token)