Created
October 18, 2016 11:11
-
-
Save cdadia/b2348f32ff148dc9ac3dd1649ed4891f to your computer and use it in GitHub Desktop.
py-queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import collections
import threading
class EventQueue:
    """Run queued calls one at a time on a single background thread.

    Each call handed to enqueue() is executed by the internal Runner thread
    in queue order (high-priority calls jump to the front). enqueue() and
    stop() both return a zero-argument "getter" that blocks until the call
    has been handled and then returns its result or re-raises its exception.
    """

    def __init__(self):
        """Create the queue, the result factory and start the worker thread."""
        self._queue = self.Queue()
        self._results = self.Results()
        self._runner = self.Runner(self._queue.dequeue)
        self._runner.start()

    def enqueue(self, func, args=(), kwargs=None, high_priority=False):
        """Schedule ``func(*args, **kwargs)`` on the worker thread.

        Args:
            func: callable to execute on the worker thread.
            args: positional arguments for ``func``.
            kwargs: keyword arguments for ``func`` (``None`` means none).
            high_priority: when true, the call is placed at the front of the
                queue instead of the back.

        Returns:
            A zero-argument callable that blocks until the call has been
            handled, then returns its result or raises the exception it
            raised. If the runner has already stopped, the getter raises
            UnprocessedEvent.
        """
        # A fresh dict per call: a shared mutable default would leak state
        # between unrelated calls.
        if kwargs is None:
            kwargs = {}
        (set_result_func, get_result_func) = self._results.getContainer()
        element = self.Runner.packCall(func, args, kwargs, set_result_func)
        # NOTE(review): this check and the enqueue below are not atomic. An
        # event added while the stop call is being processed can land in the
        # queue after it was flushed, and its getter would then never return.
        if self._runner.isRunning():
            self._queue.enqueue(element, high_priority)
        else:
            self._runner.flagError(
                element,
                message="Event has been added after the stop() event.")
        return get_result_func

    def stop(self, high_priority=False):
        """Schedule the shutdown of the worker thread.

        The stop request travels through the queue like any other call. When
        the worker reaches it, the runner is marked as stopped and every call
        still waiting in the queue is resolved with UnprocessedEvent.

        Args:
            high_priority: when true, stop ahead of all calls already queued.

        Returns:
            A zero-argument callable that blocks until the stop request has
            been processed.
        """
        # NOTE(review): a stop request queued after the worker has already
        # exited is never dequeued, so its getter would block forever.
        (set_result_func, get_result_func) = self._results.getContainer()
        element = self._runner.getStopCall(self._flushQueue, set_result_func)
        self._queue.enqueue(element, high_priority)
        return get_result_func

    def _flushQueue(self):
        """Resolve every call still queued with an UnprocessedEvent error."""
        while self._queue.hasMore():
            element = self._queue.dequeue()
            self._runner.flagError(element)

    class Queue:
        """Blocking FIFO with an optional jump-to-front insert."""

        def __init__(self):
            # deque gives O(1) insertion and removal at both ends.
            self._items = collections.deque()
            self._condition = threading.Condition()

        def enqueue(self, element, high_priority):
            """Add ``element`` (at the front when ``high_priority``) and wake a waiter."""
            with self._condition:
                if high_priority:
                    self._items.appendleft(element)
                else:
                    self._items.append(element)
                self._condition.notify()

        def hasMore(self):
            """Return True when at least one element is queued."""
            with self._condition:
                return len(self._items) > 0

        def dequeue(self):
            """Remove and return the front element, blocking while empty."""
            with self._condition:
                # Loop guards against wake-ups that find the queue empty.
                while not self._items:
                    self._condition.wait()
                return self._items.popleft()

    class Results:
        """Factory for one-shot result containers."""

        def getContainer(self):
            """Return the ``(set_result, get_result)`` pair of a new container."""
            container = self._Container()
            return (container.set_result, container.get_result)

        class _Container:
            """Holds the outcome of one call and lets readers wait for it."""

            def __init__(self):
                self._condition = threading.Condition()
                self._has_result = False
                self._result_is_exception = False
                self._result = None

            def set_result(self, result, result_is_exception):
                """Store the outcome and release every waiting reader."""
                with self._condition:
                    self._has_result = True
                    self._result_is_exception = result_is_exception
                    self._result = result
                    # notify_all: several threads may be blocked in
                    # get_result() on the same container.
                    self._condition.notify_all()

            def get_result(self):
                """Block until an outcome is stored; return it or raise it."""
                with self._condition:
                    while not self._has_result:
                        self._condition.wait()
                    if self._result_is_exception:
                        raise self._result
                    return self._result

    class Runner(threading.Thread):
        """Worker thread that pulls packed calls and executes them."""

        def __init__(self, get_next_func):
            """
            Args:
                get_next_func: zero-argument callable returning the next
                    packed call (as built by packCall).
            """
            threading.Thread.__init__(self)
            self._running = True
            self._get_next_func = get_next_func
            self._stopLock = threading.Lock()

        def run(self):
            """Execute packed calls until the runner is marked as stopped."""
            while self.isRunning():
                element = self._get_next_func()
                self._execute(element)

        def isRunning(self):
            """Return True until the stop call has been executed."""
            with self._stopLock:
                return self._running

        def flagError(self, element, message="Event has not been processed."):
            """Resolve a packed call with UnprocessedEvent instead of running it."""
            (_func, _args, _kwargs, set_result_func) = element
            set_result_func(UnprocessedEvent(message), result_is_exception=True)

        def getStopCall(self, after_stop_func, set_result_func):
            """Return a packed call that stops this runner.

            Args:
                after_stop_func: zero-argument callable invoked right after
                    the runner has been marked as stopped.
                set_result_func: setter of the result container for the call.
            """
            return self.packCall(
                self._halt, [after_stop_func], {}, set_result_func)

        # Both spellings are kept so either naming convention works.
        get_stop_call = getStopCall

        @staticmethod
        def packCall(func, args, kwargs, set_result_func):
            """Bundle a call and its result setter into one queue element."""
            return (func, args, kwargs, set_result_func)

        def _execute(self, element):
            """Run a packed call and store its result or its exception."""
            (func, args, kwargs, set_result_func) = element
            try:
                result = func(*args, **kwargs)
            except Exception as exception:
                set_result_func(exception, result_is_exception=True)
            else:
                set_result_func(result, result_is_exception=False)

        # Deliberately not named ``_stop``: threading.Thread has a private
        # ``_stop()`` on several CPython 3 releases, and overriding it breaks
        # join() and is_alive().
        def _halt(self, after_stop_func):
            """Mark the runner as stopped, then invoke ``after_stop_func``."""
            with self._stopLock:
                self._running = False
            after_stop_func()
class UnprocessedEvent(Exception):
    """Raised in place of a result when a queued event was never executed."""

    def __init__(self, reason):
        """
        Args:
            reason: description of why the event was not processed; used as
                the exception message.
        """
        # Chain to the base initializer so the standard exception state
        # (``args``) is set up the conventional way.
        Exception.__init__(self, reason)
        self._reason = reason

    def __str__(self):
        return str(self._reason)

    def __repr__(self):
        return "UnprocessedEvent({!r})".format(self._reason)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment