Last active
          January 20, 2020 21:55 
        
      - 
      
- 
        Save leon0707/256b04d33ca42580e4a54dda7bef873f to your computer and use it in GitHub Desktop. 
Revisions
- 
        leon0707 revised this gist Jan 20, 2020 . 1 changed file with 18 additions and 3 deletions.There are no files selected for viewingThis file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,3 +1,10 @@ ''' qsize raises NotImplementedError on Unix platforms like Mac OS X where sem_getvalue() is not implemented. https://docs.python.org/3.6/library/multiprocessing.html#multiprocessing.Queue.qsize ''' import multiprocessing.queues from queue import Empty, Full class SharedCounter(object): """ A synchronized shared counter. @@ -13,10 +20,10 @@ class SharedCounter(object): """ def __init__(self, n=0): self.count = multiprocessing.Value('i', n) def increment(self, n=1): """ Increment the counter by n (default = 1) """ with self.count.get_lock(): self.count.value += n @@ -42,14 +49,22 @@ class Queue(multiprocessing.queues.Queue): """ def __init__(self, *args, **kwargs): super(Queue, self).__init__(ctx=multiprocessing.get_context(), *args, **kwargs) self.size = SharedCounter(0) if 'maxsize' in kwargs: self.maxsize = kwargs['kwargs'] else: self.maxsize = None def put(self, *args, **kwargs): if self.maxsize != None and self.qsize() == self.maxsize: raise Full self.size.increment(1) super(Queue, self).put(*args, **kwargs) def get(self, *args, **kwargs): if self.qsize() == 0: raise Empty self.size.increment(-1) return super(Queue, self).get(*args, **kwargs) 
- 
        leon0707 created this gist Jan 20, 2020 .There are no files selected for viewingThis file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,62 @@ class SharedCounter(object): """ A synchronized shared counter. The locking done by multiprocessing.Value ensures that only a single process or thread may read or write the in-memory ctypes object. However, in order to do n += 1, Python performs a read followed by a write, so a second process may read the old value before the new one is written by the first process. The solution is to use a multiprocessing.Lock to guarantee the atomicity of the modifications to Value. This class comes almost entirely from Eli Bendersky's blog: http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/ """ def __init__(self, n = 0): self.count = multiprocessing.Value('i', n) def increment(self, n = 1): """ Increment the counter by n (default = 1) """ with self.count.get_lock(): self.count.value += n @property def value(self): """ Return the value of the counter """ return self.count.value class Queue(multiprocessing.queues.Queue): """ A portable implementation of multiprocessing.Queue. Because of multithreading / multiprocessing semantics, Queue.qsize() may raise the NotImplementedError exception on Unix platforms like Mac OS X where sem_getvalue() is not implemented. This subclass addresses this problem by using a synchronized shared counter (initialized to zero) and increasing / decreasing its value every time the put() and get() methods are called, respectively. This not only prevents NotImplementedError from being raised, but also allows us to implement a reliable version of both qsize() and empty(). """ def __init__(self, *args, **kwargs): super(Queue, self).__init__(*args, **kwargs) self.size = SharedCounter(0) def put(self, *args, **kwargs): self.size.increment(1) super(Queue, self).put(*args, **kwargs) def get(self, *args, **kwargs): self.size.increment(-1) return super(Queue, self).get(*args, **kwargs) def qsize(self): """ Reliable implementation of multiprocessing.Queue.qsize() """ return self.size.value def empty(self): """ Reliable implementation of multiprocessing.Queue.empty() """ return not self.qsize()