-
-
Save kevinclcn/cce6d43f736b6827b724823a3eb8599e to your computer and use it in GitHub Desktop.
Revisions
-
rodaine revised this gist
Aug 17, 2018 . 1 changed file with 1 addition and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -12,6 +12,7 @@ func ControlFlow(e Executor, maxCalls, maxActions int64) Executor { maxActions: maxActions, calls: semaphore.NewWeighted(maxCalls), actions: semaphore.NewWeighted(maxActions), ex: e, } } -
rodaine revised this gist
Aug 16, 2018 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -35,7 +35,7 @@ func (f *flow) Execute(ctx context.Context, actions []Action) error { if err := f.actions.Acquire(ctx, qty); err != nil { return err } defer f.actions.Release(qty) // delegate Actions to decorated Executor return f.ex.Execute(ctx, actions) -
rodaine revised this gist
Aug 15, 2018 . 5 changed files with 20 additions and 16 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,5 +1,5 @@ // Debounce wraps e, preventing duplicate NamedActions from running // concurrently, even from separate calls to Execute. func Debounce(e Executor) Executor { return debouncer{ ex: e, @@ -12,21 +12,25 @@ type debouncer struct { sf *singleflight.Group } // Execute attaches a singleflight.Group to any NamedActions, effectively debouncing // identical Actions if ran concurrently. func (d debouncer) Execute(ctx context.Context, actions []Action) error { wrapped := make([]Action, len(actions)) for i, a := range actions { if na, ok := a.(NamedAction); ok { // compose the NamedAction with the singleflight.Group wrapped[i] = debouncedAction{ NamedAction: na, sf: d.sf, } } else { // otherwise, pass it through untouched wrapped[i] = actions[i] } } // delegate wrapped Actions to decorated Executor return d.ex.Execute(ctx, wrapped) } @@ -36,6 +40,8 @@ type debouncedAction struct { } func (da debouncedAction) Execute(ctx context.Context) error { // map the composed Action's Execute function with the expected signature // for singleflight.Group.Do. fn := func() (interface{}, error) { return nil, da.NamedAction.Execute(ctx) } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -25,15 +25,18 @@ func (f *flow) Execute(ctx context.Context, actions []Action) error { return fmt.Errorf("maximum %d actions allowed", f.maxActions) } // limit concurrent calls to Executor.Execute if err := f.calls.Acquire(ctx, 1); err != nil { return err } defer f.calls.Release(1) // limit total in-flight Actions, independent of Execute calls if err := f.actions.Acquire(ctx, qty); err != nil { return err } defer f.calls.Release(qty) // delegate Actions to decorated Executor return f.ex.Execute(ctx, actions) } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,27 +1,32 @@ type metrics struct { ex Executor stats statCache } // Execute emits latency, success, and error metrics for every action delegated to the // decorated Executor. For NamedActions, additional name-scoped stats are also emitted. func (m *metrics) Execute(ctx context.Context, actions []Action) error { wrapped := make([]Action, len(actions)) global := m.stats.get("all_actions") for i, a := range actions { if na, ok := a.(NamedAction); ok { // composed the NamedAction with global and name-scoped stats wrapped[i] = namedStatAction{ NamedAction: na, global: global, stats: m.stats.get(na.ID()), } } else { // otherwise, just compose with global stats wrapped[i] = statAction{ Action: a, global: global, } } } // delegate wrapped Actions to decorated Executor return m.ex.Execute(ctx, wrapped) } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -5,13 +5,6 @@ type mutexCache struct { lookup map[string]*statSet } func (mc *mutexCache) get(name string) *statSet { // take a read lock to see if the set already exists mc.mtx.RLock() This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,12 +1,9 @@ // syncMapCache implements statCache, backed by a sync.Map type syncMapCache struct { src StatSource lookup sync.Map } func (smc *syncMapCache) get(name string) *statSet { val, _ := smc.lookup.Load(name) if set, ok := val.(*statSet); ok { -
rodaine revised this gist
Jul 10, 2018 . 1 changed file with 1 addition and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -6,7 +6,7 @@ type Action interface { } // An Executor performs a set of Actions. It is up to the implementing type // to define the concurrency and open/closed failure behavior of the actions. type Executor interface { // Execute performs all provided actions by calling their Execute method. // This method should make a best-effort to cancel outstanding actions if the -
rodaine revised this gist
Jul 10, 2018 . 1 changed file with 85 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,85 @@ type pool struct { done <-chan struct{} in chan poolAction } // Pool creates an Executor backed by a concurrent worker pool. Up to n Actions // can be in-flight simultaneously; if n is less than or equal to zero, // runtime.NumCPU is used. The done channel should be closed to release // resources held by the Executor. func Pool(n int, done <-chan struct{}) Executor { if n <= 0 { n = runtime.NumCPU() } p := pool{done: done, in: make(chan poolAction, n)} for i := 0; i < n; i++ { go p.work(p.in, p.done) } return p } // Execute enqueues all Actions on the worker pool, failing closed on the // first error or if ctx is cancelled. This method blocks until all enqueued // Actions have returned. In the event of an error, not all Actions may be // executed. func (p pool) Execute(ctx context.Context, actions []Action) error { qty := len(actions) if qty == 0 { return nil } ctx, cancel := context.WithCancel(ctx) defer cancel() res := make(chan error, qty) var err error var queued uint64 enqueue: for _, action := range actions { pa := poolAction{ctx: ctx, act: action, res: res} select { case <-p.done: // pool is closed cancel() return errors.New("pool is closed") case <-ctx.Done(): // ctx is closed by caller err = ctx.Err() break enqueue case p.in <- pa: // enqueue action queued++ } } for ; queued > 0; queued-- { if r := <-res; r != nil { if err == nil { err = r cancel() } } } return err } func (p pool) work(in <-chan poolAction, done <-chan struct{}) { for { select { case <-done: return case a := <-in: a.res <- a.act.Execute(a.ctx) } } } type poolAction struct { ctx context.Context act Action res chan<- error } -
rodaine revised this gist
Jul 10, 2018 . 1 changed file with 19 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,19 @@ // Sequential implements Executor, performing each Action in series type Sequential struct{} // Execute performs each action in order, exiting on the first error or if the // context is cancelled/deadlined. func (Sequential) Execute(ctx context.Context, actions []Action) error { for _, a := range actions { select { case <-ctx.Done(): return ctx.Err() default: if err := a.Execute(ctx); err != nil { return err } } } return nil } -
rodaine revised this gist
Jul 2, 2018 . 1 changed file with 17 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,17 @@ BenchmarkMutexCache/10-8 10000000 180 ns/op 0 B/op 0 allocs/op BenchmarkMutexCache/100-8 10000000 187 ns/op 0 B/op 0 allocs/op BenchmarkMutexCache/1000-8 10000000 214 ns/op 0 B/op 0 allocs/op BenchmarkMutexCache/10000-8 10000000 231 ns/op 0 B/op 0 allocs/op BenchmarkMutexCache/100000-8 5000000 254 ns/op 2 B/op 0 allocs/op BenchmarkMutexCache/1000000-8 1000000 1159 ns/op 102 B/op 1 allocs/op BenchmarkMutexCache/10000000-8 1000000 1481 ns/op 184 B/op 2 allocs/op BenchmarkMutexCache/100000000-8 1000000 1655 ns/op 187 B/op 3 allocs/op BenchmarkSyncMapCache/10-8 5000000 221 ns/op 0 B/op 0 allocs/op BenchmarkSyncMapCache/100-8 10000000 235 ns/op 0 B/op 0 allocs/op BenchmarkSyncMapCache/1000-8 10000000 235 ns/op 0 B/op 0 allocs/op BenchmarkSyncMapCache/10000-8 10000000 246 ns/op 0 B/op 0 allocs/op BenchmarkSyncMapCache/100000-8 5000000 264 ns/op 5 B/op 0 allocs/op BenchmarkSyncMapCache/1000000-8 1000000 1378 ns/op 146 B/op 3 allocs/op BenchmarkSyncMapCache/10000000-8 1000000 1939 ns/op 237 B/op 5 allocs/op BenchmarkSyncMapCache/100000000-8 1000000 2090 ns/op 241 B/op 6 allocs/op -
rodaine revised this gist
May 3, 2018 . 1 changed file with 3 additions and 4 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,4 +1,3 @@ type syncMapCache struct { src StatSource lookup sync.Map @@ -14,9 +13,9 @@ func (smc *syncMapCache) get(name string) *statSet { return set } // create a new statSet, but don't store it if one was added since the last // load. This is not ideal since we can't atomically create the set and // write it. set, _ := smc.lookup.LoadOrStore(name, newStatSet(smc.src, name)) return set.(*statSet) } -
rodaine revised this gist
May 3, 2018 . 1 changed file with 22 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,22 @@ // syncMapCache implements statCache, backed by a sync.Map type syncMapCache struct { src StatSource lookup sync.Map } func newSyncMapCache(src StatSource) *syncMapCache { return &syncMapCache{src: src} } func (smc *syncMapCache) get(name string) *statSet { val, _ := smc.lookup.Load(name) if set, ok := val.(*statSet); ok { return set } // create a new statSet, but don't store it if one was added since the last // load. This is not ideal since we can't atomically create the set and // write it. set, _ := smc.lookup.LoadOrStore(name, newStatSet(smc.src, name)) return set.(*statSet) } -
rodaine revised this gist
Apr 10, 2018 . 1 changed file with 36 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,36 @@ // mutexCache implements statCache, backed by a map and sync.RWMutex type mutexCache struct { src StatSource mtx sync.RWMutex lookup map[string]*statSet } func newMutexCache(src StatSource) *mutexCache { return &mutexCache{ src: src, lookup: make(map[string]*statSet), } } func (mc *mutexCache) get(name string) *statSet { // take a read lock to see if the set already exists mc.mtx.RLock() set, ok := mc.lookup[name] mc.mtx.RUnlock() if ok { // the set exists, return it return set } // need to take a write lock to update the map mc.mtx.Lock() // While waiting for the write lock, another goroutine may have created the // set. Here, we check again after obtaining the lock before making a new one if set, ok = mc.lookup[name]; !ok { set = newStatSet(mc.src, name) mc.lookup[name] = set } mc.mtx.Unlock() return set } -
rodaine revised this gist
Apr 6, 2018 . 2 changed files with 74 additions and 9 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,74 @@ type metrics struct { ex Executor statCache } func (m *metrics) Execute(ctx context.Context, actions []Action) error { wrapped := make([]Action, len(actions)) global := m.get("all_actions") for i, a := range actions { if na, ok := a.(NamedAction); ok { wrapped[i] = namedStatAction{ NamedAction: na, global: global, stats: m.get(na.ID()), } } else { wrapped[i] = statAction{ Action: a, global: global, } } } return m.ex.Execute(ctx, wrapped) } type namedStatAction struct { NamedAction global *statSet stats *statSet } func (a namedStatAction) Execute(ctx context.Context) error { return captureMetrics(ctx, a.NamedAction, a.global, a.stats) } type statAction struct { Action global *statSet } func (a statAction) Execute(ctx context.Context) error { return captureMetrics(ctx, a.Action, a.global, nil) } func captureMetrics(ctx context.Context, a Action, global, stats *statSet) error { // execute the action, timing its latency start := time.Now() err := a.Execute(ctx) lat := time.Now().Sub(start) // create our counter values for error/success var errored, succeeded int if err != nil { errored = 1 } else { succeeded = 1 } // emit the global stats global.Latency(lat) global.Success(succeeded) global.Error(errored) // if there are name-scoped stats, emit those, too if stats != nil { stats.Latency(lat) stats.Success(succeeded) stats.Error(errored) } return err } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -23,15 +23,6 @@ type statSet struct { Error Counter } // Cache describes a read-through cache to obtain type statCache interface { // get returns a shared statSet for the given name, either from the cache or -
rodaine revised this gist
Apr 6, 2018 . 1 changed file with 40 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,40 @@ // StatSource creates metrics with the given name. The returned metrics should be // concurrency-safe. type StatSource interface { Timer(name string) Timer Counter(name string) Counter } // Timer emits the duration of a particular event. The duration value is // typically used to measure latencies and create histograms thereof. type Timer func(duration time.Duration) // Counter emits any number of events happening at a given time. For example, // Counters are often used to measure RPS. type Counter func(delta int) // A StatSet is the cached value. type statSet struct { // Latency measures how long an Action takes Latency Timer // Success is incremented when an Action does not return an error Success Counter // Error is incremented when an Action results in an error Error Counter } // newStatSet creates a statSet from the given src with the provided name. func newStatSet(src StatSource, name string) *statSet { return &statSet{ Latency: src.Timer(name), Success: src.Counter(name + ".success"), Error: src.Counter(name + ".error"), } } // Cache describes a read-through cache to obtain type statCache interface { // get returns a shared statSet for the given name, either from the cache or // a provided StatSource. get(name string) *statSet } -
rodaine revised this gist
Jan 26, 2018 . 1 changed file with 1 addition and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -12,6 +12,7 @@ type debouncer struct { sf *singleflight.Group } // For any action func (d debouncer) Execute(ctx context.Context, actions []Action) error { wrapped := make([]Action, len(actions)) -
rodaine revised this gist
Jan 26, 2018 . 1 changed file with 0 additions and 35 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,38 +1,3 @@ // Debounce wraps e, preventing duplicate NamedActions from running // concurrently, even from concurrent calls to Execute. func Debounce(e Executor) Executor { -
rodaine revised this gist
Jan 26, 2018 . 3 changed files with 113 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,80 @@ package cmd import ( "context" "golang.org/x/sync/singleflight" ) // A NamedAction describes an Action that also has a unique identifier. This // interface is used by the Debounce Executor to prevent duplicate actions from // running concurrently. type NamedAction interface { Action // ID returns the name for this Action. Identical actions // should return the same ID value. ID() string } type namedAction struct { ActionFunc name string } func (a namedAction) ID() string { return a.name } // Named creates a NamedAction from fn, with n as its name. This function is // just a helper to simplify creating NamedActions. func Named(n string, fn ActionFunc) NamedAction { return namedAction{ ActionFunc: fn, name: n, } } // Debounce wraps e, preventing duplicate NamedActions from running // concurrently, even from concurrent calls to Execute. func Debounce(e Executor) Executor { return debouncer{ ex: e, sf: new(singleflight.Group), } } type debouncer struct { ex Executor sf *singleflight.Group } func (d debouncer) Execute(ctx context.Context, actions []Action) error { wrapped := make([]Action, len(actions)) for i, a := range actions { if na, ok := a.(NamedAction); ok { wrapped[i] = debouncedAction{ NamedAction: na, sf: d.sf, } } else { wrapped[i] = actions[i] } } return d.ex.Execute(ctx, wrapped) } type debouncedAction struct { NamedAction sf *singleflight.Group } func (da debouncedAction) Execute(ctx context.Context) error { fn := func() (interface{}, error) { return nil, da.NamedAction.Execute(ctx) } _, err, _ := da.sf.Do(da.ID(), fn) return err } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -13,3 +13,10 @@ type Executor interface { // provided ctx is cancelled. Execute(ctx context.Context, actions []Action) error } // ActionFunc permits using a standalone function as an Action. type ActionFunc func(context.Context) error // Execute satisfies the Action interface, delegating the call to the // underlying function. func (fn ActionFunc) Execute(ctx context.Context) error { return fn(ctx) } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,26 @@ // A NamedAction describes an Action that also has a unique identifier. This // interface is used by the Debounce Executor to prevent duplicate actions from // running concurrently. type NamedAction interface { Action // ID returns the name for this Action. Identical actions // should return the same ID value. ID() string } type namedAction struct { ActionFunc name string } func (a namedAction) ID() string { return a.name } // Named creates a NamedAction from fn, with n as its name. This function is // just a helper to simplify creating NamedActions. func Named(n string, fn ActionFunc) NamedAction { return namedAction{ ActionFunc: fn, name: n, } } -
rodaine revised this gist
Jan 26, 2018 . 3 changed files with 0 additions and 21 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,7 +1,3 @@ // An Action performs a single arbitrary task. type Action interface { // Execute performs the work of an Action. This method should make a best This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,12 +1,3 @@ type flow struct { maxActions int64 actions *semaphore.Weighted This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,11 +1,3 @@ // Parallel is a concurrent implementation of Executor type Parallel struct{} -
rodaine revised this gist
Sep 3, 2017 . 1 changed file with 2 additions and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -14,7 +14,8 @@ type flow struct { ex Executor } // ControlFlow decorates an Executor, limiting it to a maximum concurrent // number of calls and actions. func ControlFlow(e Executor, maxCalls, maxActions int64) Executor { return &flow{ maxActions: maxActions, -
rodaine created this gist
Sep 2, 2017 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,19 @@ package cmd import "context" // An Action performs a single arbitrary task. type Action interface { // Execute performs the work of an Action. This method should make a best // effort to be cancelled if the provided ctx is cancelled. Execute(ctx context.Context) error } // An Executor performs a set of Actions. It is up to the implementing type // the concurrency and open/closed failure behavior of the actions. type Executor interface { // Execute performs all provided actions by calling their Execute method. // This method should make a best-effort to cancel outstanding actions if the // provided ctx is cancelled. Execute(ctx context.Context, actions []Action) error } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,47 @@ package cmd import ( "context" "fmt" "golang.org/x/sync/semaphore" ) type flow struct { maxActions int64 actions *semaphore.Weighted calls *semaphore.Weighted ex Executor } // ControlFlow an Executor, limiting it to a maximum concurrent number of calls and actions. func ControlFlow(e Executor, maxCalls, maxActions int64) Executor { return &flow{ maxActions: maxActions, calls: semaphore.NewWeighted(maxCalls), actions: semaphore.NewWeighted(maxActions), } } // Execute attempts to acquire the semaphores for the concurrent calls and // actions before delegating to the decorated Executor. If Execute is called // with more actions than maxActions, an error is returned. func (f *flow) Execute(ctx context.Context, actions []Action) error { qty := int64(len(actions)) if qty > f.maxActions { return fmt.Errorf("maximum %d actions allowed", f.maxActions) } if err := f.calls.Acquire(ctx, 1); err != nil { return err } defer f.calls.Release(1) if err := f.actions.Acquire(ctx, qty); err != nil { return err } defer f.calls.Release(qty) return f.ex.Execute(ctx, actions) } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,28 @@ package cmd import ( "context" "golang.org/x/sync/errgroup" ) // Parallel is a concurrent implementation of Executor type Parallel struct{} // Execute performs all provided actions in concurrently, failing closed on the // first error or if ctx is cancelled. func (p Parallel) Execute(ctx context.Context, actions []Action) error { grp, ctx := errgroup.WithContext(ctx) for _, a := range actions { grp.Go(p.execFn(ctx, a)) } return grp.Wait() } // execFn binds the Context and Action to the proper function signature for the // errgroup.Group. func (p Parallel) execFn(ctx context.Context, a Action) func() error { return func() error { return a.Execute(ctx) } }