Skip to content

Instantly share code, notes, and snippets.

@ksingh7
Created October 1, 2021 21:48
Show Gist options
  • Select an option

  • Save ksingh7/e43dfb4b5e0db8658705a342aa30dae1 to your computer and use it in GitHub Desktop.

Select an option

Save ksingh7/e43dfb4b5e0db8658705a342aa30dae1 to your computer and use it in GitHub Desktop.

Revisions

  1. ksingh7 created this gist Oct 1, 2021.
    20 changes: 20 additions & 0 deletions kafka-python-producer-consumer.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,20 @@
    from kafka import KafkaProducer, KafkaConsumer
    import json
    from bson import json_util

    bootstrap_server = 'my-cluster-kafka-route-bootstrap-nestjs-testing.apps.ocp.ceph-s3.com:443'

    print("Producing messages to Kafka topic ...")
    producer = KafkaProducer(bootstrap_servers=bootstrap_server, ssl_cafile='ca.crt', security_protocol="SSL")

    for i in range(10):
    message = {'value': i}
    producer.send('my-topic', json.dumps(message, default=json_util.default).encode('utf-8'))

    print("Consuming messages from Kafka topic ...")

    consumer = KafkaConsumer('my-topic', group_id='my-group', bootstrap_servers=bootstrap_server, ssl_cafile='ca.crt', security_protocol="SSL", consumer_timeout_ms=10000, enable_auto_commit=True)
    for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print ("%s:%d:%d: value=%s" % (message.topic, message.partition,message.offset,message.value))