-
-
Save kevinclcn/cce6d43f736b6827b724823a3eb8599e to your computer and use it in GitHub Desktop.
Code snippets for my blog post "The X-Files: Avoiding Concurrency Boilerplate with golang.org/x/sync"
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Debounce wraps e, preventing duplicate NamedActions from running | |
| // concurrently, even from concurrent calls to Execute. | |
| func Debounce(e Executor) Executor { | |
| return debouncer{ | |
| ex: e, | |
| sf: new(singleflight.Group), | |
| } | |
| } | |
| type debouncer struct { | |
| ex Executor | |
| sf *singleflight.Group | |
| } | |
| // For any action | |
| func (d debouncer) Execute(ctx context.Context, actions []Action) error { | |
| wrapped := make([]Action, len(actions)) | |
| for i, a := range actions { | |
| if na, ok := a.(NamedAction); ok { | |
| wrapped[i] = debouncedAction{ | |
| NamedAction: na, | |
| sf: d.sf, | |
| } | |
| } else { | |
| wrapped[i] = actions[i] | |
| } | |
| } | |
| return d.ex.Execute(ctx, wrapped) | |
| } | |
| type debouncedAction struct { | |
| NamedAction | |
| sf *singleflight.Group | |
| } | |
| func (da debouncedAction) Execute(ctx context.Context) error { | |
| fn := func() (interface{}, error) { | |
| return nil, da.NamedAction.Execute(ctx) | |
| } | |
| _, err, _ := da.sf.Do(da.ID(), fn) | |
| return err | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// An Action is a single, arbitrary unit of work.
type Action interface {
	// Execute carries out the work of the Action. Implementations should
	// make a best effort to stop early once ctx is cancelled.
	Execute(ctx context.Context) error
}
| // An Executor performs a set of Actions. It is up to the implementing type | |
| // the concurrency and open/closed failure behavior of the actions. | |
| type Executor interface { | |
| // Execute performs all provided actions by calling their Execute method. | |
| // This method should make a best-effort to cancel outstanding actions if the | |
| // provided ctx is cancelled. | |
| Execute(ctx context.Context, actions []Action) error | |
| } | |
// ActionFunc adapts a plain function so that it can be used as an Action.
type ActionFunc func(context.Context) error

// Execute calls the underlying function with ctx and returns its result,
// which makes ActionFunc satisfy the Action interface.
func (fn ActionFunc) Execute(ctx context.Context) error {
	return fn(ctx)
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| type flow struct { | |
| maxActions int64 | |
| actions *semaphore.Weighted | |
| calls *semaphore.Weighted | |
| ex Executor | |
| } | |
| // ControlFlow decorates an Executor, limiting it to a maximum concurrent | |
| // number of calls and actions. | |
| func ControlFlow(e Executor, maxCalls, maxActions int64) Executor { | |
| return &flow{ | |
| maxActions: maxActions, | |
| calls: semaphore.NewWeighted(maxCalls), | |
| actions: semaphore.NewWeighted(maxActions), | |
| } | |
| } | |
| // Execute attempts to acquire the semaphores for the concurrent calls and | |
| // actions before delegating to the decorated Executor. If Execute is called | |
| // with more actions than maxActions, an error is returned. | |
| func (f *flow) Execute(ctx context.Context, actions []Action) error { | |
| qty := int64(len(actions)) | |
| if qty > f.maxActions { | |
| return fmt.Errorf("maximum %d actions allowed", f.maxActions) | |
| } | |
| if err := f.calls.Acquire(ctx, 1); err != nil { | |
| return err | |
| } | |
| defer f.calls.Release(1) | |
| if err := f.actions.Acquire(ctx, qty); err != nil { | |
| return err | |
| } | |
| defer f.calls.Release(qty) | |
| return f.ex.Execute(ctx, actions) | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // A NamedAction describes an Action that also has a unique identifier. This | |
| // interface is used by the Debounce Executor to prevent duplicate actions from | |
| // running concurrently. | |
| type NamedAction interface { | |
| Action | |
| // ID returns the name for this Action. Identical actions | |
| // should return the same ID value. | |
| ID() string | |
| } | |
| type namedAction struct { | |
| ActionFunc | |
| name string | |
| } | |
| func (a namedAction) ID() string { return a.name } | |
| // Named creates a NamedAction from fn, with n as its name. This function is | |
| // just a helper to simplify creating NamedActions. | |
| func Named(n string, fn ActionFunc) NamedAction { | |
| return namedAction{ | |
| ActionFunc: fn, | |
| name: n, | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Parallel is a concurrent implementation of Executor | |
| type Parallel struct{} | |
| // Execute performs all provided actions in concurrently, failing closed on the | |
| // first error or if ctx is cancelled. | |
| func (p Parallel) Execute(ctx context.Context, actions []Action) error { | |
| grp, ctx := errgroup.WithContext(ctx) | |
| for _, a := range actions { | |
| grp.Go(p.execFn(ctx, a)) | |
| } | |
| return grp.Wait() | |
| } | |
| // execFn binds the Context and Action to the proper function signature for the | |
| // errgroup.Group. | |
| func (p Parallel) execFn(ctx context.Context, a Action) func() error { | |
| return func() error { return a.Execute(ctx) } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment