Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save yoyonel/683aa2358647cf27436e23466c3e50e3 to your computer and use it in GitHub Desktop.

Select an option

Save yoyonel/683aa2358647cf27436e23466c3e50e3 to your computer and use it in GitHub Desktop.

Revisions

  1. @tomschr tomschr revised this gist May 28, 2017. 1 changed file with 72 additions and 0 deletions.
    72 changes: 72 additions & 0 deletions mp-producer-consumer-1.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,72 @@
    # From: https://pymotw.com/2/multiprocessing/communication.html

    import multiprocessing
    import time


    class Consumer(multiprocessing.Process):
    def __init__(self, task_queue, result_queue):
    multiprocessing.Process.__init__(self)
    self.task_queue = task_queue
    self.result_queue = result_queue

    def run(self):
    proc_name = self.name
    while True:
    next_task = self.task_queue.get()
    if next_task is None:
    # Poison pill means shutdown
    print('%s: Exiting' % proc_name)
    self.task_queue.task_done()
    break
    print('%s: %s' % (proc_name, next_task))
    answer = next_task()
    self.task_queue.task_done()
    self.result_queue.put(answer)
    return


    class Task(object):
    def __init__(self, a, b):
    self.a = a
    self.b = b
    def __call__(self):
    time.sleep(0.1) # pretend to take some time to do the work
    return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
    def __str__(self):
    return '%s * %s' % (self.a, self.b)


    if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print('Creating %d consumers' % num_consumers)
    consumers = [Consumer(tasks, results) for i in range(num_consumers)]
    start = time.time()
    for w in consumers:
    w.start()

    # Enqueue jobs
    num_jobs = 10
    for i in range(num_jobs):
    tasks.put(Task(i, i))

    # Add a poison pill for each consumer
    for i in range(num_consumers):
    tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()
    end = time.time()

    # Start printing results
    while num_jobs:
    result = results.get()
    print('Result:', result)
    num_jobs -= 1

    print("Running for %.3fs" % (end - start))
  2. @tomschr tomschr revised this gist May 28, 2017. 1 changed file with 51 additions and 0 deletions.
    51 changes: 51 additions & 0 deletions asyncio-producer-consumer-task_done.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,51 @@
    # Original source from http://asyncio.readthedocs.io/en/latest/producer_consumer.html
    # Rewritten for Python >=3.4

    import asyncio
    import random


    @asyncio.coroutine
    def produce(queue, n):
    for x in range(n):
    # produce an item
    print('producing {}/{}'.format(x, n))
    # simulate i/o operation using sleep
    yield from asyncio.sleep(random.random())
    item = str(x)
    # put the item in the queue
    yield from queue.put(item)


    @asyncio.coroutine
    def consume(queue):
    while True:
    # wait for an item from the producer
    item = yield from queue.get()

    # process the item
    print('consuming {}...'.format(item))
    # simulate i/o operation using sleep
    yield from asyncio.sleep(random.random())

    # Notify the queue that the item has been processed
    queue.task_done()


    @asyncio.coroutine
    def run(n):
    queue = asyncio.Queue()
    # schedule the consumer
    consumer = asyncio.ensure_future(consume(queue))
    # run the producer and wait for completion
    yield from produce(queue, n)
    # wait until the consumer has processed all items
    yield from queue.join()
    # the consumer is still awaiting for an item, cancel it
    consumer.cancel()


    if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(10))
    loop.close()
  3. @tomschr tomschr created this gist May 28, 2017.
    44 changes: 44 additions & 0 deletions asyncio-producer-consumer.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,44 @@
    # Original source from http://asyncio.readthedocs.io/en/latest/producer_consumer.html
    # Rewritten for Python >=3.4

    import asyncio
    import random


    @asyncio.coroutine
    def produce(queue, n):
    for x in range(1, n + 1):
    # produce an item
    print('producing {}/{}'.format(x, n))
    # simulate i/o operation using sleep
    yield from asyncio.sleep(random.random())
    item = str(x)
    # put the item in the queue
    yield from queue.put(item)

    # indicate the producer is done
    yield from queue.put(None)


    @asyncio.coroutine
    def consume(queue):
    while True:
    # wait for an item from the producer
    item = yield from queue.get()
    if item is None:
    # the producer emits None to indicate that it is done
    break

    # process the item
    print('consuming item {}...'.format(item))
    # simulate i/o operation using sleep
    yield from asyncio.sleep(random.random())


    if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    queue = asyncio.Queue(loop=loop)
    producer_coro = produce(queue, 10)
    consumer_coro = consume(queue)
    loop.run_until_complete(asyncio.gather(producer_coro, consumer_coro))
    loop.close()