Skip to content

Instantly share code, notes, and snippets.

@pvsune
Last active April 22, 2025 14:17
Show Gist options
  • Save pvsune/62c0556eff54087927334e542589ddcd to your computer and use it in GitHub Desktop.

Revisions

  1. pvsune revised this gist Apr 26, 2020. 1 changed file with 2 additions and 0 deletions.
    2 changes: 2 additions & 0 deletions concurrent.kafka.consumer.py
    Original file line number Diff line number Diff line change
    @@ -2,6 +2,7 @@
    import logging
    import os
    import threading
    import time
    from multiprocessing import Process
    from queue import Queue

    @@ -14,6 +15,7 @@ def _process_msg(q, c):
    '#%sT%s - Received message: %s',
    os.getpid(), threading.get_ident(), msg.value().decode('utf-8')
    )
    time.sleep(5)
    q.task_done()
    c.commit(msg)

  2. pvsune revised this gist Apr 26, 2020. 1 changed file with 3 additions and 1 deletion.
    4 changes: 3 additions & 1 deletion concurrent.kafka.consumer.py
    Original file line number Diff line number Diff line change
    @@ -3,7 +3,7 @@
    import os
    import threading
    from multiprocessing import Process
    from queue import Queue `
    from queue import Queue

    from confluent_kafka import Consumer

    @@ -71,6 +71,8 @@ def main(config):
    format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s',
    )
    main(config={
    # At most, this should be the total number of Kafka partitions on
    # the topic.
    'num_workers': 4,
    'num_threads': 4,
    'topic': 'my_topic_name',
  3. pvsune revised this gist Apr 26, 2020. 1 changed file with 3 additions and 3 deletions.
    6 changes: 3 additions & 3 deletions concurrent.kafka.consumer.py
    Original file line number Diff line number Diff line change
    @@ -76,9 +76,9 @@ def main(config):
    'topic': 'my_topic_name',
    'kafka_kwargs': {
    'bootstrap.servers': ','.join([
    'cluster1.mykafka.com'
    'cluster2.mykafka.com'
    'cluster3.mykafka.com'
    'cluster1.mykafka.com',
    'cluster2.mykafka.com',
    'cluster3.mykafka.com',
    ]),
    'group.id': 'my_consumer_group',
    'auto.offset.reset': 'earliest',
  4. pvsune created this gist Apr 26, 2020.
    88 changes: 88 additions & 0 deletions concurrent.kafka.consumer.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,88 @@
    #!/usr/bin/env python
    import logging
    import os
    import threading
    from multiprocessing import Process
    from queue import Queue `

    from confluent_kafka import Consumer


    def _process_msg(q, c):
    msg = q.get(timeout=60) # Set timeout to care for POSIX<3.0 and Windows.
    logging.info(
    '#%sT%s - Received message: %s',
    os.getpid(), threading.get_ident(), msg.value().decode('utf-8')
    )
    q.task_done()
    c.commit(msg)


    def _consume(config):
    logging.info(
    '#%s - Starting consumer group=%s, topic=%s',
    os.getpid(), config['kafka_kwargs']['group.id'], config['topic'],
    )
    c = Consumer(**config['kafka_kwargs'])
    c.subscribe([config['topic']])
    q = Queue(maxsize=config['num_threads'])

    while True:
    logging.info('#%s - Waiting for message...', os.getpid())
    try:
    msg = c.poll(60)
    if msg is None:
    continue
    if msg.error():
    logging.error(
    '#%s - Consumer error: %s', os.getpid(), msg.error()
    )
    continue
    q.put(msg)
    # Use default daemon=False to stop threads gracefully in order to
    # release resources properly.
    t = threading.Thread(target=_process_msg, args=(q, c))
    t.start()
    except Exception:
    logging.exception('#%s - Worker terminated.', os.getpid())
    c.close()


    def main(config):
    """
    Simple program that consumes messages from Kafka topic and prints to
    STDOUT.
    """
    workers = []
    while True:
    num_alive = len([w for w in workers if w.is_alive()])
    if config['num_workers'] == num_alive:
    continue
    for _ in range(config['num_workers']-num_alive):
    p = Process(target=_consume, daemon=True, args=(config,))
    p.start()
    workers.append(p)
    logging.info('Starting worker #%s', p.pid)


    if __name__ == '__main__':
    logging.basicConfig(
    level=getattr(logging, os.getenv('LOGLEVEL', '').upper(), 'INFO'),
    format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s',
    )
    main(config={
    'num_workers': 4,
    'num_threads': 4,
    'topic': 'my_topic_name',
    'kafka_kwargs': {
    'bootstrap.servers': ','.join([
    'cluster1.mykafka.com'
    'cluster2.mykafka.com'
    'cluster3.mykafka.com'
    ]),
    'group.id': 'my_consumer_group',
    'auto.offset.reset': 'earliest',
    # Commit manually to care for abrupt shutdown.
    'enable.auto.commit': False,
    },
    })