#!/usr/bin/python3
# Interweave two JSON log streams
"""Run a command alongside the AWS SSM agent inside an ECS task.

When ECS task metadata and task credentials are available, the task is
registered with SSM (so operators can open a session in it) and the stdout of
the SSM agent and of the wrapped command are interwoven line by line onto this
process's stdout. Without that metadata the wrapped command is exec'd directly.

Usage: <this script> COMMAND [ARG...]
"""
import json
import os
import sys
import time
import urllib.request
from os import getenv
from subprocess import Popen, PIPE, STDOUT, check_output, check_call, call
from sys import stdout
from threading import Lock, Thread

AGENT = "/usr/bin/amazon-ssm-agent"

# File read after `AGENT -register` to obtain the managed-instance ID.
REGISTRATION_FILE = "/var/lib/amazon/ssm/registration"
# Sudoers fragment granting ssm-user passwordless sudo.
SUDOERS_FILE = "/etc/sudoers.d/ssm"
# Link-local host serving AWS_CONTAINER_CREDENTIALS_RELATIVE_URI.
CREDENTIALS_HOST = "http://169.254.170.2"
# Upper bound per metadata/credentials request so startup cannot hang forever.
METADATA_TIMEOUT_SECS = 30
# A wrapped command that exits sooner than this counts as a fast failure...
MIN_RUNTIME_SECS = 300
# ...and SSM is then kept alive this long to allow logging in to debug it
# (and to reduce the churn velocity).
DEBUG_GRACE_SECS = 900
# How long to let the log pumps drain trailing output before exiting.
PUMP_DRAIN_SECS = 5


def weave(stream, output, lock):
    """Copy *stream* to *output* line by line until EOF or an I/O error.

    Each line is written and flushed while holding *lock*. Several weavers
    sharing one output therefore never interleave within a line, and log
    lines reach the consumer promptly instead of sitting in a buffer.

    Args:
        stream: file-like object to read lines from (a child's stdout pipe
            in this script).
        output: file-like object to write to (``sys.stdout.buffer`` here).
        lock: lock shared by every weaver writing to *output*.
    """
    while True:
        try:
            ln = stream.readline()
            if not ln:
                # EOF: readline() keeps returning an empty value forever, so
                # stop here instead of spinning.
                break
            with lock:
                output.write(ln)
                output.flush()
        except (OSError, ValueError):
            # Best effort: a broken pipe or closed file quietly ends this pump.
            break


def _fetch_json(url):
    """Fetch *url* and return its body decoded as UTF-8 JSON."""
    with urllib.request.urlopen(url, timeout=METADATA_TIMEOUT_SECS) as resp:
        return json.loads(resp.read().decode("utf-8"))


def _grant_ssm_user_sudo():
    """Give ``ssm-user`` passwordless sudo that keeps its environment."""
    with open(SUDOERS_FILE, "w") as fl:
        fl.write('Defaults:ssm-user env_keep += *\n')
        fl.write("ssm-user ALL=(ALL) NOPASSWD:ALL\n")
    # sudo rejects world-writable sudoers files; pin the conventional 0440
    # mode rather than depending on the process umask.
    os.chmod(SUDOERS_FILE, 0o440)


