Skip to content

Instantly share code, notes, and snippets.

@ask
Forked from ericflo/ericflopsy.py
Created June 7, 2009 20:36
Show Gist options
  • Select an option

  • Save ask/125476 to your computer and use it in GitHub Desktop.

Select an option

Save ask/125476 to your computer and use it in GitHub Desktop.

Revisions

  1. ask revised this gist Jun 7, 2009. 1 changed file with 6 additions and 3 deletions.
    9 changes: 6 additions & 3 deletions ericflopsy.py
    Original file line number Diff line number Diff line change
    @@ -3,7 +3,8 @@
    --------
    >>> import ericflopsy
    >>> consumer = ericflopsy.Consumer()
    >>> conn = ericflopsy.Connection()
    >>> consumer = ericflopsy.Consumer(connection=conn)
    >>> def print_message(message_body, message):
    ... print 'Recieved: ' + message_body
    ... message.ack()
    @@ -19,14 +20,16 @@
    ---------
    >>> import ericflopsy
    >>> publisher = ericflopsy.Publisher()
    >>> conn = ericflopsy.Connection()
    >>> publisher = ericflopsy.Publisher(connection=conn)
    >>> publisher.publish('messages_to_print', 'Test message!')
    >>> publisher.publish('messages_to_print_in_caps', 'hello, world!')
    """
    from carrot.connection import DjangoAMQPConnection

    from carrot.messaging import Consumer as BaseConsumer
    from carrot.messaging import Publisher as BasePublisher
    from carrot.connection import DjangoAMQPConnection as Connection

    # and voila. you have unit tests as well ;)

  2. ask revised this gist Jun 7, 2009. 1 changed file with 4 additions and 3 deletions.
    7 changes: 4 additions & 3 deletions ericflopsy.py
    Original file line number Diff line number Diff line change
    @@ -25,11 +25,12 @@
    """
    from carrot.connection import DjangoAMQPConnection
    from carrot.messaging import Consumer, Publisher
    from carrot.messaging import Consumer as BaseConsumer
    from carrot.messaging import Publisher as BasePublisher

    # and voila. you have unit tests as well ;)

    class NamedCallbackConsumer(Consumer):
    class Consumer(BaseConsumer):
    named_callbacks = None

    def __init__(self, *args, **kwargs):
    @@ -50,7 +51,7 @@ def receieve(self, message_data, message):
    callback_for_kind(data, message)


    class NamedCallbackPublisher(Publisher):
    class Publisher(BasePublisher):

    def publish(self, kind, message_data):
    return self.send({"kind": kind, "data": message_data})
  3. ask revised this gist Jun 7, 2009. 1 changed file with 26 additions and 0 deletions.
    26 changes: 26 additions & 0 deletions ericflopsy.py
    Original file line number Diff line number Diff line change
    @@ -1,3 +1,29 @@
    """
    Consumer
    --------
    >>> import ericflopsy
    >>> consumer = ericflopsy.Consumer()
    >>> def print_message(message_body, message):
    ... print 'Recieved: ' + message_body
    ... message.ack()
    >>> consumer.register('messages_to_print', print_message)
    >>> def print_message_in_caps(message_body, message):
    ... print 'Received: ' + message_body.upper()
    ... message.ack()
    >>> consumer.register('messages_to_print_in_caps', print_message_in_caps)
    >>> consumer.wait()
    Publisher
    ---------
    >>> import ericflopsy
    >>> publisher = ericflopsy.Publisher()
    >>> publisher.publish('messages_to_print', 'Test message!')
    >>> publisher.publish('messages_to_print_in_caps', 'hello, world!')
    """
    from carrot.connection import DjangoAMQPConnection
    from carrot.messaging import Consumer, Publisher

  4. ask revised this gist Jun 7, 2009. 1 changed file with 3 additions and 3 deletions.
    6 changes: 3 additions & 3 deletions ericflopsy.py
    Original file line number Diff line number Diff line change
    @@ -10,11 +10,11 @@ def __init__(self, *args, **kwargs):
    super(NamedCallbackConsumer, self).__init__(*args, **kwargs)
    self.named_callbacks = {}

    def register(self, name, callback):
    self.named_callbacks[name] = callback
    def register(self, kind, callback):
    self.named_callbacks[kind] = callback

    def unregister(self, kind):
    del self.callbacks[kind]
    del self.named_callbacks[kind]

    def receieve(self, message_data, message):
    kind = message_data.get("kind")
  5. ask revised this gist Jun 7, 2009. 1 changed file with 19 additions and 200 deletions.
    219 changes: 19 additions & 200 deletions ericflopsy.py
    Original file line number Diff line number Diff line change
    @@ -1,211 +1,30 @@
    """
    Copyright (c) 2008-2009, Nathan Borror, and modified heavily by Eric Florenzano
    All rights reserved.
    Redistribution and use in source and binary forms, with or without modification,
    are permitted provided that the following conditions are met:
    Redistributions of source code must retain the above copyright notice, this list
    of conditions and the following disclaimer.
    Redistributions in binary form must reproduce the above copyright notice, this
    list of conditions and the following disclaimer in the documentation and/or
    other materials provided with the distribution.
    Neither the name of the ericflopsy nor the names of its contributors may be
    used to endorse or promote products derived from this software without specific
    prior written permission.
    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
    ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
    WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
    ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
    (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
    ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
    (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
    SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
    Ericflopsy
    ======
    A very simple way to interact with python AMQPlib. For my case I'm using
    RabbitMQ as my implementation but it should work with others.
    Consumer
    --------
    >>> import ericflopsy
    >>> consumer = ericflopsy.Consumer()
    >>> def print_message(message):
    ... print 'Recieved: ' + message.body
    ... consumer.channel.basic_ack(message.delivery_tag)
    ...
    >>> consumer.register('messages_to_print', print_message)
    >>> def print_message_in_caps(message):
    ... print 'Received: ' + message.body.upper()
    ... consumer.channel.basic_ack(message.delivery_tag)
    ...
    >>> consumer.register('messages_to_print_in_caps', print_message_in_caps)
    >>> consumer.wait()
    Publisher
    ---------
    >>> import ericflopsy
    >>> publisher = ericflopsy.Publisher()
    >>> publisher.publish('messages_to_print', 'Test message!')
    >>> publisher.publish('messages_to_print_in_caps', 'hello, world!')
    """

    import simplejson
    import uuid
    from carrot.connection import DjangoAMQPConnection
    from carrot.messaging import Consumer, Publisher

    from amqplib import client_0_8 as amqp
    # and voila. you have unit tests as well ;)

    DEFAULT_HOST = '127.0.0.1'
    DEFAULT_USER_ID = 'guest'
    DEFAULT_PASSWORD = 'guest'
    DEFAULT_VHOST = '/'
    DEFAULT_PORT = 5672
    DEFAULT_INSIST = False
    DEFAULT_QUEUE = 'default_queue'
    DEFAULT_ROUTING_KEY = 'default_routing_key'
    DEFAULT_EXCHANGE = 'default_exchange'
    DEFAULT_DURABLE = True
    DEFAULT_EXCLUSIVE = False
    DEFAULT_AUTO_DELETE = False
    DEFAULT_DELIVERY_MODE = 2
    try:
    from django.conf import settings
    DEFAULT_HOST = getattr(settings, 'AMQP_SERVER', DEFAULT_HOST)
    DEFAULT_USER_ID = getattr(settings, 'AMQP_USER', DEFAULT_USER_ID)
    DEFAULT_PASSWORD = getattr(settings, 'AMQP_PASSWORD', DEFAULT_PASSWORD)
    DEFAULT_VHOST = getattr(settings, 'AMQP_VHOST', DEFAULT_VHOST)
    DEFAULT_PORT = getattr(settings, 'AMQP_PORT', DEFAULT_PORT)
    DEFAULT_INSIST = getattr(settings, 'AMQP_INSIST', DEFAULT_INSIST)
    DEFAULT_QUEUE = getattr(settings, 'AMQP_QUEUE', DEFAULT_QUEUE)
    DEFAULT_ROUTING_KEY = getattr(settings, 'AMQP_ROUTING_KEY', DEFAULT_ROUTING_KEY)
    DEFAULT_EXCHANGE = getattr(settings, 'AMQP_EXCHANGE', DEFAULT_EXCHANGE)
    DEFAULT_DURABLE = getattr(settings, 'AMQP_DURABLE', DEFAULT_DURABLE)
    DEFAULT_EXCLUSIVE = getattr(settings, 'AMQP_EXCLUSIVE', DEFAULT_EXCLUSIVE)
    DEFAULT_AUTO_DELETE = getattr(settings, 'AMQP_AUTO_DELETE', DEFAULT_AUTO_DELETE)
    DEFAULT_DELIVERY_MODE = getattr(settings, 'AMQP_DELIVERY_MODE', DEFAULT_DELIVERY_MODE)
    except ImportError:
    pass
    class NamedCallbackConsumer(Consumer):
    named_callbacks = None

    def __init__(self, *args, **kwargs):
    super(NamedCallbackConsumer, self).__init__(*args, **kwargs)
    self.named_callbacks = {}

    class Connection(object):
    def __init__(self, host=DEFAULT_HOST, user_id=DEFAULT_USER_ID,
    password=DEFAULT_PASSWORD, vhost=DEFAULT_VHOST, port=DEFAULT_PORT,
    insist=DEFAULT_INSIST):

    self.host = host
    self.user_id = user_id
    self.password = password
    self.vhost = vhost
    self.port = port
    self.insist = insist

    self.connect()

    def connect(self):
    self.connection = amqp.Connection(
    host='%s:%s' % (self.host, self.port),
    userid=self.user_id,
    password=self.password,
    virtual_host=self.vhost,
    insist=self.insist
    )


    class Consumer(object):
    def __init__(self, routing_key=DEFAULT_ROUTING_KEY,
    exchange=DEFAULT_EXCHANGE, queue=DEFAULT_QUEUE,
    durable=DEFAULT_DURABLE, exclusive=DEFAULT_EXCLUSIVE,
    auto_delete=DEFAULT_AUTO_DELETE, connection=None):

    self.callbacks = {}

    self.routing_key = routing_key
    self.exchange = exchange
    self.queue = queue
    self.durable = durable
    self.exclusive = exclusive
    self.auto_delete = auto_delete
    self.connection = connection or Connection()
    self.channel = self.connection.connection.channel()

    self.channel.queue_declare(
    queue=self.queue,
    durable=self.durable,
    exclusive=self.exclusive,
    auto_delete=self.auto_delete
    )
    self.channel.exchange_declare(
    exchange=self.exchange,
    type='direct',
    durable=self.durable,
    auto_delete=self.auto_delete
    )
    self.channel.queue_bind(
    queue=self.queue,
    exchange=self.exchange,
    routing_key=self.routing_key
    )
    self.channel.basic_consume(
    queue=self.queue,
    no_ack=True,
    callback=self.dispatch,
    consumer_tag=str(uuid.uuid4())
    )

    def close(self):
    if getattr(self, 'channel'):
    self.channel.close()
    if getattr(self, 'connection'):
    self.connection.close()

    def wait(self):
    while True:
    self.channel.wait()

    def dispatch(self, message):
    decoded = simplejson.loads(message.body)
    message.body = decoded['data']
    callback = self.callbacks.get(decoded['kind'])
    if callback:
    callback(message)

    def register(self, kind, callback):
    self.callbacks[kind] = callback
    def register(self, name, callback):
    self.named_callbacks[name] = callback

    def unregister(self, kind):
    del self.callbacks[kind]

    def receieve(self, message_data, message):
    kind = message_data.get("kind")
    data = message_data.get("data")
    callback_for_kind = self.callbacks.get(kind)
    if callback_for_kind:
    callback_for_kind(data, message)


    class Publisher(object):
    def __init__(self, routing_key=DEFAULT_ROUTING_KEY,
    exchange=DEFAULT_EXCHANGE, connection=None,
    delivery_mode=DEFAULT_DELIVERY_MODE):

    self.connection = connection or Connection()
    self.channel = self.connection.connection.channel()
    self.exchange = exchange
    self.routing_key = routing_key
    self.delivery_mode = delivery_mode
    class NamedCallbackPublisher(Publisher):

    def publish(self, kind, message_data):
    encoded = simplejson.dumps({'kind': kind, 'data': message_data})
    message = amqp.Message(encoded)
    message.properties['delivery_mode'] = self.delivery_mode
    self.channel.basic_publish(
    message,
    exchange=self.exchange,
    routing_key=self.routing_key
    )
    return message

    def close(self):
    if getattr(self, 'channel'):
    self.channel.close()
    if getattr(self, 'connection'):
    self.connection.connection.close()
    return self.send({"kind": kind, "data": message_data})
  6. @ericflo ericflo renamed this gist Jun 6, 2009. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  7. @ericflo ericflo created this gist Jun 6, 2009.
    211 changes: 211 additions & 0 deletions gistfile1.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,211 @@
    """
    Copyright (c) 2008-2009, Nathan Borror, and modified heavily by Eric Florenzano
    All rights reserved.
    Redistribution and use in source and binary forms, with or without modification,
    are permitted provided that the following conditions are met:
    Redistributions of source code must retain the above copyright notice, this list
    of conditions and the following disclaimer.
    Redistributions in binary form must reproduce the above copyright notice, this
    list of conditions and the following disclaimer in the documentation and/or
    other materials provided with the distribution.
    Neither the name of the ericflopsy nor the names of its contributors may be
    used to endorse or promote products derived from this software without specific
    prior written permission.
    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
    ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
    WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
    ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
    (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
    ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
    (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
    SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
    Ericflopsy
    ======
    A very simple way to interact with python AMQPlib. For my case I'm using
    RabbitMQ as my implementation but it should work with others.
    Consumer
    --------
    >>> import ericflopsy
    >>> consumer = ericflopsy.Consumer()
    >>> def print_message(message):
    ... print 'Recieved: ' + message.body
    ... consumer.channel.basic_ack(message.delivery_tag)
    ...
    >>> consumer.register('messages_to_print', print_message)
    >>> def print_message_in_caps(message):
    ... print 'Received: ' + message.body.upper()
    ... consumer.channel.basic_ack(message.delivery_tag)
    ...
    >>> consumer.register('messages_to_print_in_caps', print_message_in_caps)
    >>> consumer.wait()
    Publisher
    ---------
    >>> import ericflopsy
    >>> publisher = ericflopsy.Publisher()
    >>> publisher.publish('messages_to_print', 'Test message!')
    >>> publisher.publish('messages_to_print_in_caps', 'hello, world!')
    """

    import simplejson
    import uuid

    from amqplib import client_0_8 as amqp

    DEFAULT_HOST = '127.0.0.1'
    DEFAULT_USER_ID = 'guest'
    DEFAULT_PASSWORD = 'guest'
    DEFAULT_VHOST = '/'
    DEFAULT_PORT = 5672
    DEFAULT_INSIST = False
    DEFAULT_QUEUE = 'default_queue'
    DEFAULT_ROUTING_KEY = 'default_routing_key'
    DEFAULT_EXCHANGE = 'default_exchange'
    DEFAULT_DURABLE = True
    DEFAULT_EXCLUSIVE = False
    DEFAULT_AUTO_DELETE = False
    DEFAULT_DELIVERY_MODE = 2
    try:
    from django.conf import settings
    DEFAULT_HOST = getattr(settings, 'AMQP_SERVER', DEFAULT_HOST)
    DEFAULT_USER_ID = getattr(settings, 'AMQP_USER', DEFAULT_USER_ID)
    DEFAULT_PASSWORD = getattr(settings, 'AMQP_PASSWORD', DEFAULT_PASSWORD)
    DEFAULT_VHOST = getattr(settings, 'AMQP_VHOST', DEFAULT_VHOST)
    DEFAULT_PORT = getattr(settings, 'AMQP_PORT', DEFAULT_PORT)
    DEFAULT_INSIST = getattr(settings, 'AMQP_INSIST', DEFAULT_INSIST)
    DEFAULT_QUEUE = getattr(settings, 'AMQP_QUEUE', DEFAULT_QUEUE)
    DEFAULT_ROUTING_KEY = getattr(settings, 'AMQP_ROUTING_KEY', DEFAULT_ROUTING_KEY)
    DEFAULT_EXCHANGE = getattr(settings, 'AMQP_EXCHANGE', DEFAULT_EXCHANGE)
    DEFAULT_DURABLE = getattr(settings, 'AMQP_DURABLE', DEFAULT_DURABLE)
    DEFAULT_EXCLUSIVE = getattr(settings, 'AMQP_EXCLUSIVE', DEFAULT_EXCLUSIVE)
    DEFAULT_AUTO_DELETE = getattr(settings, 'AMQP_AUTO_DELETE', DEFAULT_AUTO_DELETE)
    DEFAULT_DELIVERY_MODE = getattr(settings, 'AMQP_DELIVERY_MODE', DEFAULT_DELIVERY_MODE)
    except ImportError:
    pass


    class Connection(object):
    def __init__(self, host=DEFAULT_HOST, user_id=DEFAULT_USER_ID,
    password=DEFAULT_PASSWORD, vhost=DEFAULT_VHOST, port=DEFAULT_PORT,
    insist=DEFAULT_INSIST):

    self.host = host
    self.user_id = user_id
    self.password = password
    self.vhost = vhost
    self.port = port
    self.insist = insist

    self.connect()

    def connect(self):
    self.connection = amqp.Connection(
    host='%s:%s' % (self.host, self.port),
    userid=self.user_id,
    password=self.password,
    virtual_host=self.vhost,
    insist=self.insist
    )


    class Consumer(object):
    def __init__(self, routing_key=DEFAULT_ROUTING_KEY,
    exchange=DEFAULT_EXCHANGE, queue=DEFAULT_QUEUE,
    durable=DEFAULT_DURABLE, exclusive=DEFAULT_EXCLUSIVE,
    auto_delete=DEFAULT_AUTO_DELETE, connection=None):

    self.callbacks = {}

    self.routing_key = routing_key
    self.exchange = exchange
    self.queue = queue
    self.durable = durable
    self.exclusive = exclusive
    self.auto_delete = auto_delete
    self.connection = connection or Connection()
    self.channel = self.connection.connection.channel()

    self.channel.queue_declare(
    queue=self.queue,
    durable=self.durable,
    exclusive=self.exclusive,
    auto_delete=self.auto_delete
    )
    self.channel.exchange_declare(
    exchange=self.exchange,
    type='direct',
    durable=self.durable,
    auto_delete=self.auto_delete
    )
    self.channel.queue_bind(
    queue=self.queue,
    exchange=self.exchange,
    routing_key=self.routing_key
    )
    self.channel.basic_consume(
    queue=self.queue,
    no_ack=True,
    callback=self.dispatch,
    consumer_tag=str(uuid.uuid4())
    )

    def close(self):
    if getattr(self, 'channel'):
    self.channel.close()
    if getattr(self, 'connection'):
    self.connection.close()

    def wait(self):
    while True:
    self.channel.wait()

    def dispatch(self, message):
    decoded = simplejson.loads(message.body)
    message.body = decoded['data']
    callback = self.callbacks.get(decoded['kind'])
    if callback:
    callback(message)

    def register(self, kind, callback):
    self.callbacks[kind] = callback

    def unregister(self, kind):
    del self.callbacks[kind]


    class Publisher(object):
    def __init__(self, routing_key=DEFAULT_ROUTING_KEY,
    exchange=DEFAULT_EXCHANGE, connection=None,
    delivery_mode=DEFAULT_DELIVERY_MODE):

    self.connection = connection or Connection()
    self.channel = self.connection.connection.channel()
    self.exchange = exchange
    self.routing_key = routing_key
    self.delivery_mode = delivery_mode

    def publish(self, kind, message_data):
    encoded = simplejson.dumps({'kind': kind, 'data': message_data})
    message = amqp.Message(encoded)
    message.properties['delivery_mode'] = self.delivery_mode
    self.channel.basic_publish(
    message,
    exchange=self.exchange,
    routing_key=self.routing_key
    )
    return message

    def close(self):
    if getattr(self, 'channel'):
    self.channel.close()
    if getattr(self, 'connection'):
    self.connection.connection.close()