Skip to content

Instantly share code, notes, and snippets.

@fespino
Created August 11, 2015 09:12
Show Gist options
  • Save fespino/ffda28e7fcc47b435f49 to your computer and use it in GitHub Desktop.
Save fespino/ffda28e7fcc47b435f49 to your computer and use it in GitHub Desktop.

Revisions

  1. fespino created this gist Aug 11, 2015.
    54 changes: 54 additions & 0 deletions kombu_adapter.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,54 @@
    # -*- coding: utf-8 -*-
    import kombu
    import kombu.mixins
    import kombu.common
    import transactional.controller

    class Worker(kombu.mixins.ConsumerMixin):

    def __init__(self, connection, controller, queues):
    self.connection = connection
    self.controller = controller
    self.queues = queues

    def get_consumers(self, consumer_cls, channel):
    return [
    consumer_cls(queues=self.queues, callbacks=[self.handle_request])
    ]

    def handle_request(self, body, message):
    try:
    response = self.controller.handle_request(body)
    print(message.properties)
    except Exception as e:
    print('Exception: {}'.format(e))
    finally:
    message.ack()


    class KombuAMQPAdapter(object):

    def __init__(self, controller, routing_key, worker_cls, host='amqp://guest:guest@localhost//'):
    self.controller = controller
    self.exchange = kombu.Exchange('pleyade-exchange', 'topic', durable=True)
    self.request_queue = kombu.Queue(routing_key, exchange=self.exchange, routing_key=routing_key)
    self.host = host
    self.worker_cls = worker_cls

    def start_service(self):
    print('Starting service at {}'.format(self.host))
    with kombu.Connection(self.host) as conn:
    self.request_queue(conn).declare()
    try:
    self.worker_cls(conn, self.controller, [self.request_queue]).run()
    except KeyboardInterrupt:
    self.stop_service()

    def stop_service(self):
    print('Bye!!')


    if __name__ == '__main__':
    controller = transactional.controller.FakeController()
    adapter = KombuAMQPAdapter(controller, 'transactional.request', Worker)
    adapter.start_service()