def init_ssm():
    """Register this ECS task as an SSM managed instance, if possible.

    Reads the ECS task metadata (``ECS_CONTAINER_METADATA_URI``) and the task
    credentials endpoint (``AWS_CONTAINER_CREDENTIALS_RELATIVE_URI``). It then:

    - creates an activation with ``aws ssm create-activation``;
    - registers the local agent with it;
    - tags the ECS task with the resulting managed-instance ID;
    - grants ``ssm-user`` passwordless sudo.

    Returns:
        False if either environment variable is unset or empty (SSM skipped);
        True once registration has completed.

    Raises:
        Errors from the HTTP requests, JSON decoding, missing metadata keys,
        ``check_output``/``check_call`` (CalledProcessError) or reading the
        registration file propagate to the caller.
    """
    # Get the optional task information
    meta_url = getenv("ECS_CONTAINER_METADATA_URI", "")
    creds_meta_url = getenv("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI", "")
    if not meta_url or not creds_meta_url:
        return False

    meta = _fetch_json(meta_url + "/task")
    creds_meta = _fetch_json(CREDENTIALS_HOST + creds_meta_url)

    # E.g.: arn:aws:ecs:us-east-1:3412341234241:task/e36711e4-0272-4f08-83e4-b80d36b9e180
    task_arn = meta["TaskARN"]
    task_id = task_arn.split("/")[-1]
    # The region is the fourth colon-separated ARN field (us-east-1 above).
    ssm_region = task_arn.split(":")[3]
    # arn:aws:ecs:us-east-1:1234123411313:cluster/ApolloAlpha -> ApolloAlpha
    cluster_name = meta["Cluster"].split("/")[-1]

    # arn:aws:iam::123412341234124:role/service-role/ApolloAlphaTask is
    # transformed to service-role/ApolloAlphaTask.
    # NOTE(review): this always uses the service-role/ path with the task
    # role's name, whatever path the task role itself has; confirm intended.
    ssm_role = "service-role/" + creds_meta["RoleArn"].split("/")[-1]

    tags = [
        "--tags",
        "Key=Name,Value=%s" % task_id,
        "Key=TaskId,Value=%s" % task_id,
        "Key=Role,Value=Fargate",
        "Key=Cluster,Value=%s" % meta['Cluster'],
        "Key=TaskFamily,Value=%s" % meta['Family'],
        "Key=TaskRevision,Value=%s" % meta['Revision'],
    ]
    act_out = check_output(
        ["aws", "ssm", "create-activation",
         "--iam-role", ssm_role,
         "--region", ssm_region,
         "--description", cluster_name + " Task",
         "--default-instance-name", "t-" + task_id] + tags)
    act_data = json.loads(act_out.decode("utf-8"))
    # flush=True keeps these lines ordered before child output and before the
    # binary writes the log pumps make later.
    print('{"msg": "Received activation id %s"}' % (act_data["ActivationId"]),
          flush=True)

    # Register the instance in SSM
    check_call([AGENT, "-register", "-y",
                "-code", act_data["ActivationCode"],
                "-id", act_data["ActivationId"],
                "-region", ssm_region,
                "-i", task_id])
    print('{"msg": "Finished activation"}', flush=True)

    # Now we should have the machine-id, so we can link the task with it!
    with open(REGISTRATION_FILE, encoding="utf-8") as fl:
        machine_id = json.load(fl)["ManagedInstanceID"]
    # Best effort: the exit status of the tagging call is deliberately ignored.
    call(["aws", "ecs", "tag-resource", "--resource-arn", task_arn,
          "--tags", "key=MachineId,value=%s" % machine_id])

    # Allow sudo access
    _grant_ssm_user_sudo()
    return True


def _exit_status(returncode):
    """Map a ``Popen`` return code to a process exit status.

    ``Popen`` reports death by signal N as -N; use the shell convention
    128 + N so the signal stays visible in our own exit status.
    """
    return 128 - returncode if returncode < 0 else returncode


def _start_pump(stream, lock, name):
    """Start a daemon thread that weaves *stream* into our binary stdout.

    Daemon, so a pump blocked on a pipe still held open by some descendant
    process cannot keep this wrapper alive.
    """
    pump = Thread(target=weave, args=(stream, stdout.buffer, lock),
                  name=name, daemon=True)
    pump.start()
    return pump


def main():
    """Run ``sys.argv[1:]`` with SSM attached and return its exit status.

    If ``init_ssm`` reports SSM is unavailable, the command is exec'd in
    place of this process and this function never returns. Otherwise, the
    agent and the command run as children whose stdout is interwoven onto
    ours. If the command exits within MIN_RUNTIME_SECS, SSM stays up for
    DEBUG_GRACE_SECS so the failure can be investigated interactively.
    """
    if len(sys.argv) < 2:
        sys.stderr.write("usage: %s COMMAND [ARG...]\n" % sys.argv[0])
        return 2

    mtx = Lock()
    if not init_ssm():
        # No SSM? Just launch the target directly
        os.execlp(sys.argv[1], *sys.argv[1:])

    ssm_process = Popen([AGENT], stdin=None, stdout=PIPE, stderr=STDOUT)
    pumps = [_start_pump(ssm_process.stdout, mtx, "SSMPump")]

    delegated = Popen(sys.argv[1:], stdin=sys.stdin, stdout=PIPE,
                      stderr=sys.stderr)
    pumps.append(_start_pump(delegated.stdout, mtx, "Delegated"))

    # Monotonic clock: a wall-clock adjustment must not skew this check.
    start = time.monotonic()
    returncode = delegated.wait()
    if time.monotonic() - start < MIN_RUNTIME_SECS:
        time.sleep(DEBUG_GRACE_SECS)

    ssm_process.kill()
    ssm_process.wait()
    # Let the pumps forward any trailing output before we exit.
    for pump in pumps:
        pump.join(PUMP_DRAIN_SECS)
    return _exit_status(returncode)


if __name__ == "__main__":
    sys.exit(main())