Skip to content

Instantly share code, notes, and snippets.

@justinfx
Last active January 11, 2019 04:47
Show Gist options
  • Select an option

  • Save justinfx/b0adb36e694ec03365da19b6bbf33c20 to your computer and use it in GitHub Desktop.

Select an option

Save justinfx/b0adb36e694ec03365da19b6bbf33c20 to your computer and use it in GitHub Desktop.

Revisions

  1. justinfx revised this gist Jan 11, 2019. 1 changed file with 13 additions and 2 deletions.
    15 changes: 13 additions & 2 deletions README.md
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,15 @@
    # Usage
    # Build
    ```
    go build worker.go
    go build sender.go
    ```

    # Test

    Start `gnatsd` server and 3 worker processes. Then run test to send 3 messages at once.
    It is expected that all 3 messages are picked up by 3 idle workers, and this may happen on
    the first run. Re-running the send test results in one of the workers not picking up a message
    and another worker handles 2 consecutive messages.

    shell 1
    ```
    @@ -12,7 +23,7 @@ shell 2,3,4

    shell 5
    ```
    python send.py
    python ./send.py
    # output
    started subprocess
  2. justinfx created this gist Jan 11, 2019.
    35 changes: 35 additions & 0 deletions README.md
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,35 @@
    # Usage

    shell 1
    ```
    gnatsd -p 23456
    ```

    shell 2,3,4
    ```
    ./worker
    ```

    shell 5
    ```
    python send.py
    # output
    started subprocess
    started subprocess
    started subprocess
    [None, None, None]
    [None, None, None]
    [None, None, None]
    [None, None, None]
    [None, None, None]
    2019/01/11 17:36:58 Received: 2019-01-11 17:36:58.929140694 +1300 NZDT m=+222.514945473
    2019/01/11 17:36:58 Received: 2019-01-11 17:36:58.929872619 +1300 NZDT m=+221.086229133
    [0, 0, None]
    [0, 0, None]
    [0, 0, None]
    [0, 0, None]
    [0, 0, None]
    2019/01/11 17:37:03 Received: 2019-01-11 17:37:03.930106351 +1300 NZDT m=+226.086462907
    [0, 0, 0]
    ```
    26 changes: 26 additions & 0 deletions sender.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,26 @@
    package main

    import (
    "log"
    "time"

    "github.com/nats-io/go-nats"
    )

    func main() {
    nc, err := nats.Connect("nats://localhost:23456", nats.Name("runner"))
    if err != nil {
    log.Fatal(err)
    }
    defer nc.Close()

    msg, err := nc.Request("worker", []byte("sleep"), 20*time.Second)
    if err != nil {
    if nc.LastError() != nil {
    log.Fatalf("%v for request", nc.LastError())
    }
    log.Fatalf("%v for request", err)
    }

    log.Printf("Received: %s", msg.Data)
    }
    22 changes: 22 additions & 0 deletions test.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,22 @@
    """
    Test script for sending 3 messages to 3 workers
    """
    import subprocess
    import time

    def start():
    proc = subprocess.Popen(["/tmp/nats_test/sender"])
    print "started subprocess"
    return proc

    procs=[]
    for i in range(3):
    procs.append(start())

    while True:
    result_list=[p.poll() for p in procs]
    print result_list
    running = None in result_list
    if not running:
    break
    time.sleep(1)
    33 changes: 33 additions & 0 deletions worker.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,33 @@
    package main

    import (
    "log"
    "runtime"
    "time"

    "github.com/nats-io/go-nats"
    )

    func main() {
    nc, err := nats.Connect("nats://localhost:23456", nats.Name("worker"))
    if err != nil {
    log.Fatal(err)
    }

    nc.QueueSubscribe("worker", "queue", func(msg *nats.Msg) {
    log.Printf("received message. sleeping")
    time.Sleep(5 * time.Second)
    log.Printf("done sleeping")

    if err := nc.Publish(msg.Reply, []byte(time.Now().String())); err != nil {
    log.Fatal(err)
    }
    })
    nc.Flush()

    if err := nc.LastError(); err != nil {
    log.Fatal(err)
    }

    runtime.Goexit()
    }