Skip to content

Instantly share code, notes, and snippets.

@sysboss
Created May 21, 2018 15:41
Show Gist options
  • Save sysboss/d40ea8a7a12f510e61d7980269323b36 to your computer and use it in GitHub Desktop.
Save sysboss/d40ea8a7a12f510e61d7980269323b36 to your computer and use it in GitHub Desktop.

Revisions

  1. sysboss created this gist May 21, 2018.
    95 changes: 95 additions & 0 deletions query_athena.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,95 @@
    #!/usr/bin/env python3
    #
    # Query AWS Athena using SQL
    # Copyright (c) Alexey Baikov <sysboss[at]mail.ru>
    #
    # This snippet is a basic example to query Athen and load the results
    # to a variable.
    #
    # Requirements:
    # > pip3 install boto3 botocore retrying

    import os
    import sys
    import csv
    import boto3
    import botocore
    from retrying import retry

    # configuration
    s3_bucket = 'athenaoutput' # S3 Bucket name
    s3_ouput = 's3://'+ s3_bucket # S3 Bucket to store results
    database = 'datalake_database' # The database to which the query belongs

    # init clients
    athena = boto3.client('athena')
    s3 = boto3.resource('s3')

    @retry(stop_max_attempt_number = 10,
    wait_exponential_multiplier = 300,
    wait_exponential_max = 1 * 60 * 1000)
    def poll_status(_id):
    result = athena.get_query_execution( QueryExecutionId = _id )
    state = result['QueryExecution']['Status']['State']

    if state == 'SUCCEEDED':
    return result
    elif state == 'FAILED':
    return result
    else:
    raise Exception

    def run_query(query, database, s3_output):
    response = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={
    'Database': database
    },
    ResultConfiguration={
    'OutputLocation': s3_output,
    })

    QueryExecutionId = response['QueryExecutionId']
    result = poll_status(QueryExecutionId)

    if result['QueryExecution']['Status']['State'] == 'SUCCEEDED':
    print("Query SUCCEEDED: {}".format(QueryExecutionId))

    s3_key = QueryExecutionId + '.csv'
    local_filename = QueryExecutionId + '.csv'

    # download result file
    try:
    s3.Bucket(s3_bucket).download_file(s3_key, local_filename)
    except botocore.exceptions.ClientError as e:
    if e.response['Error']['Code'] == "404":
    print("The object does not exist.")
    else:
    raise

    # read file to array
    rows = []
    with open(local_filename) as csvfile:
    reader = csv.DictReader(csvfile)
    for row in reader:
    rows.append(row)

    # delete result file
    if os.path.isfile(local_filename):
    os.remove(local_filename)

    return rows

    if __name__ == '__main__':
    # SQL Query to execute
    query = ("""
    SELECT id, name
    FROM example
    LIMIT 20
    """)

    print("Executing query: {}".format(query))
    result = run_query(query, database, s3_ouput)

    print("Results:")
    print(result)