Skip to content

Instantly share code, notes, and snippets.

@kylemcdonald
Created December 11, 2023 15:43
Show Gist options
  • Select an option

  • Save kylemcdonald/b8f2cf00d02730df7ae86bdaf69a8e8b to your computer and use it in GitHub Desktop.

Select an option

Save kylemcdonald/b8f2cf00d02730df7ae86bdaf69a8e8b to your computer and use it in GitHub Desktop.

Revisions

  1. kylemcdonald created this gist Dec 11, 2023.
    10 changes: 10 additions & 0 deletions collector.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,10 @@
    import zmq

    context = zmq.Context()
    socket = context.socket(zmq.PULL)
    socket.bind("tcp://*:5558")

    print("Collector started... collecting results.")
    while True:
    result = socket.recv_json()
    print(f"Collected: {result}")
    14 changes: 14 additions & 0 deletions producer.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,14 @@
    import zmq
    import time
    import random

    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    socket.bind("tcp://*:5557")

    print("Producer started... producing jobs.")
    while True:
    job = random.randrange(1, 100) # Randomly generate a simple job
    socket.send_json({"job_id": job})
    print(f"Sent job #{job}")
    time.sleep(0.1) # Simulate time between sending jobs
    25 changes: 25 additions & 0 deletions worker.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,25 @@
    import zmq
    import time
    import random

    worker_id = random.randrange(1, 1000)

    context = zmq.Context()

    # Connect to the producer's job dispatch
    receiver = context.socket(zmq.PULL)
    receiver.connect("tcp://localhost:5557")

    # Connect to the collector to send results
    sender = context.socket(zmq.PUSH)
    sender.connect("tcp://localhost:5558")

    print(f"Worker #{worker_id} started... pulling jobs.")
    while True:
    job = receiver.recv_json() # Receive job from producer
    job_id = job["job_id"]
    print(f"Processing job #{job_id}")
    time.sleep(0.2) # Simulate some work being done

    # Once complete, send results to the collector
    sender.send_json({"job_id": job_id, "worker": worker_id})