Last active
February 14, 2022 16:08
-
-
Save rantoniuk/c3d0dae4e499d0c11f0c756f6c1da73c to your computer and use it in GitHub Desktop.
Revisions
-
rantoniuk revised this gist
Feb 14, 2022 . 1 changed file with 29 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,29 @@ # This updates AWS Lambda's environment variable values since it is currently not possible to use # it directly from SSM as reference to trigget deployment changes import boto3 client = boto3.client('lambda', region_name='eu-west-1') paginator = client.get_paginator('list_functions') page_iterator = paginator.paginate() for page in page_iterator: for function in page['Functions']: if "Environment" in function and "DT_TENANT" in function['Environment']['Variables']: current_function = function['FunctionName'] print(current_function, end='') current_env = client.get_function_configuration( FunctionName=current_function) new_env = {} new_env['Variables'] = current_env['Environment']['Variables'] new_env['Variables']['DT_TENANT'] = ... response = client.update_function_configuration( FunctionName=current_function, Environment=new_env ) print(" changed.") -
warden revised this gist
Jun 23, 2021 . 1 changed file with 6 additions and 14 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -23,15 +23,13 @@ cloudfront_client = boto3.client('cloudfront') codepipeline_client = boto3.client('codepipeline') def handler(event, context): ''' Creates a cloudfront invalidation for content added to an S3 bucket ''' # Log the the received event locally. # print("Received event: " + json.dumps(event, indent=2)) # Extract the Job ID job_id = event['CodePipeline.job']['id'] @@ -45,13 +43,10 @@ def handler(event, context): # Get the object from the event. bucket = params['bucketName'] cloudfront_client = get_cloudfront_client(params['targetPipelineRole']) cf_distro_id = get_cloudfront_distribution_id( cloudfront_client, bucket ) if cf_distro_id: @@ -69,9 +64,6 @@ def handler(event, context): 'CallerReference': str(time.time()) }) put_job_success( job_id, "Submitted invalidation ID {}".format( @@ -154,7 +146,7 @@ def put_job_failure(job, message): ) def get_cloudfront_distribution_id(client, bucket): cf_distro_id = None # Create a reusable Paginator @@ -163,13 +155,13 @@ def get_cloudfront_distribution_id(client, bucket_origin): # Create a PageIterator from the Paginator page_iterator = paginator.paginate() print("Looking for origin: {}".format(bucket)) for page in page_iterator: for distribution in page['DistributionList']['Items']: for cf_origin in distribution['Origins']['Items']: # print("Origin {}".format(cf_origin['DomainName'])) if bucket == cf_origin['DomainName']: print("Found it!") cf_distro_id = distribution['Id'] -
warden revised this gist
Jun 22, 2021 . 1 changed file with 12 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,3 +1,15 @@ ### Cross-IAM Invalidate CloudFront from a central pipeline account ### ### new codepipelineActions.LambdaInvokeAction({ ### runOrder: 3, ### actionName: 'InvalidateCloudFront', ### role: this.devOpsPipelineRole, ### lambda: Function.fromFunctionArn(this, "test-cloudfrontInvalidator", "arn:aws:lambda:eu-west-1::function:pipelineInvalidateCloudFront"), ### userParameters: { ### bucketName: bucket.bucketName, ### targetPipelineRole: this.testPipelineRole.roleArn ### } from __future__ import print_function import json -
warden revised this gist
Jun 22, 2021 . 1 changed file with 90 additions and 54 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -5,56 +5,11 @@ import urllib import boto3 import botocore from boto3.session import Session cloudfront_client = boto3.client('cloudfront') codepipeline_client = boto3.client('codepipeline') # --------------- Main handler ------------------ @@ -64,7 +19,7 @@ def handler(event, context): Creates a cloudfront invalidation for content added to an S3 bucket ''' # Log the the received event locally. print("Received event: " + json.dumps(event, indent=2)) # Extract the Job ID job_id = event['CodePipeline.job']['id'] @@ -76,11 +31,16 @@ def handler(event, context): params = get_user_params(job_data) # Get the object from the event. bucket = params['bucketName'] bucket_origin = bucket + '.s3.' + \ codepipeline_client.meta.region_name + '.amazonaws.com' cloudfront_client = get_cloudfront_client(params['targetPipelineRole']) cf_distro_id = get_cloudfront_distribution_id( cloudfront_client, bucket_origin ) if cf_distro_id: print("Creating invalidation from {} for Cloudfront distribution {}".format( @@ -99,6 +59,7 @@ def handler(event, context): print("Submitted invalidation. ID {}".format( invalidation['Invalidation']['Id'])) put_job_success( job_id, "Submitted invalidation ID {}".format( @@ -118,6 +79,30 @@ def handler(event, context): return 'Success' def get_cloudfront_client(role_arn): # create an STS client object that represents a live connection to the STS service sts_client = boto3.client('sts') # Call the assume_role method of the STSConnection object and pass the role # ARN and a role session name. assumed_role_object = sts_client.assume_role( RoleArn=role_arn, RoleSessionName="TargetAccountPipelineRoleSession" ) # From the response that contains the assumed role, get the temporary # credentials that can be used to make subsequent API calls credentials = assumed_role_object['Credentials'] # Get a session for the assumed role session = Session(aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken']) # return a CodePipeline client with the assumed role return session.client('cloudfront', config=botocore.client.Config(signature_version='s3v4')) def put_job_success(job, message): """Notify CodePipeline of a successful job @@ -131,7 +116,8 @@ def put_job_success(job, message): """ print('Putting job success') print(message) codepipeline_client.put_job_success_result(jobId=job) def put_job_failure(job, message): @@ -147,9 +133,59 @@ def put_job_failure(job, message): """ print('Putting job failure') print(message) codepipeline_client.put_job_failure_result( jobId=job, failureDetails={ 'message': message, 'type': 'JobFailed' } ) def get_cloudfront_distribution_id(client, bucket_origin): cf_distro_id = None # Create a reusable Paginator paginator = client.get_paginator('list_distributions') # Create a PageIterator from the Paginator page_iterator = paginator.paginate() print("Looking for origin: {}".format(bucket_origin)) for page in page_iterator: for distribution in page['DistributionList']['Items']: for cf_origin in distribution['Origins']['Items']: print("Origin {}".format(cf_origin['DomainName'])) if bucket_origin == cf_origin['DomainName']: print("Found it!") cf_distro_id = distribution['Id'] return cf_distro_id def get_user_params(job_data): """Decodes the JSON user parameters and validates the required properties. Args: job_data: The job data structure containing the UserParameters string which should be a valid JSON structure Returns: The JSON parameters decoded as a dictionary. Raises: Exception: The JSON can't be decoded or a property is missing. """ try: # Get the user parameters which contain the stack, artifact and file settings user_parameters = job_data['actionConfiguration']['configuration']['UserParameters'] decoded_parameters = json.loads(user_parameters) except Exception as e: # We're expecting the user parameters to be encoded as JSON # so we can pass multiple values. If the JSON can't be decoded # then fail the job with a helpful message. raise Exception('UserParameters could not be decoded from JSON') return decoded_parameters -
warden created this gist
Jun 18, 2021 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,155 @@ from __future__ import print_function import json import time import urllib import boto3 cloudfront_client = boto3.client('cloudfront') ec2_client = boto3.client('ec2') # just to get the proper region code_pipeline = boto3.client('codepipeline') def get_cloudfront_distribution_id(bucket_origin): cf_distro_id = None # Create a reusable Paginator paginator = cloudfront_client.get_paginator('list_distributions') # Create a PageIterator from the Paginator page_iterator = paginator.paginate() for page in page_iterator: for distribution in page['DistributionList']['Items']: for cf_origin in distribution['Origins']['Items']: # print("Origin {}".format(cf_origin['DomainName'])) if bucket_origin == cf_origin['DomainName']: cf_distro_id = distribution['Id'] return cf_distro_id def get_user_params(job_data): """Decodes the JSON user parameters and validates the required properties. Args: job_data: The job data structure containing the UserParameters string which should be a valid JSON structure Returns: The JSON parameters decoded as a dictionary. Raises: Exception: The JSON can't be decoded or a property is missing. """ try: # Get the user parameters which contain the stack, artifact and file settings user_parameters = job_data['actionConfiguration']['configuration']['UserParameters'] decoded_parameters = json.loads(user_parameters) except Exception as e: # We're expecting the user parameters to be encoded as JSON # so we can pass multiple values. If the JSON can't be decoded # then fail the job with a helpful message. raise Exception('UserParameters could not be decoded from JSON') return decoded_parameters # --------------- Main handler ------------------ def handler(event, context): ''' Creates a cloudfront invalidation for content added to an S3 bucket ''' # Log the the received event locally. # print("Received event: " + json.dumps(event, indent=2)) # Extract the Job ID job_id = event['CodePipeline.job']['id'] # Extract the Job Data job_data = event['CodePipeline.job']['data'] # Extract the params params = get_user_params(job_data) # Get the object from the event. bucket = params['originBucket'] bucket_origin = bucket + '.s3.' + ec2_client.meta.region_name + '.amazonaws.com' cf_distro_id = get_cloudfront_distribution_id(bucket_origin) if cf_distro_id: print("Creating invalidation from {} for Cloudfront distribution {}".format( bucket, cf_distro_id)) try: invalidation = cloudfront_client.create_invalidation( DistributionId=cf_distro_id, InvalidationBatch={ 'Paths': { 'Quantity': 1, 'Items': ['/*'] }, 'CallerReference': str(time.time()) }) print("Submitted invalidation. ID {}".format( invalidation['Invalidation']['Id'])) put_job_success( job_id, "Submitted invalidation ID {}".format( invalidation['Invalidation']['Id']) ) except Exception as e: print("Error processing event from bucket {}. Event {}".format( bucket, json.dumps(event, indent=2))) raise e else: print("Bucket {} does not appeaer to be an origin for a Cloudfront distribution".format(bucket)) put_job_failure( job_id, "Bucket does not to have an origin for a CF distribution" ) return 'Success' def put_job_success(job, message): """Notify CodePipeline of a successful job Args: job: The CodePipeline job ID message: A message to be logged relating to the job status Raises: Exception: Any exception thrown by .put_job_success_result() """ print('Putting job success') print(message) code_pipeline.put_job_success_result(jobId=job) def put_job_failure(job, message): """Notify CodePipeline of a failed job Args: job: The CodePipeline job ID message: A message to be logged relating to the job status Raises: Exception: Any exception thrown by .put_job_failure_result() """ print('Putting job failure') print(message) code_pipeline.put_job_failure_result( jobId=job, failureDetails={ 'message': message, 'type': 'JobFailed' } )