Skip to content

Instantly share code, notes, and snippets.

@blinkinglight
Created June 14, 2025 09:45
Show Gist options
  • Save blinkinglight/e7984e75b2961b6bbdd6af73de34bf69 to your computer and use it in GitHub Desktop.
Save blinkinglight/e7984e75b2961b6bbdd6af73de34bf69 to your computer and use it in GitHub Desktop.

Revisions

  1. blinkinglight created this gist Jun 14, 2025.
    54 changes: 54 additions & 0 deletions package.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,54 @@
    package replayhandler

    import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/ituoga/iot/gen"
    "github.com/nats-io/nats.go"
    "google.golang.org/protobuf/proto"
    )

    // interface
    type ReplayHandler interface {
    ApplyEvent(m *gen.EventEnvelope) error
    ApplyCommand(ctx context.Context, m *gen.CommandEnvelope) ([]*gen.EventEnvelope, error)
    }

    func Replay(ctx context.Context, js nats.JetStreamContext, aggregate, id string, fn ReplayHandler) {
    lctx, cancel := context.WithCancel(ctx)
    msgs := make(chan *nats.Msg, 128)
    sub, _ := js.ChanSubscribe(fmt.Sprintf("events.%s.%s.>", aggregate, id), msgs, nats.DeliverAll(), nats.ManualAck())
    defer close(msgs)
    defer sub.Unsubscribe()
    delay := 1000 * time.Millisecond

    go func() {
    waiter := time.NewTimer(delay)
    for {
    select {
    case <-ctx.Done():
    cancel()
    return
    case <-waiter.C:
    cancel()
    return
    case msg := <-msgs:
    waiter.Reset(200 * time.Second)
    if msg == nil {
    continue
    }
    var event = &gen.EventEnvelope{}
    err := proto.Unmarshal(msg.Data, event)
    if err != nil {
    log.Printf("Replay handler error: %v", err)
    }
    fn.ApplyEvent(event)
    msg.Ack()
    }
    }
    }()
    <-lctx.Done()
    }