### Cross-IAM Invalidate CloudFront from a central pipeline account

In the central (DevOps) pipeline, a Lambda invoke action calls the invalidation function, passing the bucket name and the ARN of the role to assume in the target account as user parameters:

```typescript
new codepipelineActions.LambdaInvokeAction({
  runOrder: 3,
  actionName: 'InvalidateCloudFront',
  role: this.devOpsPipelineRole,
  lambda: Function.fromFunctionArn(this, "test-cloudfrontInvalidator",
    "arn:aws:lambda:eu-west-1::function:pipelineInvalidateCloudFront"),
  userParameters: {
    bucketName: bucket.bucketName,
    targetPipelineRole: this.testPipelineRole.roleArn
  }
})
```
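For the cross-account call to work, the role passed as `targetPipelineRole` has to live in the target account, trust the central pipeline account, and allow the two CloudFront calls the function makes. A minimal sketch, assuming CDK v2; the stack name, construct IDs and account ID are illustrative, not taken from the original stacks:

```typescript
// Sketch only: the role in the *target* account that the invalidation Lambda assumes.
// The stack name, construct IDs and account ID are illustrative placeholders.
import { Stack, StackProps } from 'aws-cdk-lib';
import { Construct } from 'constructs';
import * as iam from 'aws-cdk-lib/aws-iam';

export class TargetAccountRolesStack extends Stack {
  public readonly testPipelineRole: iam.Role;

  constructor(scope: Construct, id: string, props?: StackProps) {
    super(scope, id, props);

    // Trust the central (DevOps) account so the Lambda's execution role can call sts:AssumeRole.
    // This could be narrowed to the Lambda's execution role ARN with iam.ArnPrincipal.
    this.testPipelineRole = new iam.Role(this, 'testPipelineRole', {
      assumedBy: new iam.AccountPrincipal('111111111111'), // placeholder central account ID
    });

    // The Lambda only lists distributions and creates an invalidation.
    this.testPipelineRole.addToPolicy(new iam.PolicyStatement({
      actions: ['cloudfront:ListDistributions', 'cloudfront:CreateInvalidation'],
      resources: ['*'], // cloudfront:ListDistributions does not support resource-level scoping
    }));
  }
}
```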
The Lambda assumes the target-account role, looks up the CloudFront distribution whose origin is backed by the bucket, and submits an invalidation for `/*`, reporting the result back to CodePipeline:

```python
from __future__ import print_function

import json
import time

import boto3
import botocore
from boto3.session import Session

codepipeline_client = boto3.client('codepipeline')


def handler(event, context):
    '''Creates a CloudFront invalidation for content added to an S3 bucket'''
    # Log the received event locally.
    # print("Received event: " + json.dumps(event, indent=2))

    # Extract the job ID and the job data from the CodePipeline event.
    job_id = event['CodePipeline.job']['id']
    job_data = event['CodePipeline.job']['data']

    # Extract the user parameters (bucket name and target-account role).
    params = get_user_params(job_data)
    bucket = params['bucketName']

    # Assume the role in the target account and talk to its CloudFront distributions.
    cloudfront_client = get_cloudfront_client(params['targetPipelineRole'])

    cf_distro_id = get_cloudfront_distribution_id(cloudfront_client, bucket)

    if cf_distro_id:
        print("Creating invalidation from {} for CloudFront distribution {}".format(
            bucket, cf_distro_id))
        try:
            invalidation = cloudfront_client.create_invalidation(
                DistributionId=cf_distro_id,
                InvalidationBatch={
                    'Paths': {
                        'Quantity': 1,
                        'Items': ['/*']
                    },
                    'CallerReference': str(time.time())
                })
            put_job_success(
                job_id,
                "Submitted invalidation ID {}".format(
                    invalidation['Invalidation']['Id'])
            )
        except Exception as e:
            print("Error processing event from bucket {}. Event {}".format(
                bucket, json.dumps(event, indent=2)))
            # Report the failure so the pipeline does not wait for the action to time out.
            put_job_failure(job_id, "Failed to create invalidation: {}".format(e))
            raise e
    else:
        print("Bucket {} does not appear to be an origin for a CloudFront distribution".format(bucket))
        put_job_failure(
            job_id,
            "Bucket is not an origin for a CloudFront distribution"
        )

    return 'Success'


def get_cloudfront_client(role_arn):
    """Returns a CloudFront client that uses temporary credentials for the target-account role."""
    # Create an STS client object that represents a live connection to the STS service.
    sts_client = boto3.client('sts')

    # Call the assume_role method of the STS client and pass the role
    # ARN and a role session name.
    assumed_role_object = sts_client.assume_role(
        RoleArn=role_arn,
        RoleSessionName="TargetAccountPipelineRoleSession"
    )

    # From the response that contains the assumed role, get the temporary
    # credentials that can be used to make subsequent API calls.
    credentials = assumed_role_object['Credentials']

    # Get a session for the assumed role.
    session = Session(aws_access_key_id=credentials['AccessKeyId'],
                      aws_secret_access_key=credentials['SecretAccessKey'],
                      aws_session_token=credentials['SessionToken'])

    # Return a CloudFront client that acts as the assumed role.
    return session.client(
        'cloudfront',
        config=botocore.client.Config(signature_version='s3v4'))


def put_job_success(job, message):
    """Notify CodePipeline of a successful job

    Args:
        job: The CodePipeline job ID
        message: A message to be logged relating to the job status

    Raises:
        Exception: Any exception thrown by .put_job_success_result()
    """
    print('Putting job success')
    print(message)
    codepipeline_client.put_job_success_result(jobId=job)


def put_job_failure(job, message):
    """Notify CodePipeline of a failed job

    Args:
        job: The CodePipeline job ID
        message: A message to be logged relating to the job status

    Raises:
        Exception: Any exception thrown by .put_job_failure_result()
    """
    print('Putting job failure')
    print(message)
    codepipeline_client.put_job_failure_result(
        jobId=job,
        failureDetails={
            'message': message,
            'type': 'JobFailed'
        }
    )


def get_cloudfront_distribution_id(client, bucket):
    """Returns the ID of the distribution that uses the bucket as an origin, or None."""
    cf_distro_id = None

    # Page through all distributions visible to the assumed role.
    paginator = client.get_paginator('list_distributions')
    page_iterator = paginator.paginate()

    print("Looking for origin: {}".format(bucket))
    for page in page_iterator:
        for distribution in page['DistributionList'].get('Items', []):
            for cf_origin in distribution['Origins']['Items']:
                # print("Origin {}".format(cf_origin['DomainName']))
                # The S3 origin domain name starts with the bucket name,
                # e.g. <bucket>.s3.<region>.amazonaws.com.
                if cf_origin['DomainName'].startswith(bucket):
                    print("Found it!")
                    cf_distro_id = distribution['Id']
    return cf_distro_id


def get_user_params(job_data):
    """Decodes the JSON user parameters and validates the required properties.

    Args:
        job_data: The job data structure containing the UserParameters string
            which should be a valid JSON structure

    Returns:
        The JSON parameters decoded as a dictionary.

    Raises:
        Exception: The JSON can't be decoded or a property is missing.
    """
    try:
        # The user parameters are passed as a JSON string so multiple values can be sent.
        user_parameters = job_data['actionConfiguration']['configuration']['UserParameters']
        decoded_parameters = json.loads(user_parameters)
    except Exception:
        # If the JSON can't be decoded, fail the job with a helpful message.
        raise Exception('UserParameters could not be decoded from JSON')

    # Validate the properties the handler relies on.
    for required in ('bucketName', 'targetPipelineRole'):
        if required not in decoded_parameters:
            raise Exception('UserParameters is missing the {} property'.format(required))

    return decoded_parameters
```
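On the central-account side, the function's execution role (configured wherever `pipelineInvalidateCloudFront` is deployed) needs to assume the target role and report job results back to CodePipeline. A minimal sketch of the required statements, assuming CDK v2; the helper name and its arguments are illustrative, not part of the original stacks:

```typescript
// Sketch only: policy statements for the invalidation Lambda's execution role
// in the central account. The helper name and ARNs are illustrative placeholders.
import * as iam from 'aws-cdk-lib/aws-iam';

export function grantInvalidatorPermissions(executionRole: iam.IRole, targetRoleArn: string): void {
  // Allow the Lambda to assume the pipeline role in the target account.
  executionRole.addToPrincipalPolicy(new iam.PolicyStatement({
    actions: ['sts:AssumeRole'],
    resources: [targetRoleArn],
  }));

  // Allow the Lambda to report success or failure back to CodePipeline.
  executionRole.addToPrincipalPolicy(new iam.PolicyStatement({
    actions: ['codepipeline:PutJobSuccessResult', 'codepipeline:PutJobFailureResult'],
    resources: ['*'], // these CodePipeline actions do not support resource-level permissions
  }));
}
```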