#!/usr/bin/env python3 import concurrent.futures import copy import os from typing import ( Any, Callable, ) import boto3 def assumed_session( *, management_session: boto3.session.Session, assumed_account_id: str, role_name='OrganizationAccountAccessRole', ) -> boto3.session.Session: res = management_session.client('sts').assume_role( RoleArn=f"arn:aws:iam::{assumed_account_id}:role/{role_name}", RoleSessionName=f"{assumed_account_id}@{role_name}"[:64], ) return boto3.session.Session( aws_access_key_id=res['Credentials']['AccessKeyId'], aws_secret_access_key=res['Credentials']['SecretAccessKey'], aws_session_token=res['Credentials']['SessionToken'], ) def do_each( management_session: boto3.session.Session, account_ids: list[str], func: Callable, ) -> list[Any]: results = [] def runner(arg) -> Any: management_session = arg['management_session'] account_id = arg['account_id'] session = assumed_session( management_session=management_session, assumed_account_id=account_id, ) return func(session=session, account_id=account_id) # boto3 の Session はスレッドセーフじゃないので deepcopy する! args = [ { 'management_session': copy.deepcopy(management_session), 'account_id': account_id, } for account_id in account_ids ] with concurrent.futures.ThreadPoolExecutor() as executor: try: for r in executor.map(runner, args, timeout=3600): results.append(r) except concurrent.futures.TimeoutError: raise RuntimeError(f"timeout {len(results)}/{len(args)}") return results def list_iam_users( *, session: boto3.session.Session, account_id: str, ) -> dict: iam = session.client('iam') while True: res = iam.generate_credential_report() if res['State'] == 'COMPLETE': break res = iam.get_credential_report() return {account_id: res['Content'].decode()} management_session = boto3.session.Session(profile_name=os.getenv('AWS_PROFILE')) account_ids = ['999999999999', '888888888888', '...'] # 数字のアカウント ID results = do_each( management_session, account_ids, list_iam_users, ) print(results)