-
-
Save kevinclcn/cce6d43f736b6827b724823a3eb8599e to your computer and use it in GitHub Desktop.
Code snippets for my blog post "The X-Files: Avoiding Concurrency Boilerplate with golang.org/x/sync"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Debounce wraps e, preventing duplicate NamedActions from running | |
| // concurrently, even from concurrent calls to Execute. | |
| func Debounce(e Executor) Executor { | |
| return debouncer{ | |
| ex: e, | |
| sf: new(singleflight.Group), | |
| } | |
| } | |
| type debouncer struct { | |
| ex Executor | |
| sf *singleflight.Group | |
| } | |
| // For any action | |
| func (d debouncer) Execute(ctx context.Context, actions []Action) error { | |
| wrapped := make([]Action, len(actions)) | |
| for i, a := range actions { | |
| if na, ok := a.(NamedAction); ok { | |
| wrapped[i] = debouncedAction{ | |
| NamedAction: na, | |
| sf: d.sf, | |
| } | |
| } else { | |
| wrapped[i] = actions[i] | |
| } | |
| } | |
| return d.ex.Execute(ctx, wrapped) | |
| } | |
| type debouncedAction struct { | |
| NamedAction | |
| sf *singleflight.Group | |
| } | |
| func (da debouncedAction) Execute(ctx context.Context) error { | |
| fn := func() (interface{}, error) { | |
| return nil, da.NamedAction.Execute(ctx) | |
| } | |
| _, err, _ := da.sf.Do(da.ID(), fn) | |
| return err | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // An Action performs a single arbitrary task. | |
| type Action interface { | |
| // Execute performs the work of an Action. This method should make a best | |
| // effort to be cancelled if the provided ctx is cancelled. | |
| Execute(ctx context.Context) error | |
| } | |
| // An Executor performs a set of Actions. It is up to the implementing type | |
| // the concurrency and open/closed failure behavior of the actions. | |
| type Executor interface { | |
| // Execute performs all provided actions by calling their Execute method. | |
| // This method should make a best-effort to cancel outstanding actions if the | |
| // provided ctx is cancelled. | |
| Execute(ctx context.Context, actions []Action) error | |
| } | |
| // ActionFunc permits using a standalone function as an Action. | |
| type ActionFunc func(context.Context) error | |
| // Execute satisfies the Action interface, delegating the call to the | |
| // underlying function. | |
| func (fn ActionFunc) Execute(ctx context.Context) error { return fn(ctx) } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| type flow struct { | |
| maxActions int64 | |
| actions *semaphore.Weighted | |
| calls *semaphore.Weighted | |
| ex Executor | |
| } | |
| // ControlFlow decorates an Executor, limiting it to a maximum concurrent | |
| // number of calls and actions. | |
| func ControlFlow(e Executor, maxCalls, maxActions int64) Executor { | |
| return &flow{ | |
| maxActions: maxActions, | |
| calls: semaphore.NewWeighted(maxCalls), | |
| actions: semaphore.NewWeighted(maxActions), | |
| } | |
| } | |
| // Execute attempts to acquire the semaphores for the concurrent calls and | |
| // actions before delegating to the decorated Executor. If Execute is called | |
| // with more actions than maxActions, an error is returned. | |
| func (f *flow) Execute(ctx context.Context, actions []Action) error { | |
| qty := int64(len(actions)) | |
| if qty > f.maxActions { | |
| return fmt.Errorf("maximum %d actions allowed", f.maxActions) | |
| } | |
| if err := f.calls.Acquire(ctx, 1); err != nil { | |
| return err | |
| } | |
| defer f.calls.Release(1) | |
| if err := f.actions.Acquire(ctx, qty); err != nil { | |
| return err | |
| } | |
| defer f.calls.Release(qty) | |
| return f.ex.Execute(ctx, actions) | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| type metrics struct { | |
| ex Executor | |
| statCache | |
| } | |
| func (m *metrics) Execute(ctx context.Context, actions []Action) error { | |
| wrapped := make([]Action, len(actions)) | |
| global := m.get("all_actions") | |
| for i, a := range actions { | |
| if na, ok := a.(NamedAction); ok { | |
| wrapped[i] = namedStatAction{ | |
| NamedAction: na, | |
| global: global, | |
| stats: m.get(na.ID()), | |
| } | |
| } else { | |
| wrapped[i] = statAction{ | |
| Action: a, | |
| global: global, | |
| } | |
| } | |
| } | |
| return m.ex.Execute(ctx, wrapped) | |
| } | |
| type namedStatAction struct { | |
| NamedAction | |
| global *statSet | |
| stats *statSet | |
| } | |
| func (a namedStatAction) Execute(ctx context.Context) error { | |
| return captureMetrics(ctx, a.NamedAction, a.global, a.stats) | |
| } | |
| type statAction struct { | |
| Action | |
| global *statSet | |
| } | |
| func (a statAction) Execute(ctx context.Context) error { | |
| return captureMetrics(ctx, a.Action, a.global, nil) | |
| } | |
| func captureMetrics(ctx context.Context, a Action, global, stats *statSet) error { | |
| // execute the action, timing its latency | |
| start := time.Now() | |
| err := a.Execute(ctx) | |
| lat := time.Now().Sub(start) | |
| // create our counter values for error/success | |
| var errored, succeeded int | |
| if err != nil { | |
| errored = 1 | |
| } else { | |
| succeeded = 1 | |
| } | |
| // emit the global stats | |
| global.Latency(lat) | |
| global.Success(succeeded) | |
| global.Error(errored) | |
| // if there are name-scoped stats, emit those, too | |
| if stats != nil { | |
| stats.Latency(lat) | |
| stats.Success(succeeded) | |
| stats.Error(errored) | |
| } | |
| return err | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // A NamedAction describes an Action that also has a unique identifier. This | |
| // interface is used by the Debounce Executor to prevent duplicate actions from | |
| // running concurrently. | |
| type NamedAction interface { | |
| Action | |
| // ID returns the name for this Action. Identical actions | |
| // should return the same ID value. | |
| ID() string | |
| } | |
| type namedAction struct { | |
| ActionFunc | |
| name string | |
| } | |
| func (a namedAction) ID() string { return a.name } | |
| // Named creates a NamedAction from fn, with n as its name. This function is | |
| // just a helper to simplify creating NamedActions. | |
| func Named(n string, fn ActionFunc) NamedAction { | |
| return namedAction{ | |
| ActionFunc: fn, | |
| name: n, | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Parallel is a concurrent implementation of Executor | |
| type Parallel struct{} | |
| // Execute performs all provided actions in concurrently, failing closed on the | |
| // first error or if ctx is cancelled. | |
| func (p Parallel) Execute(ctx context.Context, actions []Action) error { | |
| grp, ctx := errgroup.WithContext(ctx) | |
| for _, a := range actions { | |
| grp.Go(p.execFn(ctx, a)) | |
| } | |
| return grp.Wait() | |
| } | |
| // execFn binds the Context and Action to the proper function signature for the | |
| // errgroup.Group. | |
| func (p Parallel) execFn(ctx context.Context, a Action) func() error { | |
| return func() error { return a.Execute(ctx) } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // StatSource creates metrics with the given name. The returned metrics should be | |
| // concurrency-safe. | |
| type StatSource interface { | |
| Timer(name string) Timer | |
| Counter(name string) Counter | |
| } | |
| // Timer emits the duration of a particular event. The duration value is | |
| // typically used to measure latencies and create histograms thereof. | |
| type Timer func(duration time.Duration) | |
| // Counter emits any number of events happening at a given time. For example, | |
| // Counters are often used to measure RPS. | |
| type Counter func(delta int) | |
| // A StatSet is the cached value. | |
| type statSet struct { | |
| // Latency measures how long an Action takes | |
| Latency Timer | |
| // Success is incremented when an Action does not return an error | |
| Success Counter | |
| // Error is incremented when an Action results in an error | |
| Error Counter | |
| } | |
| // Cache describes a read-through cache to obtain | |
| type statCache interface { | |
| // get returns a shared statSet for the given name, either from the cache or | |
| // a provided StatSource. | |
| get(name string) *statSet | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // mutexCache implements statCache, backed by a map and sync.RWMutex | |
| type mutexCache struct { | |
| src StatSource | |
| mtx sync.RWMutex | |
| lookup map[string]*statSet | |
| } | |
| func newMutexCache(src StatSource) *mutexCache { | |
| return &mutexCache{ | |
| src: src, | |
| lookup: make(map[string]*statSet), | |
| } | |
| } | |
| func (mc *mutexCache) get(name string) *statSet { | |
| // take a read lock to see if the set already exists | |
| mc.mtx.RLock() | |
| set, ok := mc.lookup[name] | |
| mc.mtx.RUnlock() | |
| if ok { // the set exists, return it | |
| return set | |
| } | |
| // need to take a write lock to update the map | |
| mc.mtx.Lock() | |
| // While waiting for the write lock, another goroutine may have created the | |
| // set. Here, we check again after obtaining the lock before making a new one | |
| if set, ok = mc.lookup[name]; !ok { | |
| set = newStatSet(mc.src, name) | |
| mc.lookup[name] = set | |
| } | |
| mc.mtx.Unlock() | |
| return set | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // syncMapCache implements statCache, backed by a sync.Map | |
| type syncMapCache struct { | |
| src StatSource | |
| lookup sync.Map | |
| } | |
| func newSyncMapCache(src StatSource) *syncMapCache { | |
| return &syncMapCache{src: src} | |
| } | |
| func (smc *syncMapCache) get(name string) *statSet { | |
| val, _ := smc.lookup.Load(name) | |
| if set, ok := val.(*statSet); ok { | |
| return set | |
| } | |
| // create a new statSet, but don't store it if one was added since the last | |
| // load. This is not ideal since we can't atomically create the set and | |
| // write it. | |
| set, _ := smc.lookup.LoadOrStore(name, newStatSet(smc.src, name)) | |
| return set.(*statSet) | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment