Skip to content

Instantly share code, notes, and snippets.

@ycyr
Created February 16, 2023 11:07
Show Gist options
  • Save ycyr/d0adee7cb09b2c1caa2c022af8c9dee4 to your computer and use it in GitHub Desktop.
Save ycyr/d0adee7cb09b2c1caa2c022af8c9dee4 to your computer and use it in GitHub Desktop.

Revisions

  1. ycyr created this gist Feb 16, 2023.
    23 changes: 23 additions & 0 deletions Lamba_python_cloudwatch_msk.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,23 @@
    import base64
    import json
    import boto3
    from kafka import KafkaProducer

    msk_topic = 'my-msk-topic'
    msk_broker_list = ['my-msk-broker-1:9092', 'my-msk-broker-2:9092'] # Replace with your own broker list

    # Create a Kafka producer with the specified broker list
    producer = KafkaProducer(bootstrap_servers=msk_broker_list)

    def lambda_handler(event, context):
    # Get the raw log data from the CloudWatch subscription
    log_data = base64.b64decode(event['awslogs']['data'])
    logs = json.loads(log_data)

    # Extract the log events and publish them to the Kafka topic
    for log_event in logs['logEvents']:
    message = log_event['message']
    producer.send(msk_topic, message.encode('utf-8'))

    # Flush any remaining messages in the producer queue
    producer.flush()