Skip to content

Instantly share code, notes, and snippets.

@rantoniuk
Last active February 14, 2022 16:08
Show Gist options
  • Save rantoniuk/c3d0dae4e499d0c11f0c756f6c1da73c to your computer and use it in GitHub Desktop.
Save rantoniuk/c3d0dae4e499d0c11f0c756f6c1da73c to your computer and use it in GitHub Desktop.

Revisions

  1. rantoniuk revised this gist Feb 14, 2022. 1 changed file with 29 additions and 0 deletions.
    29 changes: 29 additions & 0 deletions update-lambda-envs.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,29 @@
    # This updates AWS Lambda's environment variable values since it is currently not possible to use
    # it directly from SSM as reference to trigget deployment changes

    import boto3

    client = boto3.client('lambda', region_name='eu-west-1')

    paginator = client.get_paginator('list_functions')
    page_iterator = paginator.paginate()

    for page in page_iterator:
    for function in page['Functions']:
    if "Environment" in function and "DT_TENANT" in function['Environment']['Variables']:
    current_function = function['FunctionName']
    print(current_function, end='')

    current_env = client.get_function_configuration(
    FunctionName=current_function)

    new_env = {}
    new_env['Variables'] = current_env['Environment']['Variables']
    new_env['Variables']['DT_TENANT'] = ...

    response = client.update_function_configuration(
    FunctionName=current_function,
    Environment=new_env
    )
    print(" changed.")

  2. @warden warden revised this gist Jun 23, 2021. 1 changed file with 6 additions and 14 deletions.
    20 changes: 6 additions & 14 deletions cloudfront-invalidation-lambda.py
    Original file line number Diff line number Diff line change
    @@ -23,15 +23,13 @@
    cloudfront_client = boto3.client('cloudfront')
    codepipeline_client = boto3.client('codepipeline')

    # --------------- Main handler ------------------


    def handler(event, context):
    '''
    Creates a cloudfront invalidation for content added to an S3 bucket
    '''
    # Log the the received event locally.
    print("Received event: " + json.dumps(event, indent=2))
    # print("Received event: " + json.dumps(event, indent=2))

    # Extract the Job ID
    job_id = event['CodePipeline.job']['id']
    @@ -45,13 +43,10 @@ def handler(event, context):
    # Get the object from the event.
    bucket = params['bucketName']

    bucket_origin = bucket + '.s3.' + \
    codepipeline_client.meta.region_name + '.amazonaws.com'

    cloudfront_client = get_cloudfront_client(params['targetPipelineRole'])

    cf_distro_id = get_cloudfront_distribution_id(
    cloudfront_client, bucket_origin
    cloudfront_client, bucket
    )

    if cf_distro_id:
    @@ -69,9 +64,6 @@ def handler(event, context):
    'CallerReference': str(time.time())
    })

    print("Submitted invalidation. ID {}".format(
    invalidation['Invalidation']['Id']))

    put_job_success(
    job_id,
    "Submitted invalidation ID {}".format(
    @@ -154,7 +146,7 @@ def put_job_failure(job, message):
    )


    def get_cloudfront_distribution_id(client, bucket_origin):
    def get_cloudfront_distribution_id(client, bucket):
    cf_distro_id = None

    # Create a reusable Paginator
    @@ -163,13 +155,13 @@ def get_cloudfront_distribution_id(client, bucket_origin):
    # Create a PageIterator from the Paginator
    page_iterator = paginator.paginate()

    print("Looking for origin: {}".format(bucket_origin))
    print("Looking for origin: {}".format(bucket))

    for page in page_iterator:
    for distribution in page['DistributionList']['Items']:
    for cf_origin in distribution['Origins']['Items']:
    print("Origin {}".format(cf_origin['DomainName']))
    if bucket_origin == cf_origin['DomainName']:
    # print("Origin {}".format(cf_origin['DomainName']))
    if bucket == cf_origin['DomainName']:
    print("Found it!")
    cf_distro_id = distribution['Id']

  3. @warden warden revised this gist Jun 22, 2021. 1 changed file with 12 additions and 0 deletions.
    12 changes: 12 additions & 0 deletions cloudfront-invalidation-lambda.py
    Original file line number Diff line number Diff line change
    @@ -1,3 +1,15 @@
    ### Cross-IAM Invalidate CloudFront from a central pipeline account
    ###
    ### new codepipelineActions.LambdaInvokeAction({
    ### runOrder: 3,
    ### actionName: 'InvalidateCloudFront',
    ### role: this.devOpsPipelineRole,
    ### lambda: Function.fromFunctionArn(this, "test-cloudfrontInvalidator", "arn:aws:lambda:eu-west-1::function:pipelineInvalidateCloudFront"),
    ### userParameters: {
    ### bucketName: bucket.bucketName,
    ### targetPipelineRole: this.testPipelineRole.roleArn
    ### }

    from __future__ import print_function

    import json
  4. @warden warden revised this gist Jun 22, 2021. 1 changed file with 90 additions and 54 deletions.
    144 changes: 90 additions & 54 deletions cloudfront-invalidation-lambda.py
    Original file line number Diff line number Diff line change
    @@ -5,56 +5,11 @@
    import urllib

    import boto3
    import botocore
    from boto3.session import Session

    cloudfront_client = boto3.client('cloudfront')
    ec2_client = boto3.client('ec2') # just to get the proper region
    code_pipeline = boto3.client('codepipeline')


    def get_cloudfront_distribution_id(bucket_origin):
    cf_distro_id = None

    # Create a reusable Paginator
    paginator = cloudfront_client.get_paginator('list_distributions')

    # Create a PageIterator from the Paginator
    page_iterator = paginator.paginate()

    for page in page_iterator:
    for distribution in page['DistributionList']['Items']:
    for cf_origin in distribution['Origins']['Items']:
    # print("Origin {}".format(cf_origin['DomainName']))
    if bucket_origin == cf_origin['DomainName']:
    cf_distro_id = distribution['Id']

    return cf_distro_id


    def get_user_params(job_data):
    """Decodes the JSON user parameters and validates the required properties.
    Args:
    job_data: The job data structure containing the UserParameters string which should be a valid JSON structure
    Returns:
    The JSON parameters decoded as a dictionary.
    Raises:
    Exception: The JSON can't be decoded or a property is missing.
    """
    try:
    # Get the user parameters which contain the stack, artifact and file settings
    user_parameters = job_data['actionConfiguration']['configuration']['UserParameters']
    decoded_parameters = json.loads(user_parameters)

    except Exception as e:
    # We're expecting the user parameters to be encoded as JSON
    # so we can pass multiple values. If the JSON can't be decoded
    # then fail the job with a helpful message.
    raise Exception('UserParameters could not be decoded from JSON')

    return decoded_parameters
    codepipeline_client = boto3.client('codepipeline')

    # --------------- Main handler ------------------

    @@ -64,7 +19,7 @@ def handler(event, context):
    Creates a cloudfront invalidation for content added to an S3 bucket
    '''
    # Log the the received event locally.
    # print("Received event: " + json.dumps(event, indent=2))
    print("Received event: " + json.dumps(event, indent=2))

    # Extract the Job ID
    job_id = event['CodePipeline.job']['id']
    @@ -76,11 +31,16 @@ def handler(event, context):
    params = get_user_params(job_data)

    # Get the object from the event.
    bucket = params['originBucket']
    bucket = params['bucketName']

    bucket_origin = bucket + '.s3.' + \
    codepipeline_client.meta.region_name + '.amazonaws.com'

    bucket_origin = bucket + '.s3.' + ec2_client.meta.region_name + '.amazonaws.com'
    cloudfront_client = get_cloudfront_client(params['targetPipelineRole'])

    cf_distro_id = get_cloudfront_distribution_id(bucket_origin)
    cf_distro_id = get_cloudfront_distribution_id(
    cloudfront_client, bucket_origin
    )

    if cf_distro_id:
    print("Creating invalidation from {} for Cloudfront distribution {}".format(
    @@ -99,6 +59,7 @@ def handler(event, context):

    print("Submitted invalidation. ID {}".format(
    invalidation['Invalidation']['Id']))

    put_job_success(
    job_id,
    "Submitted invalidation ID {}".format(
    @@ -118,6 +79,30 @@ def handler(event, context):
    return 'Success'


    def get_cloudfront_client(role_arn):
    # create an STS client object that represents a live connection to the STS service
    sts_client = boto3.client('sts')

    # Call the assume_role method of the STSConnection object and pass the role
    # ARN and a role session name.
    assumed_role_object = sts_client.assume_role(
    RoleArn=role_arn,
    RoleSessionName="TargetAccountPipelineRoleSession"
    )

    # From the response that contains the assumed role, get the temporary
    # credentials that can be used to make subsequent API calls
    credentials = assumed_role_object['Credentials']

    # Get a session for the assumed role
    session = Session(aws_access_key_id=credentials['AccessKeyId'],
    aws_secret_access_key=credentials['SecretAccessKey'],
    aws_session_token=credentials['SessionToken'])

    # return a CodePipeline client with the assumed role
    return session.client('cloudfront', config=botocore.client.Config(signature_version='s3v4'))


    def put_job_success(job, message):
    """Notify CodePipeline of a successful job
    @@ -131,7 +116,8 @@ def put_job_success(job, message):
    """
    print('Putting job success')
    print(message)
    code_pipeline.put_job_success_result(jobId=job)

    codepipeline_client.put_job_success_result(jobId=job)


    def put_job_failure(job, message):
    @@ -147,9 +133,59 @@ def put_job_failure(job, message):
    """
    print('Putting job failure')
    print(message)
    code_pipeline.put_job_failure_result(

    codepipeline_client.put_job_failure_result(
    jobId=job, failureDetails={
    'message': message,
    'type': 'JobFailed'
    }
    )


    def get_cloudfront_distribution_id(client, bucket_origin):
    cf_distro_id = None

    # Create a reusable Paginator
    paginator = client.get_paginator('list_distributions')

    # Create a PageIterator from the Paginator
    page_iterator = paginator.paginate()

    print("Looking for origin: {}".format(bucket_origin))

    for page in page_iterator:
    for distribution in page['DistributionList']['Items']:
    for cf_origin in distribution['Origins']['Items']:
    print("Origin {}".format(cf_origin['DomainName']))
    if bucket_origin == cf_origin['DomainName']:
    print("Found it!")
    cf_distro_id = distribution['Id']

    return cf_distro_id


    def get_user_params(job_data):
    """Decodes the JSON user parameters and validates the required properties.
    Args:
    job_data: The job data structure containing the UserParameters string which should be a valid JSON structure
    Returns:
    The JSON parameters decoded as a dictionary.
    Raises:
    Exception: The JSON can't be decoded or a property is missing.
    """
    try:
    # Get the user parameters which contain the stack, artifact and file settings
    user_parameters = job_data['actionConfiguration']['configuration']['UserParameters']
    decoded_parameters = json.loads(user_parameters)

    except Exception as e:
    # We're expecting the user parameters to be encoded as JSON
    # so we can pass multiple values. If the JSON can't be decoded
    # then fail the job with a helpful message.
    raise Exception('UserParameters could not be decoded from JSON')

    return decoded_parameters
  5. @warden warden created this gist Jun 18, 2021.
    155 changes: 155 additions & 0 deletions cloudfront-invalidation-lambda.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,155 @@
    from __future__ import print_function

    import json
    import time
    import urllib

    import boto3

    cloudfront_client = boto3.client('cloudfront')
    ec2_client = boto3.client('ec2') # just to get the proper region
    code_pipeline = boto3.client('codepipeline')


    def get_cloudfront_distribution_id(bucket_origin):
    cf_distro_id = None

    # Create a reusable Paginator
    paginator = cloudfront_client.get_paginator('list_distributions')

    # Create a PageIterator from the Paginator
    page_iterator = paginator.paginate()

    for page in page_iterator:
    for distribution in page['DistributionList']['Items']:
    for cf_origin in distribution['Origins']['Items']:
    # print("Origin {}".format(cf_origin['DomainName']))
    if bucket_origin == cf_origin['DomainName']:
    cf_distro_id = distribution['Id']

    return cf_distro_id


    def get_user_params(job_data):
    """Decodes the JSON user parameters and validates the required properties.
    Args:
    job_data: The job data structure containing the UserParameters string which should be a valid JSON structure
    Returns:
    The JSON parameters decoded as a dictionary.
    Raises:
    Exception: The JSON can't be decoded or a property is missing.
    """
    try:
    # Get the user parameters which contain the stack, artifact and file settings
    user_parameters = job_data['actionConfiguration']['configuration']['UserParameters']
    decoded_parameters = json.loads(user_parameters)

    except Exception as e:
    # We're expecting the user parameters to be encoded as JSON
    # so we can pass multiple values. If the JSON can't be decoded
    # then fail the job with a helpful message.
    raise Exception('UserParameters could not be decoded from JSON')

    return decoded_parameters

    # --------------- Main handler ------------------


    def handler(event, context):
    '''
    Creates a cloudfront invalidation for content added to an S3 bucket
    '''
    # Log the the received event locally.
    # print("Received event: " + json.dumps(event, indent=2))

    # Extract the Job ID
    job_id = event['CodePipeline.job']['id']

    # Extract the Job Data
    job_data = event['CodePipeline.job']['data']

    # Extract the params
    params = get_user_params(job_data)

    # Get the object from the event.
    bucket = params['originBucket']

    bucket_origin = bucket + '.s3.' + ec2_client.meta.region_name + '.amazonaws.com'

    cf_distro_id = get_cloudfront_distribution_id(bucket_origin)

    if cf_distro_id:
    print("Creating invalidation from {} for Cloudfront distribution {}".format(
    bucket, cf_distro_id))

    try:
    invalidation = cloudfront_client.create_invalidation(
    DistributionId=cf_distro_id,
    InvalidationBatch={
    'Paths': {
    'Quantity': 1,
    'Items': ['/*']
    },
    'CallerReference': str(time.time())
    })

    print("Submitted invalidation. ID {}".format(
    invalidation['Invalidation']['Id']))
    put_job_success(
    job_id,
    "Submitted invalidation ID {}".format(
    invalidation['Invalidation']['Id'])
    )
    except Exception as e:
    print("Error processing event from bucket {}. Event {}".format(
    bucket, json.dumps(event, indent=2)))
    raise e
    else:
    print("Bucket {} does not appeaer to be an origin for a Cloudfront distribution".format(bucket))
    put_job_failure(
    job_id,
    "Bucket does not to have an origin for a CF distribution"
    )

    return 'Success'


    def put_job_success(job, message):
    """Notify CodePipeline of a successful job
    Args:
    job: The CodePipeline job ID
    message: A message to be logged relating to the job status
    Raises:
    Exception: Any exception thrown by .put_job_success_result()
    """
    print('Putting job success')
    print(message)
    code_pipeline.put_job_success_result(jobId=job)


    def put_job_failure(job, message):
    """Notify CodePipeline of a failed job
    Args:
    job: The CodePipeline job ID
    message: A message to be logged relating to the job status
    Raises:
    Exception: Any exception thrown by .put_job_failure_result()
    """
    print('Putting job failure')
    print(message)
    code_pipeline.put_job_failure_result(
    jobId=job, failureDetails={
    'message': message,
    'type': 'JobFailed'
    }
    )