Skip to content

Instantly share code, notes, and snippets.

@leon0707
Last active January 20, 2020 21:55
Show Gist options
  • Save leon0707/256b04d33ca42580e4a54dda7bef873f to your computer and use it in GitHub Desktop.
Save leon0707/256b04d33ca42580e4a54dda7bef873f to your computer and use it in GitHub Desktop.
_semlock._get_value() NotImplementedError on mac
'''
qsize raises NotImplementedError on Unix platforms like Mac OS X where sem_getvalue() is not implemented.
https://docs.python.org/3.6/library/multiprocessing.html#multiprocessing.Queue.qsize
'''
import multiprocessing.queues
from queue import Empty, Full
class SharedCounter(object):
    """Integer counter that can be shared between processes.

    The counter is stored in a ``multiprocessing.Value`` of C type ``'i'``.
    An in-place addition is a read followed by a write, so ``increment``
    holds the Value's own lock for the whole read-modify-write sequence;
    otherwise a second process could read the old value before the first
    one has written the new one.

    This class comes almost entirely from Eli Bendersky's blog:
    http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/
    """

    def __init__(self, n=0):
        """Create the counter with initial value ``n`` (default 0)."""
        self.count = multiprocessing.Value('i', n)

    def increment(self, n=1):
        """Add ``n`` (default 1, may be negative) to the counter."""
        shared = self.count
        guard = shared.get_lock()
        with guard:
            # Read and write happen under the same lock acquisition.
            shared.value = shared.value + n

    @property
    def value(self):
        """Current value of the counter."""
        shared = self.count
        return shared.value
class Queue(multiprocessing.queues.Queue):
    """A portable multiprocessing.Queue with a working qsize() and empty().

    The stock ``Queue.qsize()`` may raise NotImplementedError on Unix
    platforms like Mac OS X where sem_getvalue() is not implemented. This
    subclass keeps its own synchronized shared counter (starting at zero),
    raised by every successful ``put()`` and lowered by every successful
    ``get()``, and answers ``qsize()`` / ``empty()`` from that counter.

    Behavioural notes:

    * ``put()`` raises ``queue.Full`` immediately, without waiting, when the
      counter has reached ``maxsize`` -- regardless of ``block``/``timeout``.
    * ``get()`` raises ``queue.Empty`` immediately, without waiting, when the
      counter is zero -- regardless of ``block``/``timeout``.
    * The check and the counter update are separate steps, so with several
      concurrent producers/consumers the counter is a best-effort figure.

    Attributes:
        size: the ``SharedCounter`` tracking the number of queued items.
        maxsize: the positive bound given at construction, or ``None`` when
            the queue is unbounded.
    """

    def __init__(self, maxsize=0, *, ctx=None):
        """Create the queue.

        Args:
            maxsize: upper bound on the number of items; a value <= 0 means
                unbounded. May be given positionally or by keyword.
            ctx: multiprocessing context to use; defaults to
                ``multiprocessing.get_context()``.
        """
        if ctx is None:
            ctx = multiprocessing.get_context()
        super().__init__(maxsize, ctx=ctx)
        self.size = SharedCounter(0)
        # Normalise "unbounded" to None so put() never compares against a
        # non-positive bound.
        self.maxsize = maxsize if maxsize > 0 else None

    def __getstate__(self):
        """Return the base class's pickled state plus the counter and bound.

        Without this, a queue handed to a child process under the 'spawn'
        start method would be rebuilt without ``size`` and ``maxsize``.
        """
        return (super().__getstate__(), self.size, self.maxsize)

    def __setstate__(self, state):
        """Restore the base-class state, the shared counter and the bound."""
        parent_state, self.size, self.maxsize = state
        super().__setstate__(parent_state)

    def put(self, *args, **kwargs):
        """Put an item on the queue and count it.

        Accepts the same arguments as ``multiprocessing.Queue.put``.

        Raises:
            queue.Full: immediately if the counter has reached ``maxsize``,
                or whenever the underlying ``put`` raises it.
        """
        if self.maxsize is not None and self.qsize() >= self.maxsize:
            raise Full
        # Reserve the slot first so the counter never under-reports, then
        # give it back if the underlying put does not go through.
        self.size.increment(1)
        try:
            super().put(*args, **kwargs)
        except BaseException:
            self.size.increment(-1)
            raise

    def get(self, *args, **kwargs):
        """Remove and return an item from the queue.

        Accepts the same arguments as ``multiprocessing.Queue.get``.

        Raises:
            queue.Empty: immediately if the counter is zero, or whenever the
                underlying ``get`` raises it.
        """
        if self.qsize() <= 0:
            raise Empty
        item = super().get(*args, **kwargs)
        # Only count the removal once an item was actually retrieved, so a
        # failed or timed-out get leaves the counter untouched.
        self.size.increment(-1)
        return item

    def qsize(self):
        """Return the number of items according to the shared counter."""
        return self.size.value

    def empty(self):
        """Return True when the shared counter reports no items."""
        return not self.qsize()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment