import certifi from dynaconf import settings from confluent_kafka.avro import AvroProducer from confluent_kafka import Consumer BASE_CONFIG = { 'bootstrap.servers': settings.KAFKA_PRIMARY_BOOTSTRAP_SERVERS, 'group.id': 'integrated-tests', 'client.id': 'integrated-tests', 'security.protocol': settings.KAFKA_PRIMARY_SECURITY_PROTOCOL, 'sasl.mechanisms': settings.get('KAFKA_PRIMARY_SASL_MECHANISM', 'PLAIN'), 'sasl.username': settings.KAFKA_PRIMARY_KEY, 'sasl.password': settings.KAFKA_PRIMARY_SECRET, 'ssl.endpoint.identification.algorithm': 'https', 'ssl.ca.location': certifi.where(), 'auto.offset.reset': 'earliest', } AVRO_SCHEMA_CONFIG = { 'schema.registry.url': settings.KAFKA_PRIMARY_SCHEMA_REGISTRY_ENDPOINT, 'schema.registry.basic.auth.credentials.source': 'USER_INFO', 'schema.registry.basic.auth.user.info': '{}:{}'.format( settings.KAFKA_PRIMARY_SCHEMA_REGISTRY_KEY, settings.KAFKA_PRIMARY_SCHEMA_REGISTRY_SECRET ) } PRODUCER_SCHEMA_CONFIG = {**BASE_CONFIG, **AVRO_SCHEMA_CONFIG} def setup_consumer(): return Consumer(BASE_CONFIG) def setup_producer(schema_value): producer_config = PRODUCER_SCHEMA_CONFIG.copy() producer_config['on_delivery'] = delivery_check avro_producer = AvroProducer( producer_config, default_value_schema=schema_value ) return avro_producer def delivery_check(err, msg): """ Reports the failure or success of a message delivery. Args: err (KafkaError): The error that occurred on None on success. msg (Message): The message that was produced or failed. """ if err is not None: raise ValueError("Failed to execute command: {}".format(err))