// Package replayhandler feeds the events published for one aggregate instance
// on a NATS JetStream subject back into a handler.
package replayhandler

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/ituoga/iot/gen"
	"github.com/nats-io/nats.go"
	"google.golang.org/protobuf/proto"
)

// ReplayHandler is the contract a value must satisfy to be driven by Replay.
type ReplayHandler interface {
	// ApplyEvent is called by Replay once per successfully decoded event, in
	// the order the messages are received.
	ApplyEvent(m *gen.EventEnvelope) error
	// ApplyCommand takes a command envelope and returns event envelopes or an
	// error. It is not called by Replay.
	ApplyCommand(ctx context.Context, m *gen.CommandEnvelope) ([]*gen.EventEnvelope, error)
}

// Replay subscribes to "events.<aggregate>.<id>.>" with DeliverAll and feeds
// every received event to fn.ApplyEvent.
//
// It blocks until ctx is cancelled or no message has arrived within the idle
// window, then removes the subscription and returns. All processing happens
// on the calling goroutine, so fn.ApplyEvent is never running once Replay has
// returned.
//
// Replay has no error return: subscription, decoding, apply and ack failures
// are reported through the standard logger. A message that cannot be decoded
// is skipped (not applied) but still acknowledged.
func Replay(ctx context.Context, js nats.JetStreamContext, aggregate, id string, fn ReplayHandler) {
	const (
		// initialWait bounds how long to wait for the first message.
		initialWait = 1000 * time.Millisecond
		// idleWait bounds the gap between consecutive messages.
		// NOTE(review): 200 seconds is far longer than initialWait and means
		// Replay keeps blocking for over three minutes after the last event;
		// this looks like it was meant to be 200*time.Millisecond. Confirm
		// intent before changing.
		idleWait = 200 * time.Second
	)

	subject := fmt.Sprintf("events.%s.%s.>", aggregate, id)
	msgs := make(chan *nats.Msg, 128)

	sub, err := js.ChanSubscribe(subject, msgs, nats.DeliverAll(), nats.ManualAck())
	if err != nil {
		log.Printf("Replay handler error: subscribing to %q: %v", subject, err)
		return
	}
	defer func() {
		if err := sub.Unsubscribe(); err != nil {
			log.Printf("Replay handler error: unsubscribing from %q: %v", subject, err)
		}
	}()

	idle := time.NewTimer(initialWait)
	defer idle.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-idle.C:
			return
		case msg := <-msgs:
			// Stop and drain before Reset so a tick that fired while this
			// message was being selected cannot end the loop prematurely.
			if !idle.Stop() {
				select {
				case <-idle.C:
				default:
				}
			}
			idle.Reset(idleWait)

			if msg == nil {
				continue
			}

			event := &gen.EventEnvelope{}
			if err := proto.Unmarshal(msg.Data, event); err != nil {
				// Do not hand a partially decoded envelope to the handler.
				log.Printf("Replay handler error: %v", err)
			} else if err := fn.ApplyEvent(event); err != nil {
				log.Printf("Replay handler error: applying event: %v", err)
			}

			// Ack unconditionally so a bad message is not redelivered.
			if err := msg.Ack(); err != nil {
				log.Printf("Replay handler error: acking message: %v", err)
			}
		}
	}
}