Skip to content

Instantly share code, notes, and snippets.

@cdadia
Created October 18, 2016 11:11
Show Gist options
  • Select an option

  • Save cdadia/b2348f32ff148dc9ac3dd1649ed4891f to your computer and use it in GitHub Desktop.

Select an option

Save cdadia/b2348f32ff148dc9ac3dd1649ed4891f to your computer and use it in GitHub Desktop.
py-queue
import collections
import threading
class EventQueue:
    """Execute submitted callables one at a time on a dedicated worker thread.

    Every call to enqueue() or stop() returns a "result getter": a
    zero-argument callable that blocks until the corresponding event has
    been handled, then returns the callable's return value or re-raises the
    exception it raised. Events that are discarded instead of executed make
    their getter raise ``UnprocessedEvent``.

    The worker thread is started by the constructor and is not a daemon
    thread, so stop() should be called once the queue is no longer needed.
    """

    def __init__(self):
        self._queue = self.Queue()
        self._results = self.Results()
        self._runner = self.Runner(self._queue.dequeue)
        # Start the worker thread (the original called a non-existent
        # ``self._runner_start`` attribute here).
        self._runner.start()

    def enqueue(self, func, args=None, kwargs=None, high_priority=False):
        """Schedule ``func(*args, **kwargs)`` for execution on the worker.

        Args:
            func: The callable to run.
            args: Positional arguments for ``func``; defaults to none.
            kwargs: Keyword arguments for ``func``; defaults to none.
            high_priority: If true, the event is placed at the front of the
                queue instead of the back.

        Returns:
            A zero-argument callable that blocks until the event has been
            handled and then returns its result or raises its exception.
            If the runner has already been stopped, the getter raises
            ``UnprocessedEvent``.
        """
        # Use None sentinels instead of shared mutable default objects.
        args = [] if args is None else args
        kwargs = {} if kwargs is None else kwargs
        set_result_func, get_result_func = self._results.getContainer()
        element = self.Runner.packCall(func, args, kwargs, set_result_func)
        if self._runner.isRunning():
            self._queue.enqueue(element, high_priority)
        else:
            self._runner.flagError(
                element,
                message="Event has been added after the stop() event.")
        return get_result_func

    def stop(self, high_priority=False):
        """Schedule the shutdown of the worker thread.

        The stop request is itself an event: events queued ahead of it are
        still executed, and events still queued when it runs are flagged
        with ``UnprocessedEvent``.

        Args:
            high_priority: If true, the stop request jumps to the front of
                the queue.

        Returns:
            A zero-argument callable that blocks until the stop request has
            been handled.
        """
        # NOTE(review): a stop request queued after the worker loop has
        # already exited is never dequeued, so its getter would block.
        set_result_func, get_result_func = self._results.getContainer()
        element = self._runner.getStopCall(self._flushQueue, set_result_func)
        self._queue.enqueue(element, high_priority)
        return get_result_func

    def _flushQueue(self):
        """Flag every event still waiting in the queue as unprocessed."""
        while self._queue.hasMore():
            element = self._queue.dequeue()
            self._runner.flagError(element)

    class Queue:
        """Blocking FIFO of events with optional front-of-queue insertion."""

        def __init__(self):
            # A deque gives O(1) insertion and removal at the left end.
            self._items = collections.deque()
            self._condition = threading.Condition()

        def enqueue(self, element, high_priority):
            """Add ``element``; at the front when ``high_priority`` is true."""
            with self._condition:
                if high_priority:
                    self._items.appendleft(element)
                else:
                    self._items.append(element)
                self._condition.notify()

        def hasMore(self):
            """Return True if at least one element is waiting."""
            with self._condition:
                return len(self._items) > 0

        def dequeue(self):
            """Remove and return the front element, blocking while empty."""
            with self._condition:
                # Loop so that a wake-up with nothing to take waits again.
                while not self._items:
                    self._condition.wait()
                return self._items.popleft()

    class Results:
        """Factory for one-shot result containers."""

        def getContainer(self):
            """Return a ``(set_result, get_result)`` pair sharing one slot."""
            container = self._Container()
            return (container.set_result, container.get_result)

        class _Container:
            """Holds one result (or exception) and lets readers wait for it."""

            def __init__(self):
                self._condition = threading.Condition()
                self._has_result = False
                self._result_is_exception = False
                self._result = None

            def set_result(self, result, result_is_exception):
                """Store ``result`` and wake every thread waiting for it.

                Args:
                    result: The value to hand out, or the exception to raise.
                    result_is_exception: If true, get_result() raises
                        ``result`` instead of returning it.
                """
                with self._condition:
                    self._has_result = True
                    self._result_is_exception = result_is_exception
                    self._result = result
                    # notify_all(): several threads may wait on the same
                    # getter, and notify() would wake only one of them.
                    self._condition.notify_all()

            def get_result(self):
                """Block until a result is stored, then return or raise it."""
                with self._condition:
                    while not self._has_result:
                        self._condition.wait()
                    if self._result_is_exception:
                        raise self._result
                    return self._result

    class Runner(threading.Thread):
        """Worker thread that pulls events from a source and executes them.

        An event is a ``(func, args, kwargs, set_result_func)`` tuple.
        """

        def __init__(self, get_next_func):
            """Create the runner.

            Args:
                get_next_func: Zero-argument callable returning the next
                    event; it is called once per loop iteration.
            """
            threading.Thread.__init__(self)
            self._running = True
            self._get_next_func = get_next_func
            self._stopLock = threading.Lock()

        def run(self):
            """Fetch and execute events until the runner has been stopped."""
            while self.isRunning():
                element = self._get_next_func()
                self._execute(element)

        def isRunning(self):
            """Return True until the stop call has been executed."""
            with self._stopLock:
                return self._running

        def flagError(self, element, message="Event has not been processed."):
            """Complete ``element`` with an ``UnprocessedEvent`` exception."""
            (_func, _args, _kwargs, set_result_func) = element
            set_result_func(UnprocessedEvent(message), result_is_exception=True)

        def getStopCall(self, after_stop_func, set_result_func):
            """Build the event that stops this runner when executed.

            Args:
                after_stop_func: Zero-argument callable invoked right after
                    the running flag has been cleared.
                set_result_func: Receives the outcome of the stop call.
            """
            return (self._stopRunning, [after_stop_func], {}, set_result_func)

        # Alias kept so that both spellings used in the original resolve.
        get_stop_call = getStopCall

        @staticmethod
        def packCall(func, args, kwargs, set_result_func):
            """Bundle a call and its result setter into an event tuple."""
            return (func, args, kwargs, set_result_func)

        def _execute(self, element):
            """Run one event and report its result or raised exception."""
            (func, args, kwargs, set_result_func) = element
            try:
                result = func(*args, **kwargs)
                set_result_func(result, result_is_exception=False)
            except Exception as exception:
                set_result_func(exception, result_is_exception=True)

        # Deliberately not named ``_stop``: threading.Thread uses a private
        # method of that name on CPython 3.4-3.12, and overriding it with a
        # different signature breaks join() and is_alive().
        def _stopRunning(self, after_stop_func):
            """Clear the running flag, then invoke ``after_stop_func``."""
            with self._stopLock:
                self._running = False
            after_stop_func()
class UnprocessedEvent(Exception):
    """Raised through a result getter when an event was never executed."""

    def __init__(self, reason):
        # Keep the explanation; it is what str() and repr() report.
        self._reason = reason

    def __str__(self):
        """Return the reason converted to a string."""
        return str(self._reason)

    def __repr__(self):
        """Return a constructor-like representation of this exception."""
        return "UnprocessedEvent(%r)" % (self._reason,)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment