@jpbarto
Last active January 12, 2024 13:13

Revisions

  1. jpbarto revised this gist Nov 29, 2018. 1 changed file with 81 additions and 4 deletions.
    85 changes: 81 additions & 4 deletions process_cognito_users.py
    @@ -1,18 +1,53 @@
    #!/usr/bin/env python3
    """
    The following script demonstrates how to use the AWS Boto3 SDK to iterate through
    all of the users in an AWS Cognito User Pool and examine the events associated
    with each user.
    If any failed authentication events are found, the script formats them as messages
    and logs them to CloudWatch Logs.
    This script could easily be modified to run periodically as a Lambda function
    triggered by a scheduled CloudWatch rule. If transforming this script into a
    Lambda function I would recommend using os.environ to retrieve parameters for
    the Lambda function, such as the AWS Region and the Cognito User Pool ID (a
    sketch of such a handler appears after the end of this listing).
    """

    import boto3

    # set the region to operate in
    region = boto3.session.Session().region_name
    # create clients for Cognito Identity Provider (User Pools) and CloudWatch Logs
    idp = boto3.client ('cognito-idp', region_name = region)
    logs = boto3.client ('logs', region_name = region)

    user_pool_id = 'eu-west-2_A0D4ZfpUE'    # old value, replaced in this revision
    # define the user pool this script will work with
    user_pool_id = '<your user pool here>'

    def get_users ():
        """
        Retrieve a list of users from the Cognito user pool.
        Returns a list of dictionaries in the form of:
        [
            {'username': 'user001', 'email': '[email protected]'},
            ...
            {'username': 'scott', 'email': '[email protected]'}
        ]
        """
        usernames = list ()

        # NOTE if your user pool has a lot of users in it you will need to look for
        # a pagination token in the response. Subsequent requests to the user pool
        # should reference the pagination token to get the next set of users
        # (see the sketch after this function).
        #
        # For more detail see
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cognito-idp.html#CognitoIdentityProvider.Client.list_users
        users_resp = idp.list_users (
            UserPoolId = user_pool_id,
            AttributesToGet = ['email'])

        # iterate over the returned users and extract username and email
        for user in users_resp['Users']:
            user_record = {'username': user['Username'], 'email': None}

    @@ -25,10 +60,32 @@ def get_users ():
        return usernames
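
    # NOTE: the following helper is an editor's sketch, not part of the original
    # gist. It shows how the PaginationToken returned by list_users could be
    # followed to read a user pool larger than a single response page; it reuses
    # the 'idp' client and 'user_pool_id' defined above.
    def get_all_users ():
        users = list ()
        kwargs = {'UserPoolId': user_pool_id, 'AttributesToGet': ['email']}
        while True:
            resp = idp.list_users (**kwargs)
            users.extend (resp['Users'])
            token = resp.get ('PaginationToken')
            if not token:
                break
            kwargs['PaginationToken'] = token
        return users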

    def get_auth_events (username):
        """
        For a given username retrieve the most recent authentication events, up to
        a maximum of RESULT_LIMIT events. This is an arbitrary value set by the author.
        Returns a list of dictionaries of the form
        [
            {'event_type': 'SignIn',
             'timestamp': '2018-09-24T23:58:04Z',
             'unix_timestamp': 1537833484000,
             'result': 'Fail',
             'ip_address': '192.158.68.23',
             'location_city': 'London',
             'location_country': 'United Kingdom'},
            ...
        ]
        """
        RESULT_LIMIT = 5

        auth_events = list ()

        # The list user auth events call may also return a 'next' token if more
        # events exist than can be retrieved in a single call. You may want to
        # keep an eye out for a token in the response to know that there are
        # more events waiting for retrieval (see the sketch after this function).
        # For more see
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cognito-idp.html#CognitoIdentityProvider.Client.admin_list_user_auth_events
        events_resp = idp.admin_list_user_auth_events (
            UserPoolId = user_pool_id,
            Username = username,
    @@ -49,35 +106,50 @@ def get_auth_events (username):
        return auth_events
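
    # NOTE: the following helper is an editor's sketch, not part of the original
    # gist. It shows how the NextToken returned by admin_list_user_auth_events
    # could be followed to collect every recorded auth event for a user instead
    # of only the most recent RESULT_LIMIT events.
    def get_all_auth_events (username):
        events = list ()
        kwargs = {'UserPoolId': user_pool_id, 'Username': username}
        while True:
            resp = idp.admin_list_user_auth_events (**kwargs)
            events.extend (resp['AuthEvents'])
            token = resp.get ('NextToken')
            if not token:
                break
            kwargs['NextToken'] = token
        return events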

    def prepare_log_stream (group_name, stream_name):
        """
        Create a CloudWatch log group and log stream if they don't already exist.
        group_name (string) name of the log group to be created
        stream_name (string) name of the log stream to be created in the group
        Returns None
        """
        group_exists = False
        stream_exists = False

        # call describe to determine if the log group already exists
        resp = logs.describe_log_groups (logGroupNamePrefix = group_name)
        for group in resp['logGroups']:
            if group['logGroupName'] == group_name:
                group_exists = True
        # if the group wasn't found assume it doesn't exist and create it
        if not group_exists:
            logs.create_log_group (logGroupName = group_name)

        # call describe to determine if the log stream already exists
        resp = logs.describe_log_streams (logGroupName = group_name, logStreamNamePrefix = stream_name)
        for stream in resp['logStreams']:
            if stream['logStreamName'] == stream_name:
                stream_exists = True

        # if the stream wasn't found, create it
        if not stream_exists:
            logs.create_log_stream (logGroupName = group_name, logStreamName = stream_name)
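
    # NOTE: the following helper is an editor's sketch, not part of the original
    # gist. describe_log_groups returns a single page of results per call, so a
    # boto3 paginator could be used in place of the single describe_* calls above
    # when an account holds many log groups.
    def log_group_exists (group_name):
        paginator = logs.get_paginator ('describe_log_groups')
        for page in paginator.paginate (logGroupNamePrefix = group_name):
            if any (g['logGroupName'] == group_name for g in page['logGroups']):
                return True
        return False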


    # iterate over all the users in the user pool, retrieve authentication events for every user
    # and log failed login attempts to CloudWatch Logs
    for user in get_users ():
        # arbitrary group and stream names for CloudWatch log messages
        GROUP_NAME = '/myorg/myapp'
        STREAM_NAME = 'failed_logins'

        log_entries = list ()

        # ensure that the log group and stream exist before proceeding
        prepare_log_stream (GROUP_NAME, STREAM_NAME)

        # iterate over all the events for a user and look for failed logins
        for event in get_auth_events (user['username']):

            if event['result'] == 'Fail':
                log_message = "WARN: User {} ({}) failed to login at {} from {} in {}, {}".format (
                    user['username'],
    @@ -86,16 +158,21 @@ def prepare_log_stream (group_name, stream_name):
                    event['ip_address'],
                    event['location_city'],
                    event['location_country'])
                # CloudWatch log entries need to have a timestamp (in millis since 1970) and a message;
                # we defined the message above and are using the timestamp retrieved from Cognito
                log_entries.append ({
                    'timestamp': event['unix_timestamp'],
                    'message': log_message
                })

        # CloudWatch Logs requires the log entries to be ordered by timestamp
        def on_time (d):
            return d['timestamp']

        log_entries.sort (key=on_time, reverse=False)

        # use the put log events CloudWatch API to record the messages to CW Logs
        # for more info see
        # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs.html#CloudWatchLogs.Client.put_log_events
        logs.put_log_events (
            logGroupName = GROUP_NAME,
            logStreamName = STREAM_NAME,
            logEvents = log_entries)
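
    # NOTE: the following handler is an editor's sketch, not part of the original
    # gist. It illustrates the docstring's suggestion of reading parameters from
    # os.environ when running this logic as a scheduled Lambda function. The
    # environment variable names USER_POOL_ID and LOG_GROUP_NAME are assumptions,
    # not values defined by the gist.
    import os

    def lambda_handler (event, context):
        pool_id = os.environ['USER_POOL_ID']                        # e.g. the user pool to scan
        group_name = os.environ.get ('LOG_GROUP_NAME', '/myorg/myapp')
        # the per-user processing loop above would run here, using pool_id and
        # group_name in place of the module-level user_pool_id and GROUP_NAME
        return {'status': 'ok'}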
  2. jpbarto created this gist Nov 29, 2018.
    103 changes: 103 additions & 0 deletions process_cognito_users.py
    @@ -0,0 +1,103 @@
    import boto3

    region = boto3.session.Session().region_name
    idp = boto3.client ('cognito-idp', region_name = region)
    logs = boto3.client ('logs', region_name = region)

    user_pool_id = 'eu-west-2_A0D4ZfpUE'

    def get_users ():
        usernames = list ()

        users_resp = idp.list_users (
            UserPoolId = user_pool_id,
            AttributesToGet = ['email'])

        for user in users_resp['Users']:
            user_record = {'username': user['Username'], 'email': None}

            for attr in user['Attributes']:
                if attr['Name'] == 'email':
                    user_record['email'] = attr['Value']

            usernames.append (user_record)

        return usernames

    def get_auth_events (username):
        RESULT_LIMIT = 5

        auth_events = list ()

        events_resp = idp.admin_list_user_auth_events (
            UserPoolId = user_pool_id,
            Username = username,
            MaxResults = RESULT_LIMIT)

        for event in events_resp['AuthEvents']:
            event_record = {
                'event_type': event['EventType'],
                'timestamp': str(event['CreationDate']),
                'unix_timestamp': int(event['CreationDate'].timestamp ()*1000),
                'result': event['EventResponse'],
                'ip_address': event['EventContextData']['IpAddress'],
                'location_city': event['EventContextData']['City'],
                'location_country': event['EventContextData']['Country']
            }
            auth_events.append (event_record)

        return auth_events

    def prepare_log_stream (group_name, stream_name):
        group_exists = False
        stream_exists = False

        resp = logs.describe_log_groups (logGroupNamePrefix = group_name)
        for group in resp['logGroups']:
            if group['logGroupName'] == group_name:
                group_exists = True
        if not group_exists:
            logs.create_log_group (logGroupName = GROUP_NAME)

        resp = logs.describe_log_streams (logGroupName = group_name, logStreamNamePrefix = stream_name)
        for stream in resp['logStreams']:
            if stream['logStreamName'] == stream_name:
                stream_exists = True

        if not stream_exists:
            logs.create_log_stream (logGroupName = GROUP_NAME, logStreamName = STREAM_NAME)


    for user in get_users ():
        GROUP_NAME = '/myorg/myapp'
        STREAM_NAME = 'failed_logins'

        log_entries = list ()

        prepare_log_stream (GROUP_NAME, STREAM_NAME)

        for event in get_auth_events (user['username']):

            if event['result'] == 'Fail':
                log_message = "WARN: User {} ({}) failed to login at {} from {} in {}, {}".format (
                    user['username'],
                    user['email'],
                    event['timestamp'],
                    event['ip_address'],
                    event['location_city'],
                    event['location_country'])
                log_entries.append ({
                    'timestamp': event['unix_timestamp'],
                    'message': log_message
                })

        def on_time (d):
            return d['timestamp']

        log_entries.sort (key=on_time, reverse=False)

        logs.put_log_events (
            logGroupName = GROUP_NAME,
            logStreamName = STREAM_NAME,
            logEvents = log_entries
        )