Skip to content

Instantly share code, notes, and snippets.

@blinkinglight
Created June 14, 2025 09:45
Show Gist options
  • Save blinkinglight/e7984e75b2961b6bbdd6af73de34bf69 to your computer and use it in GitHub Desktop.
Save blinkinglight/e7984e75b2961b6bbdd6af73de34bf69 to your computer and use it in GitHub Desktop.
package replayhandler
import (
"context"
"fmt"
"log"
"time"
"github.com/ituoga/iot/gen"
"github.com/nats-io/nats.go"
"google.golang.org/protobuf/proto"
)
// interface
type ReplayHandler interface {
ApplyEvent(m *gen.EventEnvelope) error
ApplyCommand(ctx context.Context, m *gen.CommandEnvelope) ([]*gen.EventEnvelope, error)
}
func Replay(ctx context.Context, js nats.JetStreamContext, aggregate, id string, fn ReplayHandler) {
lctx, cancel := context.WithCancel(ctx)
msgs := make(chan *nats.Msg, 128)
sub, _ := js.ChanSubscribe(fmt.Sprintf("events.%s.%s.>", aggregate, id), msgs, nats.DeliverAll(), nats.ManualAck())
defer close(msgs)
defer sub.Unsubscribe()
delay := 1000 * time.Millisecond
go func() {
waiter := time.NewTimer(delay)
for {
select {
case <-ctx.Done():
cancel()
return
case <-waiter.C:
cancel()
return
case msg := <-msgs:
waiter.Reset(200 * time.Second)
if msg == nil {
continue
}
var event = &gen.EventEnvelope{}
err := proto.Unmarshal(msg.Data, event)
if err != nil {
log.Printf("Replay handler error: %v", err)
}
fn.ApplyEvent(event)
msg.Ack()
}
}
}()
<-lctx.Done()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment