Skip to content

Instantly share code, notes, and snippets.

@thomaspoignant
Created February 19, 2025 12:41
Show Gist options
  • Select an option

  • Save thomaspoignant/07d49ea30eda61ddf5b848d884586ef5 to your computer and use it in GitHub Desktop.

Select an option

Save thomaspoignant/07d49ea30eda61ddf5b848d884586ef5 to your computer and use it in GitHub Desktop.

Revisions

  1. thomaspoignant created this gist Feb 19, 2025.
    115 changes: 115 additions & 0 deletions event.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,115 @@
    package datastruct

    import (
    "fmt"
    "sync"
    )

    const minOffset = int64(-1)

    type eventStoreImpl[T any] struct {
    Events []event[T]
    Mutex sync.Mutex
    Consumers map[string]consumer
    }

    func NewEventStore[T any](consumerNames []string) EventStore[T] {
    consumers := make(map[string]consumer)
    for _, name := range consumerNames {
    consumers[name] = consumer{Offset: 0}
    }
    return &eventStoreImpl[T]{
    Events: make([]event[T], 0),
    Consumers: consumers,
    Mutex: sync.Mutex{},
    }
    }

    type EventStore[T any] interface {
    Push(data T)
    GetEvents(consumerName string) []event[T]
    GetConsumer(name string) (consumer, error)
    }

    type event[T any] struct {
    Offset int64
    Data T
    }

    type consumer struct {
    Offset int64
    }

    func (e *eventStoreImpl[T]) GetConsumer(name string) (consumer, error) {
    if e.Consumers == nil {
    return consumer{}, fmt.Errorf("consumer not found")
    }
    if _, ok := e.Consumers[name]; !ok {
    return consumer{}, fmt.Errorf("consumer not found")
    }
    return e.Consumers[name], nil
    }

    func (e *eventStoreImpl[T]) Push(data T) {
    e.Mutex.Lock()
    defer e.Mutex.Unlock()
    currentOffset := minOffset
    if e.Events != nil && len(e.Events) > 0 {
    currentOffset = e.Events[len(e.Events)-1].Offset
    }
    e.Events = append(e.Events, event[T]{Offset: currentOffset + 1, Data: data})
    }

    func (e *eventStoreImpl[T]) GetEvents(consumerName string) []event[T] {
    e.Mutex.Lock()
    defer e.Mutex.Unlock()
    events := make([]event[T], 0)

    currentConsumer, err := e.GetConsumer(consumerName)
    if err != nil {
    // TODO: handle error
    return events
    }
    for _, event := range e.Events {
    if event.Offset > currentConsumer.Offset {
    events = append(events, event)
    }
    }
    if len(events) > 0 {
    e.Consumers[consumerName] = consumer{Offset: events[len(events)-1].Offset}
    }
    if err := e.RemoveOldEvents(); err != nil {
    // TODO: log something here
    }
    return events
    }

    func (e *eventStoreImpl[T]) RemoveOldEvents() error {
    if e.Events == nil || len(e.Events) == 0 {
    return nil
    }
    if e.Consumers == nil || len(e.Consumers) == 0 {
    return fmt.Errorf("no consumers configured")
    }
    consumerMinOffset := minOffset
    for _, consumer := range e.Consumers {
    if consumerMinOffset == minOffset || consumer.Offset < consumerMinOffset {
    consumerMinOffset = consumer.Offset
    }
    }
    if consumerMinOffset == int64(-1) {
    //TODO: no events to remove
    return nil
    }

    index := 0
    for i, event := range e.Events {
    if event.Offset == consumerMinOffset {
    index = i
    break
    }
    }

    e.Events = e.Events[index:]
    return nil
    }