Skip to content

Instantly share code, notes, and snippets.

@leon0707
Last active January 20, 2020 21:55
Show Gist options
  • Save leon0707/256b04d33ca42580e4a54dda7bef873f to your computer and use it in GitHub Desktop.
Save leon0707/256b04d33ca42580e4a54dda7bef873f to your computer and use it in GitHub Desktop.

Revisions

  1. leon0707 revised this gist Jan 20, 2020. 1 changed file with 18 additions and 3 deletions.
    21 changes: 18 additions & 3 deletions mac_queue.py
    Original file line number Diff line number Diff line change
    @@ -1,3 +1,10 @@
    '''
    qsize raises NotImplementedError on Unix platforms like Mac OS X where sem_getvalue() is not implemented.
    https://docs.python.org/3.6/library/multiprocessing.html#multiprocessing.Queue.qsize
    '''
    import multiprocessing.queues
    from queue import Empty, Full

    class SharedCounter(object):
    """ A synchronized shared counter.
    @@ -13,10 +20,10 @@ class SharedCounter(object):
    """

    def __init__(self, n = 0):
    def __init__(self, n=0):
    self.count = multiprocessing.Value('i', n)

    def increment(self, n = 1):
    def increment(self, n=1):
    """ Increment the counter by n (default = 1) """
    with self.count.get_lock():
    self.count.value += n
    @@ -42,14 +49,22 @@ class Queue(multiprocessing.queues.Queue):
    """

    def __init__(self, *args, **kwargs):
    super(Queue, self).__init__(*args, **kwargs)
    super(Queue, self).__init__(ctx=multiprocessing.get_context(), *args, **kwargs)
    self.size = SharedCounter(0)
    if 'maxsize' in kwargs:
    self.maxsize = kwargs['kwargs']
    else:
    self.maxsize = None

    def put(self, *args, **kwargs):
    if self.maxsize != None and self.qsize() == self.maxsize:
    raise Full
    self.size.increment(1)
    super(Queue, self).put(*args, **kwargs)

    def get(self, *args, **kwargs):
    if self.qsize() == 0:
    raise Empty
    self.size.increment(-1)
    return super(Queue, self).get(*args, **kwargs)

  2. leon0707 created this gist Jan 20, 2020.
    62 changes: 62 additions & 0 deletions mac_queue.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,62 @@
    class SharedCounter(object):
    """ A synchronized shared counter.
    The locking done by multiprocessing.Value ensures that only a single
    process or thread may read or write the in-memory ctypes object. However,
    in order to do n += 1, Python performs a read followed by a write, so a
    second process may read the old value before the new one is written by the
    first process. The solution is to use a multiprocessing.Lock to guarantee
    the atomicity of the modifications to Value.
    This class comes almost entirely from Eli Bendersky's blog:
    http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/
    """

    def __init__(self, n = 0):
    self.count = multiprocessing.Value('i', n)

    def increment(self, n = 1):
    """ Increment the counter by n (default = 1) """
    with self.count.get_lock():
    self.count.value += n

    @property
    def value(self):
    """ Return the value of the counter """
    return self.count.value


    class Queue(multiprocessing.queues.Queue):
    """ A portable implementation of multiprocessing.Queue.
    Because of multithreading / multiprocessing semantics, Queue.qsize() may
    raise the NotImplementedError exception on Unix platforms like Mac OS X
    where sem_getvalue() is not implemented. This subclass addresses this
    problem by using a synchronized shared counter (initialized to zero) and
    increasing / decreasing its value every time the put() and get() methods
    are called, respectively. This not only prevents NotImplementedError from
    being raised, but also allows us to implement a reliable version of both
    qsize() and empty().
    """

    def __init__(self, *args, **kwargs):
    super(Queue, self).__init__(*args, **kwargs)
    self.size = SharedCounter(0)

    def put(self, *args, **kwargs):
    self.size.increment(1)
    super(Queue, self).put(*args, **kwargs)

    def get(self, *args, **kwargs):
    self.size.increment(-1)
    return super(Queue, self).get(*args, **kwargs)

    def qsize(self):
    """ Reliable implementation of multiprocessing.Queue.qsize() """
    return self.size.value

    def empty(self):
    """ Reliable implementation of multiprocessing.Queue.empty() """
    return not self.qsize()