Last active
January 12, 2024 13:13
-
-
Save jpbarto/c484c923c365b3e391b8eb5029cbaebc to your computer and use it in GitHub Desktop.
Revisions
-
jpbarto revised this gist
Nov 29, 2018 . 1 changed file with 81 additions and 4 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,18 +1,53 @@ #!/usr/bin/env python3 """ The following script demonstrates how to use the AWS Boto3 SDK to iterate through all of the users in an AWS Cognito User Pool and examine the events associated with each user. If any failed authentication events are found the script formats them as messages and logs them to CloudWatch logs. This script could easily be modified to run periodically as a Lambda function triggered by a scheduled CloudWatch rule. If transforming this script to a Lambda function I would recommend using the os.environ to retrieve parameters for the Lambda function such as AWS Region and Cognito User Pool identity. """ import boto3 # set the region to operate in region = boto3.session.Session().region_name # create clients for Cognito Identity Provider (User pools) and CloudWatch logs idp = boto3.client ('cognito-idp', region_name = region) logs = boto3.client ('logs', region_name = region) # define the user pool this script will work with user_pool_id = '<your user pool here>' def get_users (): """ Retreive a list of users from the Cognito user pool. Returns a list of dictionaries in the form of: [ {'username': 'user001', 'email': '[email protected]'}, ... {'username': 'scott', 'email': '[email protected]'} ] """ usernames = list () # NOTE if your user pool has a lot of users in it you will need to look for # a pagination token in the response. Subsequent requests to the user pool # should reference the pagination token to get the next set of users. # # For more detail see # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cognito-idp.html#CognitoIdentityProvider.Client.list_users users_resp = idp.list_users ( UserPoolId = user_pool_id, AttributesToGet = ['email']) # iterate over the returned users and extract username and email for user in users_resp['Users']: user_record = {'username': user['Username'], 'email': None} @@ -25,10 +60,32 @@ def get_users (): return usernames def get_auth_events (username): """ For a given username retreive the most recent authentication events up to a maximum of RESULT_LIMIT events. This is an arbitrary value set by the author. Returns a list of dictionaries of the form [ {'event_type': 'SignIn', 'timestamp': '2018-09-24T23:58:04Z', 'unix_timestamp': 1537833484000, 'result': 'Fail', 'ip_address': '192.158.68.23', 'location_city': 'London', 'location_country': 'United Kingdom'}, ... ] """ RESULT_LIMIT = 5 auth_events = list () # List user auth events may also return a 'next' token if more events # were requested than can be retrieved in a single call. You may want # to keep an eye out for a token in the response to know that there are # more events waiting for retrieval. # For more see # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cognito-idp.html#CognitoIdentityProvider.Client.admin_list_user_auth_events events_resp = idp.admin_list_user_auth_events ( UserPoolId = user_pool_id, Username = username, @@ -49,35 +106,50 @@ def get_auth_events (username): return auth_events def prepare_log_stream (group_name, stream_name): """ Create a CloudWatch log group and log stream if they don't already exist. group_name (string) name of the log group to be created stream_name (string) name of the log stream to be created in the group Returns None """ group_exists = False stream_exists = False # call describe to determine if the log group already exists resp = logs.describe_log_groups (logGroupNamePrefix = group_name) for group in resp['logGroups']: if group['logGroupName'] == group_name: group_exists = True # if the group wasn't found assume it doesn't exist and create it if not group_exists: logs.create_log_group (logGroupName = GROUP_NAME) # call describe to determine if the log stream already exists resp = logs.describe_log_streams (logGroupName = group_name, logStreamNamePrefix = stream_name) for stream in resp['logStreams']: if stream['logStreamName'] == stream_name: stream_exists = True # if the stream wasn't found, create it if not stream_exists: logs.create_log_stream (logGroupName = GROUP_NAME, logStreamName = STREAM_NAME) # iterate over all the users in the user pool, retreive authentication events for every user # and log failed login attempts to CloudWatch logs for user in get_users (): # arbitration group and stream name for CloudWatch log messages GROUP_NAME = '/myorg/myapp' STREAM_NAME = 'failed_logins' log_entries = list () # ensure that the log group and stream exist before proceeding prepare_log_stream (GROUP_NAME, STREAM_NAME) # iterate over all the events for a user and look for failed logins for event in get_auth_events (user['username']): if event['result'] == 'Fail': log_message = "WARN: User {} ({}) failed to login at {} from {} in {}, {}".format ( user['username'], @@ -86,16 +158,21 @@ def prepare_log_stream (group_name, stream_name): event['ip_address'], event['location_city'], event['location_country']) # CloudWatch log entries need to have a timestamp (in millis since 1970) and a message # we defined the message above and are using the timestamp retreived from Cognito log_entries.append ({ 'timestamp': event['unix_timestamp'], 'message': log_message }) # CloudWatch logs requires the log entries to be ordered by timestamp def on_time (d): return d['timestamp'] log_entries.sort (key=on_time, reverse=False) # use the put log events CloudWatch API to record the messages to CW logs # for more info see # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs.html#CloudWatchLogs.Client.put_log_events logs.put_log_events ( logGroupName = GROUP_NAME, logStreamName = STREAM_NAME, -
jpbarto created this gist
Nov 29, 2018 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,103 @@ import boto3 region = boto3.session.Session().region_name idp = boto3.client ('cognito-idp', region_name = region) logs = boto3.client ('logs', region_name = region) user_pool_id = 'eu-west-2_A0D4ZfpUE' def get_users (): usernames = list () users_resp = idp.list_users ( UserPoolId = user_pool_id, AttributesToGet = ['email']) for user in users_resp['Users']: user_record = {'username': user['Username'], 'email': None} for attr in user['Attributes']: if attr['Name'] == 'email': user_record['email'] = attr['Value'] usernames.append (user_record) return usernames def get_auth_events (username): RESULT_LIMIT = 5 auth_events = list () events_resp = idp.admin_list_user_auth_events ( UserPoolId = user_pool_id, Username = username, MaxResults = RESULT_LIMIT) for event in events_resp['AuthEvents']: event_record = { 'event_type': event['EventType'], 'timestamp': str(event['CreationDate']), 'unix_timestamp': int(event['CreationDate'].timestamp ()*1000), 'result': event['EventResponse'], 'ip_address': event['EventContextData']['IpAddress'], 'location_city': event['EventContextData']['City'], 'location_country': event['EventContextData']['Country'] } auth_events.append (event_record) return auth_events def prepare_log_stream (group_name, stream_name): group_exists = False stream_exists = False resp = logs.describe_log_groups (logGroupNamePrefix = group_name) for group in resp['logGroups']: if group['logGroupName'] == group_name: group_exists = True if not group_exists: logs.create_log_group (logGroupName = GROUP_NAME) resp = logs.describe_log_streams (logGroupName = group_name, logStreamNamePrefix = stream_name) for stream in resp['logStreams']: if stream['logStreamName'] == stream_name: stream_exists = True if not stream_exists: logs.create_log_stream (logGroupName = GROUP_NAME, logStreamName = STREAM_NAME) for user in get_users (): GROUP_NAME = '/myorg/myapp' STREAM_NAME = 'failed_logins' log_entries = list () prepare_log_stream (GROUP_NAME, STREAM_NAME) for event in get_auth_events (user['username']): if event['result'] == 'Fail': log_message = "WARN: User {} ({}) failed to login at {} from {} in {}, {}".format ( user['username'], user['email'], event['timestamp'], event['ip_address'], event['location_city'], event['location_country']) log_entries.append ({ 'timestamp': event['unix_timestamp'], 'message': log_message }) def on_time (d): return d['timestamp'] log_entries.sort (key=on_time, reverse=False) logs.put_log_events ( logGroupName = GROUP_NAME, logStreamName = STREAM_NAME, logEvents = log_entries )