import threading


class EventQueue:
    """Run submitted callables one at a time on a single worker thread.

    Each call to enqueue() or stop() returns a zero-argument "getter".
    Calling the getter blocks until the corresponding event has been
    handled, then returns the callable's result or re-raises the exception
    it produced.  Events that are never executed raise UnprocessedEvent
    from their getter.
    """

    def __init__(self):
        self._queue = self.Queue()
        self._results = self.Results()
        self._runner = self.Runner(self._queue.dequeue)
        self._runner.start()

    def enqueue(self, func, args=None, kwargs=None, high_priority=False):
        """Schedule func(*args, **kwargs) on the worker thread.

        args defaults to no positional arguments and kwargs to no keyword
        arguments.  When high_priority is true the event is placed at the
        front of the queue instead of the back.

        Returns a getter for the result.  If the runner has already been
        stopped, the event is not queued and the getter raises
        UnprocessedEvent.
        """
        # None sentinels avoid sharing one mutable default between calls.
        call_args = [] if args is None else args
        call_kwargs = {} if kwargs is None else kwargs
        set_result_func, get_result_func = self._results.getContainer()
        element = self.Runner.packCall(
            func, call_args, call_kwargs, set_result_func)
        if self._runner.isRunning():
            self._queue.enqueue(element, high_priority)
        else:
            self._runner.flagError(
                element,
                message="Event has been added after the stop() event.")
        return get_result_func

    def stop(self, high_priority=False):
        """Schedule the shutdown of the worker thread.

        The stop request travels through the queue like any other event, so
        events queued ahead of it are still executed.  Events still pending
        when the stop request runs are flagged with UnprocessedEvent.

        Returns a getter that blocks until the stop request has been
        handled.  If the runner is already stopped, nothing would ever
        dequeue the request, so it is flagged immediately and the getter
        raises UnprocessedEvent instead of blocking forever.
        """
        set_result_func, get_result_func = self._results.getContainer()
        element = self._runner.get_stop_call(
            self._flushQueue, set_result_func)
        if self._runner.isRunning():
            self._queue.enqueue(element, high_priority)
        else:
            self._runner.flagError(
                element,
                message="Event has been added after the stop() event.")
        return get_result_func

    def _flushQueue(self):
        """Flag every event still waiting in the queue as unprocessed."""
        while self._queue.hasMore():
            pending = self._queue.dequeue()
            self._runner.flagError(pending)

    class Queue:
        """Blocking FIFO of events with optional front insertion."""

        def __init__(self):
            self._list = []
            self._condition = threading.Condition()

        def enqueue(self, element, high_priority):
            """Add element at the front if high_priority, else at the back."""
            with self._condition:
                if high_priority:
                    self._list.insert(0, element)
                else:
                    self._list.append(element)
                self._condition.notify()

        def hasMore(self):
            """Return True when at least one element is waiting."""
            with self._condition:
                return len(self._list) > 0

        def dequeue(self):
            """Remove and return the first element, blocking while empty."""
            with self._condition:
                # Loop: a wake-up does not guarantee the list is non-empty.
                while not self._list:
                    self._condition.wait()
                return self._list.pop(0)

    class Results:
        """Factory for one-shot result containers."""

        def getContainer(self):
            """Return a (set_result, get_result) pair sharing one container."""
            container = self._Container()
            return (container.set_result, container.get_result)

        class _Container:
            """Holds a single result or exception until it is collected."""

            def __init__(self):
                self._condition = threading.Condition()
                self._has_result = False
                self._result_is_exception = False
                self._result = None

            def set_result(self, result, result_is_exception):
                """Store result and wake every thread blocked in get_result.

                result_is_exception marks result as an exception instance
                that get_result must raise rather than return.
                """
                with self._condition:
                    self._has_result = True
                    self._result_is_exception = result_is_exception
                    self._result = result
                    # notify_all: several threads may share one getter and
                    # all of them must be released, not just one.
                    self._condition.notify_all()

            def get_result(self):
                """Block until a result is stored, then return or raise it."""
                with self._condition:
                    while not self._has_result:
                        self._condition.wait()
                    if self._result_is_exception:
                        raise self._result
                    return self._result

    class Runner(threading.Thread):
        """Worker thread that executes events fetched via get_next_func.

        An event is a (func, args, kwargs, set_result_func) tuple.
        """

        def __init__(self, get_next_func):
            threading.Thread.__init__(self)
            self._running = True
            self._get_next_func = get_next_func
            self._stopLock = threading.Lock()

        def run(self):
            """Fetch and execute events until the running flag is cleared."""
            while self.isRunning():
                element = self._get_next_func()
                self._execute(element)

        def isRunning(self):
            """Return True until the stop call has been executed."""
            with self._stopLock:
                return self._running

        def flagError(self, element,
                      message="Event has not been processed."):
            """Resolve element's result with an UnprocessedEvent(message)."""
            set_result_func = element[3]
            set_result_func(UnprocessedEvent(message),
                            result_is_exception=True)

        def get_stop_call(self, after_stop_func, set_result_func):
            """Build the event that stops this runner.

            after_stop_func is invoked once the running flag is cleared.
            """
            return (self._stop_running, [after_stop_func], {},
                    set_result_func)

        @staticmethod
        def packCall(func, args, kwargs, set_result_func):
            """Bundle a call and its result setter into an event tuple."""
            return (func, args, kwargs, set_result_func)

        def _execute(self, element):
            """Run one event and report its outcome through its setter."""
            func, args, kwargs, set_result_func = element
            try:
                result = func(*args, **kwargs)
                set_result_func(result, result_is_exception=False)
            except Exception as exception:
                # Hand the failure to whoever collects the result rather
                # than letting it terminate the worker thread.
                set_result_func(exception, result_is_exception=True)

        # Deliberately not named "_stop": threading.Thread defines a private
        # _stop() that join()/is_alive() call without arguments on CPython
        # versions before 3.13, and overriding it breaks those methods.
        def _stop_running(self, after_stop_func):
            """Clear the running flag, then invoke after_stop_func."""
            with self._stopLock:
                self._running = False
            after_stop_func()


class UnprocessedEvent(Exception):
    """Raised from a result getter when its event was never executed."""

    def __init__(self, reason):
        # Initialise the base class so that args carries the reason.
        Exception.__init__(self, reason)
        self._reason = reason

    def __str__(self):
        return str(self._reason)

    def __repr__(self):
        return "UnprocessedEvent(" + repr(self._reason) + ")"