Last active
          June 17, 2021 20:01 
        
      - 
      
 - 
        
Save matthewpick/39a4a308b590d29e46c523770a972783 to your computer and use it in GitHub Desktop.  
Revisions
- 
        
matthewpick revised this gist
Jun 17, 2021 . 1 changed file with 20 additions and 19 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -11,20 +11,22 @@ def main(): master_instance_type = 'm5.xlarge' logger.info("Must be on VPN to view webpages!") clusters = find_clusters(name='production-streaming-cluster') for cluster in clusters: app_ids = find_application_ids(cluster['Id']) ec2_instances = find_cluster_ec2_instances(cluster['Id']) primary_instance = [instance for instance in ec2_instances if instance['InstanceType'] == master_instance_type][0] dns_name = primary_instance['PrivateDnsName'] for app_id in app_ids: spark_url = f'http://{dns_name}:20888/proxy/{app_id}/streaming/' hadoop_url = f'http://{dns_name}:8088/cluster/app/{app_id}/' logger.info("Opening Spark UI %s", spark_url) logger.info("Opening Hadoop UI %s", hadoop_url) webbrowser.open(spark_url) webbrowser.open(hadoop_url) def find_cluster_ec2_instances(job_flow_id): @@ -59,7 +61,7 @@ def find_application_ids(cluster_id): yield app_id def find_clusters(name=None): client = boto3.client('emr') clusters = client.list_clusters(ClusterStates=['RUNNING', 'WAITING']).get('Clusters', []) @@ -68,13 +70,12 @@ def find_oldest_cluster(name=None): logger.info('No Clusters RUNNING') return None for cluster in clusters: if name: if cluster['Name'] == name: yield cluster else: yield cluster if __name__ == "__main__":  - 
        
matthewpick created this gist
Jun 17, 2021 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,81 @@ import logging import boto3 import webbrowser logger = logging.getLogger(__name__) def main(): master_instance_type = 'm5.xlarge' logger.info("Must be on VPN to view webpages!") cluster = find_oldest_cluster() app_ids = find_application_ids(cluster['Id']) ec2_instances = find_cluster_ec2_instances(cluster['Id']) primary_instance = [instance for instance in ec2_instances if instance['InstanceType'] == master_instance_type][0] dns_name = primary_instance['PrivateDnsName'] for app_id in app_ids: spark_url = f'http://{dns_name}:20888/proxy/{app_id}/streaming/' hadoop_url = f'http://{dns_name}:8088/cluster/app/{app_id}/' logger.info("Opening Spark UI %s", spark_url) logger.info("Opening Hadoop UI %s", hadoop_url) webbrowser.open(spark_url) webbrowser.open(hadoop_url) def find_cluster_ec2_instances(job_flow_id): client = boto3.client('ec2') custom_filter = [{ 'Name': 'tag:aws:elasticmapreduce:job-flow-id', 'Values': [job_flow_id]}] response = client.describe_instances(Filters=custom_filter) # NOTE: May need extra work for non-spot instances! e.g. "Reservations" key is specific to spot instances for group in response.get('Reservations', []): for instance in group.get('Instances', []): yield instance def find_application_ids(cluster_id): emr_client = boto3.client('emr') cluster_details = emr_client.describe_cluster(ClusterId=cluster_id) s3_path_parts = cluster_details['Cluster']['LogUri'].replace('s3://', '').replace('s3n://', '').split('/') bucket_name = s3_path_parts[0] sub_path = "/".join(s3_path_parts[1:-1]) containers_sub_path = f'{sub_path}/{cluster_id}/containers/' s3_client = boto3.client('s3') result = s3_client.list_objects(Bucket=bucket_name, Prefix=containers_sub_path, Delimiter='/') for prefix in result.get('CommonPrefixes', []): app_id = prefix['Prefix'].split('/')[-2] yield app_id def find_oldest_cluster(name=None): client = boto3.client('emr') clusters = client.list_clusters(ClusterStates=['RUNNING', 'WAITING']).get('Clusters', []) if not clusters: logger.info('No Clusters RUNNING') return None oldest_cluster = clusters[0] for cluster in clusters: if cluster['NormalizedInstanceHours'] > oldest_cluster['NormalizedInstanceHours']: oldest_cluster = cluster return oldest_cluster if __name__ == "__main__": main()