Created
July 10, 2011 18:43
-
-
Save gwik/1074831 to your computer and use it in GitHub Desktop.
Revisions
-
Antonin Amand revised this gist
Jul 10, 2011 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -23,7 +23,7 @@ | irc bot | | console print | | couch db client | +----------+ +---------------+ +-----------------+ (not implemented yet) | HTTP REST | +------------+ | couch db | +------------+ -
Antonin Amand revised this gist
Jul 10, 2011 . 1 changed file with 1 addition and 2 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,4 +1,3 @@ # encoding: utf-8 """ @@ -23,7 +22,7 @@ +----------+ +---------------+ +-----------------+ | irc bot | | console print | | couch db client | +----------+ +---------------+ +-----------------+ (not implemented yet) | HTTP REST | +------------+ | couch db | -
Antonin Amand created this gist
Jul 10, 2011 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,288 @@ # encoding: utf-8 """ A python logging Handler that use ZeroMQ (ØMQ). +------+ +------+ +-------+ | app1 | | app2 | | app X | +------+ +------+ +-------+ | PUSH | PUSH | PUSH | | | +------------+--------------+ | | PULL +-------------+ | Dispatcher | +-------------+ | PUB | +--------------- + -------------------+ | | | SUB | SUB | SUB | +----------+ +---------------+ +-----------------+ | irc bot | | console print | | couch db client | +----------+ +---------------+ +-----------------+ (not implement yet) | HTTP REST | +------------+ | couch db | +------------+ This project aims at providing an overview of how to use ØMQ brokerless messaging facilities do dispach log entries to various channels. This is a toy project developed as I learn ØMQ and gevent, DO NOT USE IN PRODUCTION. Any feedback appreciated : [email protected] @gwik "app"'s can be any python process that log messages using python stdlib logging module. Log messages are dispatched using PUSH/PULL to a dispatcher that broadcast the messages to subscribers. Suscribers can send the messages to a IRC channel, print them on the console or store them in a couch database. You'll need the following packages installed : gevent gevent_zeromq couchdbkit You can start the different process as follow : python2.7 zeromqlogger.py log|dispatcher|couchsub|printersub You can start as many `log` as you want. Tested with python2.7 on MAC OS X 10.6. Copyleft! """ from gevent_zeromq import zmq from gevent import monkey import gevent.queue monkey.patch_all() import gevent.pool import logging import socket import json from datetime import datetime from restkit.globals import set_manager from restkit.manager.mgevent import GeventManager # set the gevent connection manager set_manager(GeventManager()) from couchdbkit import Server from couchdbkit.schema import properties, Document class Handler(logging.Handler): """ A logging handler for sending notifications to a ømq PUSH. """ def __init__(self, address, pool_size=5, level=logging.NOTSET): context = zmq.Context() self.hostname = socket.gethostname() self.socket = context.socket(zmq.PUSH) self.context = self.socket.context self.socket.connect(address) # channel queue, put always blocks until delivered self.channel = gevent.queue.Queue(0) self._job = gevent.spawn(self.__send) super(Handler, self).__init__(level) def createLock(self): pass def acquire(self): pass def release(self): pass def flush(self): pass def close(self): self._job.kill(timeout=2) def emit(self, record): self.channel.put(record) def __send(self): while True: record = self.channel.get() message = record.__dict__ message['hostname'] = self.hostname self.socket.send_json(message) class Dispatcher(gevent.Greenlet): """ PULL for messages and PUBlish them to SUBscribers. The pulling and publishing is happening in there own separate greenlet. They communicate via a channel queue. """ def __init__(self, pull_address, publish_address): super(Dispatcher, self).__init__() self.context = zmq.Context() self.pull_socket = self.context.socket(zmq.PULL) self.pull_socket.bind(pull_address) context = zmq.Context() self.publish_socket = context.socket(zmq.PUB) self.publish_socket.bind(publish_address) self.channel = gevent.queue.Queue(0) def _run(self): self._pull_job = gevent.spawn(self.__pull) self._publish_job = gevent.spawn(self.__publish) self._pull_job.join() self._publish_job.join() def __pull(self): while True: info = self.pull_socket.recv_json() self.channel.put(info) def __publish(self): while True: info = self.channel.get() self.publish_socket.send_multipart([ info['name'].encode('utf-8'), json.dumps(info)]) class PrintSubscriber(gevent.Greenlet): """ Subscribe to dispatcher and print on console standard output """ def __init__(self, address, topic=''): super(PrintSubscriber, self).__init__() self.context = zmq.Context() self.socket = self.context.socket(zmq.SUB) self.socket.setsockopt(zmq.SUBSCRIBE, topic) self.topic = topic self.socket.connect(address) def _run(self): while True: topic, info = self.socket.recv_multipart() info = json.loads(info) print "topic %s/%s [%d] %s" % ( self.topic, topic, info['process'], info['msg']) class LogEntry(Document): relativeCreated = properties.FloatProperty() msecs = properties.FloatProperty() args = properties.StringListProperty() name = properties.StringProperty() thread = properties.IntegerProperty() created = properties.DateTimeProperty() process = properties.IntegerProperty() threadNam = properties.StringProperty() module = properties.StringProperty() filename = properties.StringProperty() levelno = properties.IntegerProperty() processName = properties.StringProperty() pathname = properties.StringProperty() lineno = properties.IntegerProperty() exc_text = properties.StringProperty() exc_info = properties.StringProperty() funcName = properties.StringProperty() hostname = properties.StringProperty() levelname = properties.StringProperty() msg = properties.StringProperty() class CouchSubscriber(gevent.Greenlet): """ Subscriber that stores messages in a couch db. """ def __init__(self, couch, address): super(CouchSubscriber, self).__init__() self.server = couch self.context = zmq.Context() self.socket = self.context.socket(zmq.SUB) self.socket.setsockopt(zmq.SUBSCRIBE, '') self.socket.connect(address) def _run(self): while True: topic, info = self.socket.recv_multipart() info = json.loads(info) info['created'] = datetime.fromtimestamp(info['created']) doc = LogEntry(**info) doc.save() def run_couch_sub(address): couch = Server() db = couch.get_or_create_db('log') LogEntry.set_db(db) job = CouchSubscriber(couch, pub_address) job.start() return job def run_logger(log_address): import logging.config config = { 'version': 1, 'handlers': { 'zmq': { 'class': '__main__.Handler', 'level': 'DEBUG', 'address': log_address } }, 'root': { 'level': 'DEBUG', 'handlers': ['zmq'] }, } logging.config.dictConfig(config) def log(topic, wait=1): logger = logging.getLogger(topic) logger.setLevel(logging.DEBUG) while True: logger.info('some info behind logged') gevent.sleep(wait) return gevent.spawn(log, __name__, wait=1) if __name__ == '__main__': import sys name = sys.argv.pop() log_address = 'ipc:///tmp/zmqlog' pub_address = 'ipc:///tmp/logpub' if name == 'log': print "starting example logging application..." job = run_logger(log_address) job.join() elif name == 'dispatcher': print "starting dispatcher..." job = Dispatcher(log_address, pub_address) job.start() job.join() elif name == 'printersub': print "starting printer subscriber..." job = PrintSubscriber(pub_address) job.start() job.join() elif name == 'couchsub': print "starting couchdb subscriber..." job = run_couch_sub(pub_address) job.join() else: print "invalid usage : log|dispatcher|couchsub|printersub" exit(1)