Forked from tomschr/asyncio-producer-consumer-task_done.py
Created
February 26, 2018 09:32
-
-
Save yoyonel/683aa2358647cf27436e23466c3e50e3 to your computer and use it in GitHub Desktop.
Revisions
-
tomschr revised this gist
May 28, 2017 . 1 changed file with 72 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,72 @@ # From: https://pymotw.com/2/multiprocessing/communication.html import multiprocessing import time class Consumer(multiprocessing.Process): def __init__(self, task_queue, result_queue): multiprocessing.Process.__init__(self) self.task_queue = task_queue self.result_queue = result_queue def run(self): proc_name = self.name while True: next_task = self.task_queue.get() if next_task is None: # Poison pill means shutdown print('%s: Exiting' % proc_name) self.task_queue.task_done() break print('%s: %s' % (proc_name, next_task)) answer = next_task() self.task_queue.task_done() self.result_queue.put(answer) return class Task(object): def __init__(self, a, b): self.a = a self.b = b def __call__(self): time.sleep(0.1) # pretend to take some time to do the work return '%s * %s = %s' % (self.a, self.b, self.a * self.b) def __str__(self): return '%s * %s' % (self.a, self.b) if __name__ == '__main__': # Establish communication queues tasks = multiprocessing.JoinableQueue() results = multiprocessing.Queue() # Start consumers num_consumers = multiprocessing.cpu_count() * 2 print('Creating %d consumers' % num_consumers) consumers = [Consumer(tasks, results) for i in range(num_consumers)] start = time.time() for w in consumers: w.start() # Enqueue jobs num_jobs = 10 for i in range(num_jobs): tasks.put(Task(i, i)) # Add a poison pill for each consumer for i in range(num_consumers): tasks.put(None) # Wait for all of the tasks to finish tasks.join() end = time.time() # Start printing results while num_jobs: result = results.get() print('Result:', result) num_jobs -= 1 print("Running for %.3fs" % (end - start)) -
tomschr revised this gist
May 28, 2017 . 1 changed file with 51 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,51 @@ # Original source from http://asyncio.readthedocs.io/en/latest/producer_consumer.html # Rewritten for Python >=3.4 import asyncio import random @asyncio.coroutine def produce(queue, n): for x in range(n): # produce an item print('producing {}/{}'.format(x, n)) # simulate i/o operation using sleep yield from asyncio.sleep(random.random()) item = str(x) # put the item in the queue yield from queue.put(item) @asyncio.coroutine def consume(queue): while True: # wait for an item from the producer item = yield from queue.get() # process the item print('consuming {}...'.format(item)) # simulate i/o operation using sleep yield from asyncio.sleep(random.random()) # Notify the queue that the item has been processed queue.task_done() @asyncio.coroutine def run(n): queue = asyncio.Queue() # schedule the consumer consumer = asyncio.ensure_future(consume(queue)) # run the producer and wait for completion yield from produce(queue, n) # wait until the consumer has processed all items yield from queue.join() # the consumer is still awaiting for an item, cancel it consumer.cancel() if __name__ == "__main__": loop = asyncio.get_event_loop() loop.run_until_complete(run(10)) loop.close() -
tomschr created this gist
May 28, 2017 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,44 @@ # Original source from http://asyncio.readthedocs.io/en/latest/producer_consumer.html # Rewritten for Python >=3.4 import asyncio import random @asyncio.coroutine def produce(queue, n): for x in range(1, n + 1): # produce an item print('producing {}/{}'.format(x, n)) # simulate i/o operation using sleep yield from asyncio.sleep(random.random()) item = str(x) # put the item in the queue yield from queue.put(item) # indicate the producer is done yield from queue.put(None) @asyncio.coroutine def consume(queue): while True: # wait for an item from the producer item = yield from queue.get() if item is None: # the producer emits None to indicate that it is done break # process the item print('consuming item {}...'.format(item)) # simulate i/o operation using sleep yield from asyncio.sleep(random.random()) if __name__ == "__main__": loop = asyncio.get_event_loop() queue = asyncio.Queue(loop=loop) producer_coro = produce(queue, 10) consumer_coro = consume(queue) loop.run_until_complete(asyncio.gather(producer_coro, consumer_coro)) loop.close()