Created
January 8, 2015 03:15
-
-
Save Nexuapex/f7fbd6b4d67bc1c3da75 to your computer and use it in GitHub Desktop.
Revisions
-
Nexuapex created this gist
Jan 8, 2015 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,100 @@ from __future__ import absolute_import import sys import threading import collections class WorkerPool(object): """A thread pool that processes items from an unbounded work queue, for simple multi-producer, multi-consumer processes. The 'workers' argument controls the number of worker threads. The function 'func' is called on a worker thread and is passed a work item (as well as any other parameters passed to the initializer). The 'excepthook' property, which defaults to sys.excepthook, is called when the worker function raises an exception. The worker thread itself is not terminated. Note that stop() must be called to terminate gracefully. """ def __init__(self, workers, func, *args, **kwargs): self.excepthook = sys.excepthook self.__items = collections.deque() self.__mutex = threading.Lock() self.__semaphore = threading.Semaphore(value=0) self.__func = func self.__args = args self.__kwargs = kwargs self.__stopped = False self.__canceled = False threads = [] for _ in xrange(workers): thread = threading.Thread(target=self.__worker) thread.start() threads.append(thread) self.__threads = threads def kick(self, item): "Puts an item into the work queue." with self.__mutex: if self.__stopped: raise RuntimeError("Cannot kick new work items after stop() has been called") self.__items.appendleft(item) # Increase the 'pending work' counter. self.__semaphore.release() def stop(self, cancel=False, wait=False): """Stops the worker pool from processing any additional items and allows the worker threads to terminate. If 'cancel' is true, then the worker threads will terminate as soon as possible, without processing every work item. If 'wait' is true, then stop() will block until every work item has been processed and every worker thread has terminated. """ with self.__mutex: self.__stopped = True if cancel: self.__canceled = True # Wake up all sleeping worker threads. for thread in self.__threads: self.__semaphore.release() if wait: for thread in self.__threads: thread.join() def canceled(self): "True if the pool has been asked to cancel processing." return self.__canceled def __worker(self): "Worker thread main loop." while not self.__canceled: # Decrease the 'pending work' counter. self.__semaphore.acquire() if self.__canceled: break with self.__mutex: # Get a single item from the work queue. If there are no items # left in the queue, it must be time to stop. if self.__items: item = self.__items.pop() else: assert self.__stopped break try: self.__func(item, *self.__args, **self.__kwargs) except StandardError: self.excepthook(*sys.exc_info())