#!/usr/bin/env python3 """ The following script demonstrates how to use the AWS Boto3 SDK to iterate through all of the users in an AWS Cognito User Pool and examine the events associated with each user. If any failed authentication events are found the script formats them as messages and logs them to CloudWatch logs. This script could easily be modified to run periodically as a Lambda function triggered by a scheduled CloudWatch rule. If transforming this script to a Lambda function I would recommend using the os.environ to retrieve parameters for the Lambda function such as AWS Region and Cognito User Pool identity. """ import boto3 # set the region to operate in region = boto3.session.Session().region_name # create clients for Cognito Identity Provider (User pools) and CloudWatch logs idp = boto3.client ('cognito-idp', region_name = region) logs = boto3.client ('logs', region_name = region) # define the user pool this script will work with user_pool_id = '' def get_users (): """ Retreive a list of users from the Cognito user pool. Returns a list of dictionaries in the form of: [ {'username': 'user001', 'email': 'user001@example.com'}, ... {'username': 'scott', 'email': 'tiger@example.com'} ] """ usernames = list () # NOTE if your user pool has a lot of users in it you will need to look for # a pagination token in the response. Subsequent requests to the user pool # should reference the pagination token to get the next set of users. # # For more detail see # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cognito-idp.html#CognitoIdentityProvider.Client.list_users users_resp = idp.list_users ( UserPoolId = user_pool_id, AttributesToGet = ['email']) # iterate over the returned users and extract username and email for user in users_resp['Users']: user_record = {'username': user['Username'], 'email': None} for attr in user['Attributes']: if attr['Name'] == 'email': user_record['email'] = attr['Value'] usernames.append (user_record) return usernames def get_auth_events (username): """ For a given username retreive the most recent authentication events up to a maximum of RESULT_LIMIT events. This is an arbitrary value set by the author. Returns a list of dictionaries of the form [ {'event_type': 'SignIn', 'timestamp': '2018-09-24T23:58:04Z', 'unix_timestamp': 1537833484000, 'result': 'Fail', 'ip_address': '192.158.68.23', 'location_city': 'London', 'location_country': 'United Kingdom'}, ... ] """ RESULT_LIMIT = 5 auth_events = list () # List user auth events may also return a 'next' token if more events # were requested than can be retrieved in a single call. You may want # to keep an eye out for a token in the response to know that there are # more events waiting for retrieval. # For more see # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cognito-idp.html#CognitoIdentityProvider.Client.admin_list_user_auth_events events_resp = idp.admin_list_user_auth_events ( UserPoolId = user_pool_id, Username = username, MaxResults = RESULT_LIMIT) for event in events_resp['AuthEvents']: event_record = { 'event_type': event['EventType'], 'timestamp': str(event['CreationDate']), 'unix_timestamp': int(event['CreationDate'].timestamp ()*1000), 'result': event['EventResponse'], 'ip_address': event['EventContextData']['IpAddress'], 'location_city': event['EventContextData']['City'], 'location_country': event['EventContextData']['Country'] } auth_events.append (event_record) return auth_events def prepare_log_stream (group_name, stream_name): """ Create a CloudWatch log group and log stream if they don't already exist. group_name (string) name of the log group to be created stream_name (string) name of the log stream to be created in the group Returns None """ group_exists = False stream_exists = False # call describe to determine if the log group already exists resp = logs.describe_log_groups (logGroupNamePrefix = group_name) for group in resp['logGroups']: if group['logGroupName'] == group_name: group_exists = True # if the group wasn't found assume it doesn't exist and create it if not group_exists: logs.create_log_group (logGroupName = GROUP_NAME) # call describe to determine if the log stream already exists resp = logs.describe_log_streams (logGroupName = group_name, logStreamNamePrefix = stream_name) for stream in resp['logStreams']: if stream['logStreamName'] == stream_name: stream_exists = True # if the stream wasn't found, create it if not stream_exists: logs.create_log_stream (logGroupName = GROUP_NAME, logStreamName = STREAM_NAME) # iterate over all the users in the user pool, retreive authentication events for every user # and log failed login attempts to CloudWatch logs for user in get_users (): # arbitration group and stream name for CloudWatch log messages GROUP_NAME = '/myorg/myapp' STREAM_NAME = 'failed_logins' log_entries = list () # ensure that the log group and stream exist before proceeding prepare_log_stream (GROUP_NAME, STREAM_NAME) # iterate over all the events for a user and look for failed logins for event in get_auth_events (user['username']): if event['result'] == 'Fail': log_message = "WARN: User {} ({}) failed to login at {} from {} in {}, {}".format ( user['username'], user['email'], event['timestamp'], event['ip_address'], event['location_city'], event['location_country']) # CloudWatch log entries need to have a timestamp (in millis since 1970) and a message # we defined the message above and are using the timestamp retreived from Cognito log_entries.append ({ 'timestamp': event['unix_timestamp'], 'message': log_message }) # CloudWatch logs requires the log entries to be ordered by timestamp def on_time (d): return d['timestamp'] log_entries.sort (key=on_time, reverse=False) # use the put log events CloudWatch API to record the messages to CW logs # for more info see # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs.html#CloudWatchLogs.Client.put_log_events logs.put_log_events ( logGroupName = GROUP_NAME, logStreamName = STREAM_NAME, logEvents = log_entries )