Skip to content

Instantly share code, notes, and snippets.

@gwik
Created July 10, 2011 18:43
Show Gist options
  • Select an option

  • Save gwik/1074831 to your computer and use it in GitHub Desktop.

Select an option

Save gwik/1074831 to your computer and use it in GitHub Desktop.

Revisions

  1. Antonin Amand revised this gist Jul 10, 2011. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion zerologger.py
    Original file line number Diff line number Diff line change
    @@ -23,7 +23,7 @@
    | irc bot | | console print | | couch db client |
    +----------+ +---------------+ +-----------------+
    (not implemented yet) | HTTP REST
    |
    |
    +------------+
    | couch db |
    +------------+
  2. Antonin Amand revised this gist Jul 10, 2011. 1 changed file with 1 addition and 2 deletions.
    3 changes: 1 addition & 2 deletions zerologger.py
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,3 @@

    # encoding: utf-8

    """
    @@ -23,7 +22,7 @@
    +----------+ +---------------+ +-----------------+
    | irc bot | | console print | | couch db client |
    +----------+ +---------------+ +-----------------+
    (not implement yet) | HTTP REST
    (not implemented yet) | HTTP REST
    |
    +------------+
    | couch db |
  3. Antonin Amand created this gist Jul 10, 2011.
    288 changes: 288 additions & 0 deletions zerologger.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,288 @@

    # encoding: utf-8

    """
    A python logging Handler that use ZeroMQ (ØMQ).
    +------+ +------+ +-------+
    | app1 | | app2 | | app X |
    +------+ +------+ +-------+
    | PUSH | PUSH | PUSH
    | | |
    +------------+--------------+
    |
    | PULL
    +-------------+
    | Dispatcher |
    +-------------+
    | PUB
    |
    +--------------- + -------------------+
    | | |
    SUB | SUB | SUB |
    +----------+ +---------------+ +-----------------+
    | irc bot | | console print | | couch db client |
    +----------+ +---------------+ +-----------------+
    (not implement yet) | HTTP REST
    |
    +------------+
    | couch db |
    +------------+
    This project aims at providing an overview of how to use ØMQ brokerless
    messaging facilities do dispach log entries to various channels.
    This is a toy project developed as I learn ØMQ and gevent,
    DO NOT USE IN PRODUCTION.
    Any feedback appreciated : [email protected] @gwik
    "app"'s can be any python process that log messages using python
    stdlib logging module. Log messages are dispatched using
    PUSH/PULL to a dispatcher that broadcast the messages to subscribers.
    Suscribers can send the messages to a IRC channel, print them on the
    console or store them in a couch database.
    You'll need the following packages installed :
    gevent gevent_zeromq couchdbkit
    You can start the different process as follow :
    python2.7 zeromqlogger.py log|dispatcher|couchsub|printersub
    You can start as many `log` as you want.
    Tested with python2.7 on MAC OS X 10.6.
    Copyleft!
    """

    from gevent_zeromq import zmq
    from gevent import monkey
    import gevent.queue
    monkey.patch_all()
    import gevent.pool
    import logging
    import socket
    import json
    from datetime import datetime

    from restkit.globals import set_manager
    from restkit.manager.mgevent import GeventManager
    # set the gevent connection manager
    set_manager(GeventManager())

    from couchdbkit import Server
    from couchdbkit.schema import properties, Document


    class Handler(logging.Handler):
    """ A logging handler for sending notifications to a ømq PUSH.
    """

    def __init__(self, address,
    pool_size=5, level=logging.NOTSET):
    context = zmq.Context()
    self.hostname = socket.gethostname()
    self.socket = context.socket(zmq.PUSH)
    self.context = self.socket.context
    self.socket.connect(address)
    # channel queue, put always blocks until delivered
    self.channel = gevent.queue.Queue(0)
    self._job = gevent.spawn(self.__send)
    super(Handler, self).__init__(level)

    def createLock(self):
    pass

    def acquire(self):
    pass

    def release(self):
    pass

    def flush(self):
    pass

    def close(self):
    self._job.kill(timeout=2)

    def emit(self, record):
    self.channel.put(record)

    def __send(self):
    while True:
    record = self.channel.get()
    message = record.__dict__
    message['hostname'] = self.hostname
    self.socket.send_json(message)


    class Dispatcher(gevent.Greenlet):
    """ PULL for messages and PUBlish them to SUBscribers.
    The pulling and publishing is happening in there own separate
    greenlet.
    They communicate via a channel queue.
    """

    def __init__(self, pull_address, publish_address):
    super(Dispatcher, self).__init__()
    self.context = zmq.Context()
    self.pull_socket = self.context.socket(zmq.PULL)
    self.pull_socket.bind(pull_address)
    context = zmq.Context()
    self.publish_socket = context.socket(zmq.PUB)
    self.publish_socket.bind(publish_address)
    self.channel = gevent.queue.Queue(0)

    def _run(self):
    self._pull_job = gevent.spawn(self.__pull)
    self._publish_job = gevent.spawn(self.__publish)
    self._pull_job.join()
    self._publish_job.join()

    def __pull(self):
    while True:
    info = self.pull_socket.recv_json()
    self.channel.put(info)

    def __publish(self):
    while True:
    info = self.channel.get()
    self.publish_socket.send_multipart([
    info['name'].encode('utf-8'),
    json.dumps(info)])


    class PrintSubscriber(gevent.Greenlet):
    """ Subscribe to dispatcher and print on console standard output
    """

    def __init__(self, address, topic=''):
    super(PrintSubscriber, self).__init__()
    self.context = zmq.Context()
    self.socket = self.context.socket(zmq.SUB)
    self.socket.setsockopt(zmq.SUBSCRIBE, topic)
    self.topic = topic
    self.socket.connect(address)

    def _run(self):
    while True:
    topic, info = self.socket.recv_multipart()
    info = json.loads(info)
    print "topic %s/%s [%d] %s" % (
    self.topic, topic, info['process'], info['msg'])


    class LogEntry(Document):
    relativeCreated = properties.FloatProperty()
    msecs = properties.FloatProperty()
    args = properties.StringListProperty()
    name = properties.StringProperty()
    thread = properties.IntegerProperty()
    created = properties.DateTimeProperty()
    process = properties.IntegerProperty()
    threadNam = properties.StringProperty()
    module = properties.StringProperty()
    filename = properties.StringProperty()
    levelno = properties.IntegerProperty()
    processName = properties.StringProperty()
    pathname = properties.StringProperty()
    lineno = properties.IntegerProperty()
    exc_text = properties.StringProperty()
    exc_info = properties.StringProperty()
    funcName = properties.StringProperty()
    hostname = properties.StringProperty()
    levelname = properties.StringProperty()
    msg = properties.StringProperty()


    class CouchSubscriber(gevent.Greenlet):
    """ Subscriber that stores messages in a couch db.
    """

    def __init__(self, couch, address):
    super(CouchSubscriber, self).__init__()
    self.server = couch
    self.context = zmq.Context()
    self.socket = self.context.socket(zmq.SUB)
    self.socket.setsockopt(zmq.SUBSCRIBE, '')
    self.socket.connect(address)

    def _run(self):
    while True:
    topic, info = self.socket.recv_multipart()
    info = json.loads(info)
    info['created'] = datetime.fromtimestamp(info['created'])
    doc = LogEntry(**info)
    doc.save()

    def run_couch_sub(address):
    couch = Server()
    db = couch.get_or_create_db('log')
    LogEntry.set_db(db)
    job = CouchSubscriber(couch, pub_address)
    job.start()
    return job

    def run_logger(log_address):
    import logging.config

    config = {
    'version': 1,
    'handlers': {
    'zmq': {
    'class': '__main__.Handler',
    'level': 'DEBUG',
    'address': log_address
    }
    },
    'root': {
    'level': 'DEBUG',
    'handlers': ['zmq']
    },
    }

    logging.config.dictConfig(config)

    def log(topic, wait=1):
    logger = logging.getLogger(topic)
    logger.setLevel(logging.DEBUG)

    while True:
    logger.info('some info behind logged')
    gevent.sleep(wait)

    return gevent.spawn(log, __name__, wait=1)


    if __name__ == '__main__':
    import sys
    name = sys.argv.pop()

    log_address = 'ipc:///tmp/zmqlog'
    pub_address = 'ipc:///tmp/logpub'

    if name == 'log':
    print "starting example logging application..."
    job = run_logger(log_address)
    job.join()
    elif name == 'dispatcher':
    print "starting dispatcher..."
    job = Dispatcher(log_address, pub_address)
    job.start()
    job.join()
    elif name == 'printersub':
    print "starting printer subscriber..."
    job = PrintSubscriber(pub_address)
    job.start()
    job.join()
    elif name == 'couchsub':
    print "starting couchdb subscriber..."
    job = run_couch_sub(pub_address)
    job.join()
    else:
    print "invalid usage : log|dispatcher|couchsub|printersub"
    exit(1)