"""Open the Spark streaming and YARN web UIs for applications on EMR clusters.

For every active EMR cluster with the configured name, this script locates the
primary node's private DNS name, discovers application IDs from the cluster's
S3 container logs, and opens the corresponding UI pages in a web browser.
The UIs are addressed by private DNS name, so you must be on the VPN.
"""
import logging
import webbrowser

import boto3

logger = logging.getLogger(__name__)


def main(cluster_name='production-streaming-cluster', master_instance_type='m5.xlarge'):
    """Open the Spark streaming UI and the YARN app page for each application.

    Args:
        cluster_name: Name of the EMR cluster(s) to inspect. A falsy value
            means "every active cluster".
        master_instance_type: EC2 instance type used to recognise the primary
            node among the cluster's instances.
    """
    logger.info("Must be on VPN to view webpages!")
    matched_any = False
    for cluster in find_clusters(name=cluster_name):
        matched_any = True
        cluster_id = cluster['Id']
        primary_instance = _find_primary_instance(cluster_id, master_instance_type)
        if primary_instance is None:
            # Don't abort the whole run because one cluster has no usable primary node.
            logger.warning(
                "No running %s instance found for cluster %s; skipping",
                master_instance_type, cluster_id)
            continue
        dns_name = primary_instance['PrivateDnsName']
        for app_id in find_application_ids(cluster_id):
            spark_url = f'http://{dns_name}:20888/proxy/{app_id}/streaming/'
            hadoop_url = f'http://{dns_name}:8088/cluster/app/{app_id}/'
            logger.info("Opening Spark UI %s", spark_url)
            logger.info("Opening Hadoop UI %s", hadoop_url)
            webbrowser.open(spark_url)
            webbrowser.open(hadoop_url)
    if not matched_any:
        logger.info("No active cluster named %s", cluster_name)


def _find_primary_instance(cluster_id, instance_type):
    """Return the first running instance of *instance_type* in the cluster, or None.

    NOTE(review): matching on instance type assumes no core/task node shares the
    primary node's type; if it does, an arbitrary matching node is returned.
    """
    for instance in find_cluster_ec2_instances(cluster_id):
        is_type = instance.get('InstanceType') == instance_type
        # Terminated/stopped instances keep their tags but have no usable DNS name.
        is_running = instance.get('State', {}).get('Name') == 'running'
        if is_type and is_running:
            return instance
    return None


def find_cluster_ec2_instances(job_flow_id):
    """Yield EC2 instance descriptions tagged with the given EMR job-flow ID.

    Args:
        job_flow_id: EMR cluster ID, matched against the
            ``aws:elasticmapreduce:job-flow-id`` instance tag.

    Yields:
        Instance dicts from ``EC2.Client.describe_instances``, in any state.
    """
    client = boto3.client('ec2')
    custom_filter = [{
        'Name': 'tag:aws:elasticmapreduce:job-flow-id',
        'Values': [job_flow_id],
    }]
    # describe_instances is paginated; walk every page, not just the first.
    # Instances are nested under 'Reservations' for spot and on-demand alike.
    paginator = client.get_paginator('describe_instances')
    for page in paginator.paginate(Filters=custom_filter):
        for reservation in page.get('Reservations', []):
            yield from reservation.get('Instances', [])


def _split_s3_uri(uri):
    """Split ``s3://bucket/some/prefix/`` into ``('bucket', 'some/prefix')``.

    Any scheme (``s3``, ``s3n``, ``s3a``) is accepted. The returned key prefix
    has no leading or trailing slashes and is ``''`` for a bucket-root URI.
    """
    _, sep, rest = uri.partition('://')
    if not sep:
        rest = uri
    bucket, _, key = rest.partition('/')
    return bucket, key.strip('/')


def find_application_ids(cluster_id):
    """Yield application IDs found in the cluster's S3 container logs.

    Looks for log "directories" under
    ``<LogUri>/<cluster_id>/containers/<app_id>/`` and yields each ``<app_id>``.
    Yields nothing (and logs a warning) if the cluster has no ``LogUri``.

    Args:
        cluster_id: EMR cluster ID.
    """
    emr_client = boto3.client('emr')
    cluster_details = emr_client.describe_cluster(ClusterId=cluster_id)
    log_uri = cluster_details['Cluster'].get('LogUri')
    if not log_uri:
        logger.warning("Cluster %s has no LogUri; cannot discover applications", cluster_id)
        return
    bucket_name, log_prefix = _split_s3_uri(log_uri)
    # Drop an empty log prefix so a bucket-root LogUri doesn't produce a key starting with '/'.
    containers_prefix = '/'.join(
        part for part in (log_prefix, cluster_id, 'containers') if part) + '/'
    s3_client = boto3.client('s3')
    paginator = s3_client.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=bucket_name, Prefix=containers_prefix, Delimiter='/')
    for page in pages:
        for common_prefix in page.get('CommonPrefixes', []):
            # Prefix looks like '<...>/containers/<app_id>/'; take the last segment.
            yield common_prefix['Prefix'].rstrip('/').rsplit('/', 1)[-1]


def find_clusters(name=None):
    """Yield RUNNING or WAITING EMR clusters, optionally filtered by exact name.

    Args:
        name: Cluster name to match exactly. A falsy value yields every
            active cluster.

    Yields:
        Cluster summary dicts from ``EMR.Client.list_clusters``.
    """
    client = boto3.client('emr')
    # list_clusters is paginated; without a paginator only the first page is seen.
    paginator = client.get_paginator('list_clusters')
    found_any = False
    for page in paginator.paginate(ClusterStates=['RUNNING', 'WAITING']):
        for cluster in page.get('Clusters', []):
            found_any = True
            if not name or cluster['Name'] == name:
                yield cluster
    if not found_any:
        logger.info('No Clusters RUNNING')


if __name__ == "__main__":
    # Without configuration the root logger only emits WARNING and above,
    # which would hide every logger.info() message (including the VPN hint).
    logging.basicConfig(level=logging.INFO)
    main()