from __future__ import print_function import boto3 import logging import json import sys from multiprocessing.pool import ThreadPool from contextlib import closing # # Logging # class LessThanFilter(logging.Filter): def __init__(self, exclusive_maximum, name=""): super(LessThanFilter, self).__init__(name) self.max_level = exclusive_maximum def filter(self, record): return int(record.levelno < self.max_level) log = logging.getLogger() log.setLevel(logging.INFO) # Log to stdout < WARNING level stdout = logging.StreamHandler(sys.stdout) stdout.addFilter(LessThanFilter(logging.WARNING)) stdout.setLevel(logging.DEBUG) # Log to stderr >= WARNING level stderr = logging.StreamHandler(sys.stderr) stderr.setLevel(logging.WARNING) log.addHandler(stdout) log.addHandler(stderr) # # Configuration # with open("config/functions.json") as input: config = json.load(input) region = config.get("region", "eu-west-1") # # Keep alive handler # def ping_function(func): try: name = func["name"] event = func["event"] payload = json.dumps(event) logging.info("Invoking function {}".format(name)) client = boto3.client('lambda', region_name=region) r = client.invoke(FunctionName=name, InvocationType="RequestResponse", Payload=payload) if r.get("StatusCode", 0) != 200: log.error("Failed to invoke {}, response: {}".format(name, r)) return 0 payload = json.loads(r["Payload"].read().decode("utf-8")) if payload.get("statusCode", 0) != 200: log.error("Invalid lambda {} response: {}".format(name, payload)) return 0 logging.info("Function {} invoked successfully".format(name)) return 1 except Exception as e: logging.error("Failed to invoke {}, exception: {}".format(func, e)) return 0 def keep_alive_function(func): try: containers = func.get("containers", 1) with closing(ThreadPool(containers)) as pool: mp = pool.map_async(ping_function, [func] * containers) _ = mp.get(300) pool.terminate() return containers except Exception as e: logging.error("Exception in keep_alive_function: {}".format(e)) return 0 def handler(event, context): try: functions = config.get("functions", []) logging.debug("Functions {}".format(functions)) if len(functions) == 0: logging.info("Skipping, no functions") return "OK" with closing(ThreadPool(len(functions))) as pool: mp = pool.map_async(keep_alive_function, functions) _ = mp.get(300) pool.terminate() return "OK" except Exception as e: log.error("Exception in handler: {}".format(e)) return "Internal Server Error" if __name__ == "__main__": handler(None, None)