""" Module for consuming messages on a kafka cluster + kafka schema register. Since the AvroConsumer from the confluent lib does not work out o the box, its required to do the decoding manually. Luckly someone already pass throught this pain and did a blog post about this. https://mlnotetaking.com/post/fixing-kafka-string-key-and-avro-value-python """ import time import logging import avro.schema from config import setup_consumer from utils import unpack, fetch_schema, msg_sanity_check LOGGER = logging.getLogger() LOGGER.setLevel(logging.DEBUG) def event_polling(topic, key_value, msg_value, qty_msgs): """ Polling on the kafka stream and validate the msg key and value before returning data """ consumer = setup_consumer() schema_str_value = fetch_schema(topic) value_schema = avro.schema.Parse(schema_str_value) consumer.subscribe([topic]) msgs = [] timeout = time.time() + 20 while timeout >= time.time() : raw_msg = consumer.poll(1) if not msg_sanity_check(raw_msg): continue msg = unpack(raw_msg.value(), value_schema) if msg[key_value] == msg_value: msgs.append(msg) if len(msgs) == qty_msgs: break if len(msgs) < qty_msgs: raise Exception('Less events than expected') consumer.close() return msgs