# -*- coding: utf-8 -*- import kombu import kombu.mixins import kombu.common import transactional.controller class Worker(kombu.mixins.ConsumerMixin): def __init__(self, connection, controller, queues): self.connection = connection self.controller = controller self.queues = queues def get_consumers(self, consumer_cls, channel): return [ consumer_cls(queues=self.queues, callbacks=[self.handle_request]) ] def handle_request(self, body, message): try: response = self.controller.handle_request(body) print(message.properties) except Exception as e: print('Exception: {}'.format(e)) finally: message.ack() class KombuAMQPAdapter(object): def __init__(self, controller, routing_key, worker_cls, host='amqp://guest:guest@localhost//'): self.controller = controller self.exchange = kombu.Exchange('pleyade-exchange', 'topic', durable=True) self.request_queue = kombu.Queue(routing_key, exchange=self.exchange, routing_key=routing_key) self.host = host self.worker_cls = worker_cls def start_service(self): print('Starting service at {}'.format(self.host)) with kombu.Connection(self.host) as conn: self.request_queue(conn).declare() try: self.worker_cls(conn, self.controller, [self.request_queue]).run() except KeyboardInterrupt: self.stop_service() def stop_service(self): print('Bye!!') if __name__ == '__main__': controller = transactional.controller.FakeController() adapter = KombuAMQPAdapter(controller, 'transactional.request', Worker) adapter.start_service()