@matthewpick
Last active June 17, 2021 20:01

Revisions

  1. matthewpick revised this gist Jun 17, 2021. 1 changed file with 20 additions and 19 deletions.
    39 changes: 20 additions & 19 deletions emr_open_spark_web_ui.py
@@ -11,20 +11,22 @@ def main():
     master_instance_type = 'm5.xlarge'
     logger.info("Must be on VPN to view webpages!")

-    cluster = find_oldest_cluster()
-    app_ids = find_application_ids(cluster['Id'])
-    ec2_instances = find_cluster_ec2_instances(cluster['Id'])
+    clusters = find_clusters(name='production-streaming-cluster')

-    primary_instance = [instance for instance in ec2_instances if instance['InstanceType'] == master_instance_type][0]
-    dns_name = primary_instance['PrivateDnsName']
+    for cluster in clusters:
+        app_ids = find_application_ids(cluster['Id'])
+        ec2_instances = find_cluster_ec2_instances(cluster['Id'])
+
+        primary_instance = [instance for instance in ec2_instances if instance['InstanceType'] == master_instance_type][0]
+        dns_name = primary_instance['PrivateDnsName']

-    for app_id in app_ids:
-        spark_url = f'http://{dns_name}:20888/proxy/{app_id}/streaming/'
-        hadoop_url = f'http://{dns_name}:8088/cluster/app/{app_id}/'
-        logger.info("Opening Spark UI %s", spark_url)
-        logger.info("Opening Hadoop UI %s", hadoop_url)
-        webbrowser.open(spark_url)
-        webbrowser.open(hadoop_url)
+        for app_id in app_ids:
+            spark_url = f'http://{dns_name}:20888/proxy/{app_id}/streaming/'
+            hadoop_url = f'http://{dns_name}:8088/cluster/app/{app_id}/'
+            logger.info("Opening Spark UI %s", spark_url)
+            logger.info("Opening Hadoop UI %s", hadoop_url)
+            webbrowser.open(spark_url)
+            webbrowser.open(hadoop_url)


 def find_cluster_ec2_instances(job_flow_id):
@@ -59,7 +61,7 @@ def find_application_ids(cluster_id):
         yield app_id


-def find_oldest_cluster(name=None):
+def find_clusters(name=None):
     client = boto3.client('emr')

     clusters = client.list_clusters(ClusterStates=['RUNNING', 'WAITING']).get('Clusters', [])
@@ -68,13 +70,12 @@ def find_oldest_cluster(name=None):
         logger.info('No Clusters RUNNING')
         return None

-    oldest_cluster = clusters[0]
-
     for cluster in clusters:
-        if cluster['NormalizedInstanceHours'] > oldest_cluster['NormalizedInstanceHours']:
-            oldest_cluster = cluster
-
-    return oldest_cluster
+        if name:
+            if cluster['Name'] == name:
+                yield cluster
+        else:
+            yield cluster


 if __name__ == "__main__":
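Net effect of this revision: instead of guessing at a single cluster via find_oldest_cluster(), the script filters every RUNNING/WAITING cluster by name and opens the UIs for each match. A minimal sketch of reusing the revised generator on its own; the module name emr_open_spark_web_ui (derived from the gist's filename) is an import-path assumption:

# Sketch: list matching clusters without opening any browser tabs.
# Assumes the revised gist is saved locally as emr_open_spark_web_ui.py.
import logging

import emr_open_spark_web_ui as emr

logging.basicConfig(level=logging.INFO)  # surface the gist's logger.info output

for cluster in emr.find_clusters(name='production-streaming-cluster'):
    print(cluster['Id'], cluster['Name'], cluster['Status']['State'])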
  2. matthewpick created this gist Jun 17, 2021.
    81 changes: 81 additions & 0 deletions emr_open_spark_web_ui.py
    @@ -0,0 +1,81 @@
import logging

import boto3
import webbrowser


logger = logging.getLogger(__name__)


def main():
    master_instance_type = 'm5.xlarge'
    logger.info("Must be on VPN to view webpages!")

    cluster = find_oldest_cluster()
    app_ids = find_application_ids(cluster['Id'])
    ec2_instances = find_cluster_ec2_instances(cluster['Id'])

    # The master node is identified by its instance type; its private DNS name
    # hosts the YARN proxy (20888) and ResourceManager (8088) web UIs.
    primary_instance = [instance for instance in ec2_instances if instance['InstanceType'] == master_instance_type][0]
    dns_name = primary_instance['PrivateDnsName']

    for app_id in app_ids:
        spark_url = f'http://{dns_name}:20888/proxy/{app_id}/streaming/'
        hadoop_url = f'http://{dns_name}:8088/cluster/app/{app_id}/'
        logger.info("Opening Spark UI %s", spark_url)
        logger.info("Opening Hadoop UI %s", hadoop_url)
        webbrowser.open(spark_url)
        webbrowser.open(hadoop_url)


def find_cluster_ec2_instances(job_flow_id):
    # EMR tags its EC2 instances with the cluster's job-flow id.
    client = boto3.client('ec2')

    custom_filter = [{
        'Name': 'tag:aws:elasticmapreduce:job-flow-id',
        'Values': [job_flow_id]}]

    response = client.describe_instances(Filters=custom_filter)

    # NOTE: May need extra work for non-spot instances! e.g. "Reservations" key is specific to spot instances
    for group in response.get('Reservations', []):
        for instance in group.get('Instances', []):
            yield instance


def find_application_ids(cluster_id):
    # YARN application ids are recovered from the cluster's S3 log layout:
    # <LogUri>/<cluster-id>/containers/<application-id>/
    emr_client = boto3.client('emr')
    cluster_details = emr_client.describe_cluster(ClusterId=cluster_id)
    s3_path_parts = cluster_details['Cluster']['LogUri'].replace('s3://', '').replace('s3n://', '').split('/')

    bucket_name = s3_path_parts[0]
    sub_path = "/".join(s3_path_parts[1:-1])
    containers_sub_path = f'{sub_path}/{cluster_id}/containers/'

    s3_client = boto3.client('s3')

    result = s3_client.list_objects(Bucket=bucket_name, Prefix=containers_sub_path, Delimiter='/')
    for prefix in result.get('CommonPrefixes', []):
        app_id = prefix['Prefix'].split('/')[-2]
        yield app_id


def find_oldest_cluster(name=None):
    # Highest NormalizedInstanceHours is used as a proxy for the longest-running cluster.
    client = boto3.client('emr')

    clusters = client.list_clusters(ClusterStates=['RUNNING', 'WAITING']).get('Clusters', [])

    if not clusters:
        logger.info('No Clusters RUNNING')
        return None

    oldest_cluster = clusters[0]

    for cluster in clusters:
        if cluster['NormalizedInstanceHours'] > oldest_cluster['NormalizedInstanceHours']:
            oldest_cluster = cluster

    return oldest_cluster


if __name__ == "__main__":
    main()
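As created, the script logs through a module-level logger but never calls logging.basicConfig, so a bare python emr_open_spark_web_ui.py run opens browser tabs without printing anything; boto3 resolves credentials and region from the usual environment/profile chain. A minimal launcher sketch; the profile and region values are placeholders, not taken from the gist:

# Sketch: run the gist with visible log output and explicit AWS settings.
# The AWS_PROFILE / AWS_DEFAULT_REGION values below are placeholders.
import logging
import os

import emr_open_spark_web_ui  # assumes the gist file sits on the import path

# boto3 reads these when main() creates its clients, so setting them here is early enough.
os.environ.setdefault('AWS_PROFILE', 'my-emr-profile')
os.environ.setdefault('AWS_DEFAULT_REGION', 'us-east-1')

logging.basicConfig(level=logging.INFO, format='%(levelname)s %(name)s: %(message)s')

emr_open_spark_web_ui.main()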