package main import ( "context" "fmt" "github.com/choria-io/fisk" "github.com/nats-io/jsm.go/natscontext" "github.com/nats-io/nats.go" "os" "os/signal" "strconv" "strings" "syscall" "time" ) var ( subject string nctx string interval time.Duration ctx context.Context cancel context.CancelFunc ) func main() { app := fisk.New("ct", "NATS connection latency test") app.Flag("context", "NATS Context to connect with").StringVar(&nctx) publisher := app.Command("publish", "Start a publisher").Alias("pub").Action(runPublisher) publisher.Arg("subject", "Subject to test on").Required().StringVar(&subject) publisher.Flag("interval", "Publish interval").Short('i').Default("1s").DurationVar(&interval) subscriber := app.Commandf("subscribe", "Start a subscriber").Alias("sub").Action(runSubscriber) subscriber.Arg("subject", "Subject to test on").Required().StringVar(&subject) ctx, cancel = context.WithCancel(context.Background()) go interruptWatcher() app.MustParseWithUsage(os.Args[1:]) } func connectNats() (*nats.Conn, error) { return natscontext.Connect(nctx) } func runPublisher(_ *fisk.ParseContext) error { nc, err := connectNats() if err != nil { return err } var ( errs int corruptions int symmetryHist []float64 rttHist []time.Duration platHist []time.Duration ) ticker := time.NewTicker(interval) for { select { case <-ticker.C: start := time.Now() to, cancel := context.WithTimeout(ctx, 2*time.Second) res, err := nc.RequestWithContext(to, subject, []byte(fmt.Sprintf("%d", start.UnixNano()))) cancel() if err != nil { fmt.Printf("Error publishing: %v\n", err) errs++ continue } parts := strings.Split(string(res.Data), " ") if len(parts) != 2 { fmt.Printf("Invalid response %q\n", res.Data) corruptions++ continue } ns, err := strconv.Atoi(parts[1]) if err != nil { fmt.Printf("Invalid response: %q: %v", res.Data, err) corruptions++ continue } since := time.Since(start) pubRtt := time.Duration(ns) symmetry := float64(since) / float64(pubRtt) if len(rttHist) == 51 { rttHist = rttHist[1:] } if len(platHist) == 51 { platHist = platHist[1:] } if len(symmetryHist) == 51 { symmetryHist = symmetryHist[1:] } rttHist = append(rttHist, since) platHist = append(platHist, pubRtt) symmetryHist = append(symmetryHist, symmetry) fmt.Printf("%v roundtrip time: %v (%v) publish latency: %v (%v) rtt symmetry: %0.3f (%0.3f)\n", start.Format(time.RFC3339), since.Round(time.Microsecond), avgDuration(rttHist).Round(time.Microsecond), pubRtt.Round(time.Microsecond), avgDuration(platHist).Round(time.Microsecond), symmetry, avgFloats(symmetryHist)) case <-ctx.Done(): return nil } } } func avgFloats(d []float64) float64 { sum := float64(0) for _, i := range d { sum += i } return sum / float64(len(d)) } func avgDuration(d []time.Duration) time.Duration { sum := time.Duration(0) for _, i := range d { sum += i } return time.Duration(int(sum) / len(d)) } func runSubscriber(_ *fisk.ParseContext) error { nc, err := connectNats() if err != nil { return err } _, err = nc.Subscribe(subject, func(msg *nats.Msg) { ns, err := strconv.Atoi(string(msg.Data)) if err != nil { fmt.Printf("Invalid request: %q: %v", msg.Data, err) return } plat := time.Since(time.Unix(0, int64(ns))) fmt.Printf("%v Received a message with publish latency: %v\n", time.Now().Format(time.RFC3339), plat.Round(time.Microsecond)) err = msg.Respond([]byte(fmt.Sprintf("%d %d", time.Now().UnixNano(), plat))) if err != nil { fmt.Printf("Reply failed: %v", err) return } }) if err != nil { return err } <-ctx.Done() return nil } func interruptWatcher() { sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) for { select { case sig := <-sigs: switch sig { case syscall.SIGINT, syscall.SIGTERM: cancel() } case <-ctx.Done(): return } } }