import boto3 import json, yaml import random, string from time import sleep import os import shutil import glob import zipfile import logging import logging.config class Client: ''' A client to automate the deployment of a lambda function. Attributes: config (dict): Configurations. logger (logging.Logger) workdir (str): Working directory. random (str): A random string used as suffix for resource names. inst (boto3.resources.factory.ec2.Instance) lamname (str): Name of the Lambda function resource. role (boto3.resources.factory.iam.Role) rolename (str): Name of the IAM Role resource. bucket (boto3.resources.factory.s3.Bucket) bucketname (str): Name of the S3 bucket for code uploading. sg (boto3.resources.factory.ec2.SecurityGroup) sgname (str): Name of the security group for the EC2 instance. key (boto3.resources.factory.ec2.KeyPair| boto3.resources.factory.ec2.KeyPairInfo): When first created, `key` is of type `KeyPair`, where the private key can be saved. All subsequent uses of `key` is of type `KeyPairInfo`, where the private key is no longer accessible. keyname (str): Name of the key pair for the EC2 instance. keyfile (str): Path to the local private `.pem` key file. tagname (str): Names of the resource tag. ec2remote (str): Filename of the virtual environment package on EC2. ec2local (str): Filename of the downloaded `ec2remote`. s3local (str): Filename of the created deployment package locally. s3remote (str): Filename of the uploaded deployment package on S3. ''' def __init__(self, path): # Loads the configuration file. with open(path, 'r') as f: self.config = yaml.load(f) # Configures the logger. logging.config.dictConfig(self.config['logging']) self.logger = logging.getLogger('main') self.workdir = self.config['build']['workdir'] self.random = ''.join(random.choices(string.ascii_lowercase+string.digits, k=6)) self.inst = None self.lamname = self.config['lambda']['name'] # # Load resource names if specified. Otherwise, generate resource names. # Each name is of format [prefix-][lambda name-][random suffix], where # the prefix is defined in `config['prefix']`, the lambda function name # is defined as `config['lambda']['name']`, and the suffix is # `self.random`. # names = self.config.get('names') self.role = None if names and names.get('role'): self.rolename = names['role'] else: self.rolename = self.config['prefix']['role'] self.rolename += f'{self.lamname}-{self.random}' self.bucket = None if names and names.get('bucket'): self.bucketname = names['bucket'] else: self.bucketname = self.config['prefix']['bucket'] self.bucketname += f'{self.lamname}-{self.random}' self.sg = None if names and names.get('sg'): self.sgname = names['sg'] else: self.sgname = self.config['prefix']['sg'] self.sgname += f'{self.lamname}-{self.random}' self.key = None if names and names.get('key'): self.keyname = names['key'] else: self.keyname = self.config['prefix']['key'] self.keyname += f'{self.lamname}-{self.random}' self.keyfile = f'{self.workdir}/{self.keyname}.pem' if names and names.get('tag'): self.tagname = names['tag'] else: self.tagname = self.config['prefix']['tag'] self.tagname += f'{self.lamname}' # # Figures out the names for the zip files. There are two zip files. One # is the pure dependency zip file generated on EC2 and downloaded as # `self.ec2local`. The other is the true Lambda deployment package # combining both the dependencies and the Lambda source code itself, # saved as `self.s3local` and uploaded as `self.s3remote`. # basename = self.config['build']['basename'] self.ec2remote = f'{basename}.zip' self.ec2local = f"{self.workdir}/{basename}-ec2.zip" self.s3remote = f"{basename}.zip" self.s3local = f"{self.workdir}/{basename}.zip" def save_config(self, path): """ Saves the configuration files to `path`, including resource names. """ config = self.config.copy() if 'names' not in config: config['names'] = dict() config['names']['role'] = self.rolename config['names']['bucket'] = self.bucketname config['names']['sg'] = self.sgname config['names']['key'] = self.keyname config['names']['tag'] = self.tagname config['names']['ec2'] = self.inst.id if self.inst else None with open(path, 'w') as f: yaml.dump(config, f, default_flow_style=False) def tag_resources(self): """ Tags used resources. The following resources are tagged. - IAM role for running lambda function - S3 bucket hosting the code - Lambda function - EC2 instance for compiling the virtual environment - EC2 security group for SSH connection The following resources are not tagged since AWS does not support it. - EC2 key pair """ res = [] # Get IAM role ARN if self.role is not None: res.append(self.role.arn) # Get S3 bucket ARN if self.bucket is not None: res.append(f'arn:aws:s3:::{self.bucket.name}') # Get Lambda function ARN lam = boto3.client('lambda') for fn in lam.list_functions()['Functions']: if fn['FunctionName'] == self.lamname: res.append(fn['FunctionArn']) # Tag resources using their ARNs cli = boto3.client('resourcegroupstaggingapi') cli.tag_resources(ResourceARNList=res, Tags={self.tagname: ''}) res = [] # Get EC2 instance id. if self.inst is not None: res.append(self.inst.id) # Get EC2 security group id. if self.sg is not None: res.append(self.sg.id) # Tag resources using ids. cli = boto3.client('ec2') cli.create_tags(res, Tags=[{'Key':name, 'Value':''}]) def ensure_role(self): """ Ensures a lambda function's role. A lambda function need to have the ability to assume a role (AssumeRolePolicyDocument), and the role being assumed need to have the correct permission in order for the Lambda function to access other resources like CloudWatch logs and S3 buckets. """ debug = self.logger.debug debug('Checking IAM roles for the lambda function ...') if self.role is not None: return self.role # Checks existing roles. iam = boto3.resource('iam') for role in iam.roles.iterator(): if role.name == self.rolename: self.role = role return role # This is the standard policy allowing Lambda function to assume a role. basic_role = '\n'.join([ 'Version: "2012-10-17"', 'Statement:', ' - Effect: Allow', ' Principal:', ' Service: lambda.amazonaws.com', ' Action: sts:AssumeRole' ]) # Creates one. role = iam.create_role(RoleName=self.rolename, AssumeRolePolicyDocument=json.dumps(yaml.load(basic_role))) # Find the configured Policy ARN for the role. # This policy grants permissions for Lambda function to access # other resources. arn = self.config['role'][self.config['lambda']['policy']] # Attach the policy to the role, and wait for it to take effect. role.attach_policy(PolicyArn=arn) sleep(10) role.reload() debug(f'Role {self.rolename} created.') self.role = role return role def ensure_code_bucket(self): """ Ensures a S3 bucket for code uploading. """ debug = self.logger.debug debug('Checking S3 buckets for lambda function uploading ...') if self.bucket is not None: return self.bucket # Find if the bucket already exist. s3 = boto3.resource('s3') for bucket in s3.buckets.iterator(): if bucket.name == self.bucketname: self.bucket = bucket return bucket # Create a new one. bucket = s3.Bucket(self.bucketname) bucket.create() bucket.wait_until_exists() bucket.load() debug(f'Bucket {bucket.name} created.') self.bucket = bucket return bucket def ensure_default_vpc(self): """ Ensures a default VPC in EC2. Without a default VPC, creating EC2 instance becomes more involved. """ debug = self.logger.debug debug('Checking default VPC ...') ec2 = boto3.resource('ec2') if not any([vpc.is_default for vpc in ec2.vpcs.iterator()]): debug('Creating default VPC ...') client = boto3.client('ec2') client.create_default_vpc() def ensure_ec2_key(self): """ Ensures a key pair for the EC2 instance. Key pair allows key-based SSH authentication. When firstly created, we can use `key.key_material` to save the private key. When subsequently looking up the key using its name, we no longer have access to the private key. Therefore, it is important to ensure that we have the private key. If `self.keyfile` is lost, we need to remove all EC2 instances that uses the corresponding public key, since we can no longer log into those machines. Since these EC2 instances are disposable, we can safely terminate them. """ debug = self.logger.debug debug('Checking EC2 key pair ...') ec2 = boto3.resource('ec2') # We have the private key. if os.path.isfile(self.keyfile): # And we have the EC2 key resource as well if self.key is not None: return self.key # But we don't have the EC2 key resource else: # Look it up on EC2. if self.keyname in [k.name for k in ec2.key_pairs.iterator()]: key = ec2.KeyPair(self.keyname) key.load() self.key = key return key # We don't have the private key. else: # Look it up on EC2 insts = list(ec2.instances.iterator()) if self.keyname in map(lambda i: i.key_name, insts): debug(f'Private key {self.keyname} is lost.') insts = filter(lambda i: i.key_name == self.keyname, insts) for id in map(lambda i: i.id, insts): debug(f'Please terminate EC2 instance {id}.') raise RuntimeError(self.keyname) # Creates the key. os.makedirs(self.workdir, exist_ok=True) key = ec2.create_key_pair(KeyName=self.keyname) with open(self.keyfile, 'w') as f: f.write(str(key.key_material)) debug(f'Private key saved to {self.keyfile}.') self.key = key return key def ensure_ec2_instance(self): """ Ensures an EC2 instance for python packaging. The instance uses an image that is the same as Lambda execution environment. This way, after creating virtual environment on this instance, we are guaranteed that the compiled Python package cache is compatible with Lambda environment. The instance is only spawned up if we need to compile the dependent Python packages. For instance, if you only changed the Lambda function source without changing the dependency, you can reuse the zipped virtual environment from last time, without spawning a new EC2 instance. """ debug = self.logger.debug debug('Checking EC2 instance ...') if self.inst is not None: assert self.inst.key_name == self.keyname return self.inst # Choose the correct AMI # https://docs.aws.amazon.com/lambda/latest/dg/current-supported-versions.html ami = 'ami-4fffc834' # Look it up on EC2 ec2 = boto3.resource('ec2') for inst in ec2.instances.iterator(): if inst.image_id == ami and inst.key_name == self.keyname: self.inst = inst return inst # Creates a new instance if needed. inst = ec2.create_instances(InstanceType='t2.micro', KeyName=self.keyname, MaxCount=1, MinCount=1, ImageId=ami)[0] inst.wait_until_running() inst.reload() debug(f'EC2 instance {inst.id} created at {inst.public_dns_name}') self.inst = inst return inst def ensure_ec2_ssh(self): """ Ensures ssh rule for EC2 instance. Default security groups for a new instance does not allow TCP access to port 22, which is used by SSH. Therefore we need to check and potentially create a new security group, attach a rule allowing port 22 inbound access, and attach this security group to the instance. Only after SSH is enabled that we can execute shell commands like `pip install` to create a virtual environment. """ debug = self.logger.debug debug('Checking SSH security group rule for EC2 instance ...') ec2 = boto3.resource('ec2') # Find the security group if exist. if self.sg is None: for sg in ec2.security_groups.iterator(): if sg.group_name == self.sgname: self.sg = sg if self.sg is None: # Creates a new security group. sg = ec2.create_security_group(GroupName=self.sgname, VpcId=self.inst.vpc_id, Description='Allowing SSH') # Allowing port 22. sg.authorize_ingress(IpProtocol='tcp', FromPort=22, ToPort=22, CidrIp='0.0.0.0/0') sg.reload() self.sg = sg # Attach the security group to the instance. sgs = self.inst.security_groups sgs = map(lambda sg: ec2.SecurityGroup(sg['GroupName']), sgs) if self.sgname not in sgs: # Allows SSH. sgs = list(map(lambda sg: sg.id, sgs)) sgs.append(self.sg.id) self.inst.modify_attribute(Groups=sgs) self.inst.reload() sleep(10) debug(f'Security group {self.sgname} created and attached.') return sg def create_ec2_venv(self, packages): """ Creates a AWS Lambda compatible python virtual environment. Args: packages ([str]): A list of Python packages to be installed using `pip`. One can customize the logic here. For instance, extending this function to install packages of certain version. """ debug = self.logger.debug # SSH into the EC2. In case it fails, wait longer after # `self.ensure_ec2_ssh()` import fabric.connection conn = fabric.connection.Connection( self.inst.public_dns_name, user='ec2-user', connect_kwargs={'key_filename': self.keyfile}) # Executes shell commands debug(f'Sending commands to {self.inst.public_dns_name} ...') # Install Python and tool chain. commands = [ 'sudo yum -y update', 'sudo yum -y upgrade', 'sudo yum -y groupinstall "Development Tools"', 'sudo yum -y install blas', 'sudo yum -y install lapack', 'sudo yum -y install Cython', 'sudo yum -y install python36-devel python36-pip', 'sudo yum -y install python36-virtualenv gcc' ] # Create a virtual environment. toinstall = ' '.join(packages) commands += [ 'virtualenv -p python3 ./venv', 'source ./venv/bin/activate', './venv/bin/pip install --upgrade pip', f'./venv/bin/pip install {toinstall}' ] # Zip the virtual environment. tozip = ' '.join([f'./venv/lib64/python3.6/lib-dynload', f'./venv/lib64/python3.6/site-packages', f'./venv/lib64/python3.6/dist-packages', f'./venv/lib/python3.6/site-packages', f'./venv/lib/python3.6/dist-packages']) commands.append(f'zip -r9 ./{self.ec2remote} {tozip}') # Actually send the command. for cmd in commands: debug(f'> {cmd}') conn.run(cmd) # Download the zip file with compiled Python packages. debug(f'Downloading {self.ec2remote}') os.makedirs(self.workdir, exist_ok=True) conn.get(remote=self.ec2remote, local=self.ec2local) conn.close() debug(f'Downloaded to {self.ec2local}') def package_code(self, sources): """ Package source code into zipped virtual environment. """ debug = self.logger.debug shutil.copy(self.ec2local, self.s3local) with zipfile.ZipFile(self.s3local, 'a') as f: for s in sources: f.write(s, os.path.basename(s)) debug(f'Deployment package created at {self.s3local}.') def lambda_create(self, sources): """ Creates a lambda function. """ debug = self.logger.debug debug('Checking lambda function ...') # Look up the function first. cli = boto3.client('lambda') fns = map(lambda f: f['FunctionName'], cli.list_functions()['Functions']) if self.lamname in fns: debug('Lambda function already existed.') return cli.get_function(FunctionName=self.lamname) # Add our source code, and upload the package to S3 self.package_code(sources) self.bucket.upload_file(Filename=self.s3local, Key=self.s3remote) # Create the Lambda function from S3 bucket. ret = cli.create_function( FunctionName=self.lamname, Runtime='python3.6', Role=self.role.arn, Handler=self.config['lambda']['handler'], Timeout=self.config['lambda']['timeout'], Code={ 'S3Bucket': self.bucket.name, 'S3Key': self.s3remote }) debug(f'Lambda function {self.lamname} created.') return ret def lambda_update(self, sources): """ Updates a lambda function. """ debug = self.logger.debug debug('Checking lambda function ...') self.package_code(sources) self.bucket.upload_file(Filename=self.s3local, Key=self.s3remote) cli = boto3.client('lambda') ret = cli.update_function_code( FunctionName=self.lamname, S3Bucket=self.bucket.name, S3Key=self.s3remote) debug(f'Lambda function {self.lamname} updated.') return ret def lambda_invoke(self, event): """ Invokes the lambda function. """ lam = boto3.client('lambda') # 'RequestResponse' is good for use on the console. One can also # invoke it as 'Event'. See # https://docs.aws.amazon.com/lambda/latest/dg/API_Invoke.html resp = lam.invoke( FunctionName=self.lamname, InvocationType='RequestResponse', LogType='Tail', Payload=json.dumps(event)) print(resp['Payload'].read()) return resp def cleanup(self): """ Cleans resource except for the Lambda function. """ debug = self.logger.debug if self.bucket is not None: debug(f'Deleting bucket {self.bucketname} ...') self.bucket.objects.all().delete() self.bucket.delete() self.bucket.wait_until_not_exists() self.bucket = None if self.inst is not None: debug(f'Deleting instance {self.inst.id} ...') self.inst.terminate() self.inst.wait_until_terminated() self.inst = None if self.key is not None: debug(f'Deleting key {self.keyname} and {self.keyfile} ...') self.key.delete() self.key = None os.remove(self.keyfile) if self.sg is not None: debug(f'Deleting security group {self.sgname} ...') self.sg.delete() self.sg = None def cleanup_all(self): """ Cleans all the resources, including the Lambda function and its role. Note: The Lambda function can not run without the role. The role thus should live at least as long as the Lambda function. """ self.cleanup() debug = self.logger.debug debug(f'Deleting lambda function {self.lamname} ...') lam = boto3.client('lambda') lam.delete_function(FunctionName=self.lamname) if self.role is None: self.ensure_role() debug(f'Deleting role {self.rolename} ...') for policy in self.role.attached_policies.iterator(): self.role.detach_policy(PolicyArn=policy.arn) self.role.delete() self.role = None if __name__ == '__main__': # # Here is an example script deploying the function. Pay attention to the # order of function invocations. # c = Client('./config.yaml') c.ensure_role() # Before create lambda. c.ensure_code_bucket() # Before create lambda. c.ensure_ec2_key() # Before creating instance. c.ensure_ec2_instance() # Before allowing SSH. c.ensure_ec2_ssh() # Before creating venv. c.create_ec2_venv(c.config['lambda']['requirements']) # Before create lambda. c.lambda_create(c.config['lambda']['sources']) # Before invocation. c.lambda_invoke(c.config['test']['event']) c.save_config('./config_last.yaml') # After all resources are populated. # # Say we changed the Lambda code, and its dependencies, since we did not # clean up last time, those resources should still be there for reuse. # c = Client('./config_last.yaml') # These `ensure_*` functions should be reusing existing resources, as # specified in the `config['names']`. c.ensure_role() c.ensure_code_bucket() c.ensure_ec2_key() c.ensure_ec2_instance() c.ensure_ec2_ssh() # Since the dependency changed, we need to re-create a virtual environment. c.create_ec2_venv(c.config['lambda']['requirements']) # Update the function. c.lambda_update(c.config['lambda']['sources']) c.lambda_invoke(c.config['test']['event']) c.cleanup() # # Say this time we changed the Lambda code without changing its dependencies. # c = Client('./config_last.yaml') # A new bucket will be created, since the last one is cleaned up. c.ensure_code_bucket() # The zipped virtual environment at `self.ec2local` is re-used. No need to # spawn a new EC2 instance. c.lambda_update(c.config['lambda']['sources']) c.lambda_invoke(c.config['test']['event']) c.cleanup()