Skip to content

Instantly share code, notes, and snippets.

@Nexuapex
Created January 8, 2015 03:15
Show Gist options
  • Save Nexuapex/f7fbd6b4d67bc1c3da75 to your computer and use it in GitHub Desktop.
Save Nexuapex/f7fbd6b4d67bc1c3da75 to your computer and use it in GitHub Desktop.

Revisions

  1. Nexuapex created this gist Jan 8, 2015.
    100 changes: 100 additions & 0 deletions workerpool.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,100 @@
    from __future__ import absolute_import

    import sys
    import threading
    import collections


    class WorkerPool(object):
    """A thread pool that processes items from an unbounded work queue, for
    simple multi-producer, multi-consumer processes.
    The 'workers' argument controls the number of worker threads. The function
    'func' is called on a worker thread and is passed a work item (as well as
    any other parameters passed to the initializer).
    The 'excepthook' property, which defaults to sys.excepthook, is called when
    the worker function raises an exception. The worker thread itself is not
    terminated.
    Note that stop() must be called to terminate gracefully.
    """

    def __init__(self, workers, func, *args, **kwargs):
    self.excepthook = sys.excepthook

    self.__items = collections.deque()
    self.__mutex = threading.Lock()
    self.__semaphore = threading.Semaphore(value=0)
    self.__func = func
    self.__args = args
    self.__kwargs = kwargs
    self.__stopped = False
    self.__canceled = False

    threads = []
    for _ in xrange(workers):
    thread = threading.Thread(target=self.__worker)
    thread.start()
    threads.append(thread)
    self.__threads = threads

    def kick(self, item):
    "Puts an item into the work queue."
    with self.__mutex:
    if self.__stopped:
    raise RuntimeError("Cannot kick new work items after stop() has been called")
    self.__items.appendleft(item)

    # Increase the 'pending work' counter.
    self.__semaphore.release()

    def stop(self, cancel=False, wait=False):
    """Stops the worker pool from processing any additional items and allows
    the worker threads to terminate.
    If 'cancel' is true, then the worker threads will terminate as soon as
    possible, without processing every work item.
    If 'wait' is true, then stop() will block until every work item has been
    processed and every worker thread has terminated.
    """
    with self.__mutex:
    self.__stopped = True
    if cancel:
    self.__canceled = True

    # Wake up all sleeping worker threads.
    for thread in self.__threads:
    self.__semaphore.release()

    if wait:
    for thread in self.__threads:
    thread.join()

    def canceled(self):
    "True if the pool has been asked to cancel processing."
    return self.__canceled

    def __worker(self):
    "Worker thread main loop."
    while not self.__canceled:
    # Decrease the 'pending work' counter.
    self.__semaphore.acquire()

    if self.__canceled:
    break

    with self.__mutex:
    # Get a single item from the work queue. If there are no items
    # left in the queue, it must be time to stop.
    if self.__items:
    item = self.__items.pop()
    else:
    assert self.__stopped
    break

    try:
    self.__func(item, *self.__args, **self.__kwargs)
    except StandardError:
    self.excepthook(*sys.exc_info())