Skip to content

Instantly share code, notes, and snippets.

@kevinclcn
Forked from rodaine/bench.txt
Created August 21, 2018 02:49
Show Gist options
  • Save kevinclcn/cce6d43f736b6827b724823a3eb8599e to your computer and use it in GitHub Desktop.
Save kevinclcn/cce6d43f736b6827b724823a3eb8599e to your computer and use it in GitHub Desktop.

Revisions

  1. @rodaine rodaine revised this gist Aug 17, 2018. 1 changed file with 1 addition and 0 deletions.
    1 change: 1 addition & 0 deletions flow.go
    Original file line number Diff line number Diff line change
    @@ -12,6 +12,7 @@ func ControlFlow(e Executor, maxCalls, maxActions int64) Executor {
    maxActions: maxActions,
    calls: semaphore.NewWeighted(maxCalls),
    actions: semaphore.NewWeighted(maxActions),
    ex: e,
    }
    }

  2. @rodaine rodaine revised this gist Aug 16, 2018. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion flow.go
    Original file line number Diff line number Diff line change
    @@ -35,7 +35,7 @@ func (f *flow) Execute(ctx context.Context, actions []Action) error {
    if err := f.actions.Acquire(ctx, qty); err != nil {
    return err
    }
    defer f.calls.Release(qty)
    defer f.actions.Release(qty)

    // delegate Actions to decorated Executor
    return f.ex.Execute(ctx, actions)
  3. @rodaine rodaine revised this gist Aug 15, 2018. 5 changed files with 20 additions and 16 deletions.
    10 changes: 8 additions & 2 deletions debounce.go
    Original file line number Diff line number Diff line change
    @@ -1,5 +1,5 @@
    // Debounce wraps e, preventing duplicate NamedActions from running
    // concurrently, even from concurrent calls to Execute.
    // concurrently, even from separate calls to Execute.
    func Debounce(e Executor) Executor {
    return debouncer{
    ex: e,
    @@ -12,21 +12,25 @@ type debouncer struct {
    sf *singleflight.Group
    }

    // For any action
    // Execute attaches a singleflight.Group to any NamedActions, effectively debouncing
    // identical Actions if ran concurrently.
    func (d debouncer) Execute(ctx context.Context, actions []Action) error {
    wrapped := make([]Action, len(actions))

    for i, a := range actions {
    if na, ok := a.(NamedAction); ok {
    // compose the NamedAction with the singleflight.Group
    wrapped[i] = debouncedAction{
    NamedAction: na,
    sf: d.sf,
    }
    } else {
    // otherwise, pass it through untouched
    wrapped[i] = actions[i]
    }
    }

    // delegate wrapped Actions to decorated Executor
    return d.ex.Execute(ctx, wrapped)
    }

    @@ -36,6 +40,8 @@ type debouncedAction struct {
    }

    func (da debouncedAction) Execute(ctx context.Context) error {
    // map the composed Action's Execute function with the expected signature
    // for singleflight.Group.Do.
    fn := func() (interface{}, error) {
    return nil, da.NamedAction.Execute(ctx)
    }
    3 changes: 3 additions & 0 deletions flow.go
    Original file line number Diff line number Diff line change
    @@ -25,15 +25,18 @@ func (f *flow) Execute(ctx context.Context, actions []Action) error {
    return fmt.Errorf("maximum %d actions allowed", f.maxActions)
    }

    // limit concurrent calls to Executor.Execute
    if err := f.calls.Acquire(ctx, 1); err != nil {
    return err
    }
    defer f.calls.Release(1)

    // limit total in-flight Actions, independent of Execute calls
    if err := f.actions.Acquire(ctx, qty); err != nil {
    return err
    }
    defer f.calls.Release(qty)

    // delegate Actions to decorated Executor
    return f.ex.Execute(ctx, actions)
    }
    11 changes: 8 additions & 3 deletions metrics.go
    Original file line number Diff line number Diff line change
    @@ -1,27 +1,32 @@
    type metrics struct {
    ex Executor
    statCache
    stats statCache
    }

    // Execute emits latency, success, and error metrics for every action delegated to the
    // decorated Executor. For NamedActions, additional name-scoped stats are also emitted.
    func (m *metrics) Execute(ctx context.Context, actions []Action) error {
    wrapped := make([]Action, len(actions))
    global := m.get("all_actions")
    global := m.stats.get("all_actions")

    for i, a := range actions {
    if na, ok := a.(NamedAction); ok {
    // composed the NamedAction with global and name-scoped stats
    wrapped[i] = namedStatAction{
    NamedAction: na,
    global: global,
    stats: m.get(na.ID()),
    stats: m.stats.get(na.ID()),
    }
    } else {
    // otherwise, just compose with global stats
    wrapped[i] = statAction{
    Action: a,
    global: global,
    }
    }
    }

    // delegate wrapped Actions to decorated Executor
    return m.ex.Execute(ctx, wrapped)
    }

    7 changes: 0 additions & 7 deletions statcache_mutex.go
    Original file line number Diff line number Diff line change
    @@ -5,13 +5,6 @@ type mutexCache struct {
    lookup map[string]*statSet
    }

    func newMutexCache(src StatSource) *mutexCache {
    return &mutexCache{
    src: src,
    lookup: make(map[string]*statSet),
    }
    }

    func (mc *mutexCache) get(name string) *statSet {
    // take a read lock to see if the set already exists
    mc.mtx.RLock()
    5 changes: 1 addition & 4 deletions statcache_syncmap.go
    Original file line number Diff line number Diff line change
    @@ -1,12 +1,9 @@
    // syncMapCache implements statCache, backed by a sync.Map
    type syncMapCache struct {
    src StatSource
    lookup sync.Map
    }

    func newSyncMapCache(src StatSource) *syncMapCache {
    return &syncMapCache{src: src}
    }

    func (smc *syncMapCache) get(name string) *statSet {
    val, _ := smc.lookup.Load(name)
    if set, ok := val.(*statSet); ok {
  4. @rodaine rodaine revised this gist Jul 10, 2018. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion executor.go
    Original file line number Diff line number Diff line change
    @@ -6,7 +6,7 @@ type Action interface {
    }

    // An Executor performs a set of Actions. It is up to the implementing type
    // the concurrency and open/closed failure behavior of the actions.
    // to define the concurrency and open/closed failure behavior of the actions.
    type Executor interface {
    // Execute performs all provided actions by calling their Execute method.
    // This method should make a best-effort to cancel outstanding actions if the
  5. @rodaine rodaine revised this gist Jul 10, 2018. 1 changed file with 85 additions and 0 deletions.
    85 changes: 85 additions & 0 deletions pool.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,85 @@

    type pool struct {
    done <-chan struct{}
    in chan poolAction
    }

    // Pool creates an Executor backed by a concurrent worker pool. Up to n Actions
    // can be in-flight simultaneously; if n is less than or equal to zero,
    // runtime.NumCPU is used. The done channel should be closed to release
    // resources held by the Executor.
    func Pool(n int, done <-chan struct{}) Executor {
    if n <= 0 {
    n = runtime.NumCPU()
    }

    p := pool{done: done, in: make(chan poolAction, n)}

    for i := 0; i < n; i++ {
    go p.work(p.in, p.done)
    }

    return p
    }

    // Execute enqueues all Actions on the worker pool, failing closed on the
    // first error or if ctx is cancelled. This method blocks until all enqueued
    // Actions have returned. In the event of an error, not all Actions may be
    // executed.
    func (p pool) Execute(ctx context.Context, actions []Action) error {
    qty := len(actions)
    if qty == 0 {
    return nil
    }

    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    res := make(chan error, qty)

    var err error
    var queued uint64

    enqueue:
    for _, action := range actions {
    pa := poolAction{ctx: ctx, act: action, res: res}
    select {
    case <-p.done: // pool is closed
    cancel()
    return errors.New("pool is closed")
    case <-ctx.Done(): // ctx is closed by caller
    err = ctx.Err()
    break enqueue
    case p.in <- pa: // enqueue action
    queued++
    }
    }

    for ; queued > 0; queued-- {
    if r := <-res; r != nil {
    if err == nil {
    err = r
    cancel()
    }
    }
    }

    return err
    }

    func (p pool) work(in <-chan poolAction, done <-chan struct{}) {
    for {
    select {
    case <-done:
    return
    case a := <-in:
    a.res <- a.act.Execute(a.ctx)
    }
    }
    }

    type poolAction struct {
    ctx context.Context
    act Action
    res chan<- error
    }
  6. @rodaine rodaine revised this gist Jul 10, 2018. 1 changed file with 19 additions and 0 deletions.
    19 changes: 19 additions & 0 deletions sequential.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,19 @@
    // Sequential implements Executor, performing each Action in series
    type Sequential struct{}

    // Execute performs each action in order, exiting on the first error or if the
    // context is cancelled/deadlined.
    func (Sequential) Execute(ctx context.Context, actions []Action) error {
    for _, a := range actions {
    select {
    case <-ctx.Done():
    return ctx.Err()
    default:
    if err := a.Execute(ctx); err != nil {
    return err
    }
    }
    }

    return nil
    }
  7. @rodaine rodaine revised this gist Jul 2, 2018. 1 changed file with 17 additions and 0 deletions.
    17 changes: 17 additions & 0 deletions bench.txt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,17 @@
    BenchmarkMutexCache/10-8 10000000 180 ns/op 0 B/op 0 allocs/op
    BenchmarkMutexCache/100-8 10000000 187 ns/op 0 B/op 0 allocs/op
    BenchmarkMutexCache/1000-8 10000000 214 ns/op 0 B/op 0 allocs/op
    BenchmarkMutexCache/10000-8 10000000 231 ns/op 0 B/op 0 allocs/op
    BenchmarkMutexCache/100000-8 5000000 254 ns/op 2 B/op 0 allocs/op
    BenchmarkMutexCache/1000000-8 1000000 1159 ns/op 102 B/op 1 allocs/op
    BenchmarkMutexCache/10000000-8 1000000 1481 ns/op 184 B/op 2 allocs/op
    BenchmarkMutexCache/100000000-8 1000000 1655 ns/op 187 B/op 3 allocs/op

    BenchmarkSyncMapCache/10-8 5000000 221 ns/op 0 B/op 0 allocs/op
    BenchmarkSyncMapCache/100-8 10000000 235 ns/op 0 B/op 0 allocs/op
    BenchmarkSyncMapCache/1000-8 10000000 235 ns/op 0 B/op 0 allocs/op
    BenchmarkSyncMapCache/10000-8 10000000 246 ns/op 0 B/op 0 allocs/op
    BenchmarkSyncMapCache/100000-8 5000000 264 ns/op 5 B/op 0 allocs/op
    BenchmarkSyncMapCache/1000000-8 1000000 1378 ns/op 146 B/op 3 allocs/op
    BenchmarkSyncMapCache/10000000-8 1000000 1939 ns/op 237 B/op 5 allocs/op
    BenchmarkSyncMapCache/100000000-8 1000000 2090 ns/op 241 B/op 6 allocs/op
  8. @rodaine rodaine revised this gist May 3, 2018. 1 changed file with 3 additions and 4 deletions.
    7 changes: 3 additions & 4 deletions statcache_syncmap.go
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,3 @@
    // syncMapCache implements statCache, backed by a sync.Map
    type syncMapCache struct {
    src StatSource
    lookup sync.Map
    @@ -14,9 +13,9 @@ func (smc *syncMapCache) get(name string) *statSet {
    return set
    }

    // create a new statSet, but don't store it if one was added since the last
    // load. This is not ideal since we can't atomically create the set and
    // write it.
    // create a new statSet, but don't store it if one was added since the last
    // load. This is not ideal since we can't atomically create the set and
    // write it.
    set, _ := smc.lookup.LoadOrStore(name, newStatSet(smc.src, name))
    return set.(*statSet)
    }
  9. @rodaine rodaine revised this gist May 3, 2018. 1 changed file with 22 additions and 0 deletions.
    22 changes: 22 additions & 0 deletions statcache_syncmap.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,22 @@
    // syncMapCache implements statCache, backed by a sync.Map
    type syncMapCache struct {
    src StatSource
    lookup sync.Map
    }

    func newSyncMapCache(src StatSource) *syncMapCache {
    return &syncMapCache{src: src}
    }

    func (smc *syncMapCache) get(name string) *statSet {
    val, _ := smc.lookup.Load(name)
    if set, ok := val.(*statSet); ok {
    return set
    }

    // create a new statSet, but don't store it if one was added since the last
    // load. This is not ideal since we can't atomically create the set and
    // write it.
    set, _ := smc.lookup.LoadOrStore(name, newStatSet(smc.src, name))
    return set.(*statSet)
    }
  10. @rodaine rodaine revised this gist Apr 10, 2018. 1 changed file with 36 additions and 0 deletions.
    36 changes: 36 additions & 0 deletions statcache_mutex.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,36 @@
    // mutexCache implements statCache, backed by a map and sync.RWMutex
    type mutexCache struct {
    src StatSource
    mtx sync.RWMutex
    lookup map[string]*statSet
    }

    func newMutexCache(src StatSource) *mutexCache {
    return &mutexCache{
    src: src,
    lookup: make(map[string]*statSet),
    }
    }

    func (mc *mutexCache) get(name string) *statSet {
    // take a read lock to see if the set already exists
    mc.mtx.RLock()
    set, ok := mc.lookup[name]
    mc.mtx.RUnlock()

    if ok { // the set exists, return it
    return set
    }

    // need to take a write lock to update the map
    mc.mtx.Lock()
    // While waiting for the write lock, another goroutine may have created the
    // set. Here, we check again after obtaining the lock before making a new one
    if set, ok = mc.lookup[name]; !ok {
    set = newStatSet(mc.src, name)
    mc.lookup[name] = set
    }
    mc.mtx.Unlock()

    return set
    }
  11. @rodaine rodaine revised this gist Apr 6, 2018. 2 changed files with 74 additions and 9 deletions.
    74 changes: 74 additions & 0 deletions metrics.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,74 @@
    type metrics struct {
    ex Executor
    statCache
    }

    func (m *metrics) Execute(ctx context.Context, actions []Action) error {
    wrapped := make([]Action, len(actions))
    global := m.get("all_actions")

    for i, a := range actions {
    if na, ok := a.(NamedAction); ok {
    wrapped[i] = namedStatAction{
    NamedAction: na,
    global: global,
    stats: m.get(na.ID()),
    }
    } else {
    wrapped[i] = statAction{
    Action: a,
    global: global,
    }
    }
    }

    return m.ex.Execute(ctx, wrapped)
    }

    type namedStatAction struct {
    NamedAction
    global *statSet
    stats *statSet
    }

    func (a namedStatAction) Execute(ctx context.Context) error {
    return captureMetrics(ctx, a.NamedAction, a.global, a.stats)
    }

    type statAction struct {
    Action
    global *statSet
    }

    func (a statAction) Execute(ctx context.Context) error {
    return captureMetrics(ctx, a.Action, a.global, nil)
    }

    func captureMetrics(ctx context.Context, a Action, global, stats *statSet) error {
    // execute the action, timing its latency
    start := time.Now()
    err := a.Execute(ctx)
    lat := time.Now().Sub(start)

    // create our counter values for error/success
    var errored, succeeded int
    if err != nil {
    errored = 1
    } else {
    succeeded = 1
    }

    // emit the global stats
    global.Latency(lat)
    global.Success(succeeded)
    global.Error(errored)

    // if there are name-scoped stats, emit those, too
    if stats != nil {
    stats.Latency(lat)
    stats.Success(succeeded)
    stats.Error(errored)
    }

    return err
    }
    9 changes: 0 additions & 9 deletions statcache.go
    Original file line number Diff line number Diff line change
    @@ -23,15 +23,6 @@ type statSet struct {
    Error Counter
    }

    // newStatSet creates a statSet from the given src with the provided name.
    func newStatSet(src StatSource, name string) *statSet {
    return &statSet{
    Latency: src.Timer(name),
    Success: src.Counter(name + ".success"),
    Error: src.Counter(name + ".error"),
    }
    }

    // Cache describes a read-through cache to obtain
    type statCache interface {
    // get returns a shared statSet for the given name, either from the cache or
  12. @rodaine rodaine revised this gist Apr 6, 2018. 1 changed file with 40 additions and 0 deletions.
    40 changes: 40 additions & 0 deletions statcache.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,40 @@
    // StatSource creates metrics with the given name. The returned metrics should be
    // concurrency-safe.
    type StatSource interface {
    Timer(name string) Timer
    Counter(name string) Counter
    }

    // Timer emits the duration of a particular event. The duration value is
    // typically used to measure latencies and create histograms thereof.
    type Timer func(duration time.Duration)

    // Counter emits any number of events happening at a given time. For example,
    // Counters are often used to measure RPS.
    type Counter func(delta int)

    // A StatSet is the cached value.
    type statSet struct {
    // Latency measures how long an Action takes
    Latency Timer
    // Success is incremented when an Action does not return an error
    Success Counter
    // Error is incremented when an Action results in an error
    Error Counter
    }

    // newStatSet creates a statSet from the given src with the provided name.
    func newStatSet(src StatSource, name string) *statSet {
    return &statSet{
    Latency: src.Timer(name),
    Success: src.Counter(name + ".success"),
    Error: src.Counter(name + ".error"),
    }
    }

    // Cache describes a read-through cache to obtain
    type statCache interface {
    // get returns a shared statSet for the given name, either from the cache or
    // a provided StatSource.
    get(name string) *statSet
    }
  13. @rodaine rodaine revised this gist Jan 26, 2018. 1 changed file with 1 addition and 0 deletions.
    1 change: 1 addition & 0 deletions debounce.go
    Original file line number Diff line number Diff line change
    @@ -12,6 +12,7 @@ type debouncer struct {
    sf *singleflight.Group
    }

    // For any action
    func (d debouncer) Execute(ctx context.Context, actions []Action) error {
    wrapped := make([]Action, len(actions))

  14. @rodaine rodaine revised this gist Jan 26, 2018. 1 changed file with 0 additions and 35 deletions.
    35 changes: 0 additions & 35 deletions debounce.go
    Original file line number Diff line number Diff line change
    @@ -1,38 +1,3 @@
    package cmd

    import (
    "context"

    "golang.org/x/sync/singleflight"
    )

    // A NamedAction describes an Action that also has a unique identifier. This
    // interface is used by the Debounce Executor to prevent duplicate actions from
    // running concurrently.
    type NamedAction interface {
    Action

    // ID returns the name for this Action. Identical actions
    // should return the same ID value.
    ID() string
    }

    type namedAction struct {
    ActionFunc
    name string
    }

    func (a namedAction) ID() string { return a.name }

    // Named creates a NamedAction from fn, with n as its name. This function is
    // just a helper to simplify creating NamedActions.
    func Named(n string, fn ActionFunc) NamedAction {
    return namedAction{
    ActionFunc: fn,
    name: n,
    }
    }

    // Debounce wraps e, preventing duplicate NamedActions from running
    // concurrently, even from concurrent calls to Execute.
    func Debounce(e Executor) Executor {
  15. @rodaine rodaine revised this gist Jan 26, 2018. 3 changed files with 113 additions and 0 deletions.
    80 changes: 80 additions & 0 deletions debounce.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,80 @@
    package cmd

    import (
    "context"

    "golang.org/x/sync/singleflight"
    )

    // A NamedAction describes an Action that also has a unique identifier. This
    // interface is used by the Debounce Executor to prevent duplicate actions from
    // running concurrently.
    type NamedAction interface {
    Action

    // ID returns the name for this Action. Identical actions
    // should return the same ID value.
    ID() string
    }

    type namedAction struct {
    ActionFunc
    name string
    }

    func (a namedAction) ID() string { return a.name }

    // Named creates a NamedAction from fn, with n as its name. This function is
    // just a helper to simplify creating NamedActions.
    func Named(n string, fn ActionFunc) NamedAction {
    return namedAction{
    ActionFunc: fn,
    name: n,
    }
    }

    // Debounce wraps e, preventing duplicate NamedActions from running
    // concurrently, even from concurrent calls to Execute.
    func Debounce(e Executor) Executor {
    return debouncer{
    ex: e,
    sf: new(singleflight.Group),
    }
    }

    type debouncer struct {
    ex Executor
    sf *singleflight.Group
    }

    func (d debouncer) Execute(ctx context.Context, actions []Action) error {
    wrapped := make([]Action, len(actions))

    for i, a := range actions {
    if na, ok := a.(NamedAction); ok {
    wrapped[i] = debouncedAction{
    NamedAction: na,
    sf: d.sf,
    }
    } else {
    wrapped[i] = actions[i]
    }
    }

    return d.ex.Execute(ctx, wrapped)
    }

    type debouncedAction struct {
    NamedAction
    sf *singleflight.Group
    }

    func (da debouncedAction) Execute(ctx context.Context) error {
    fn := func() (interface{}, error) {
    return nil, da.NamedAction.Execute(ctx)
    }

    _, err, _ := da.sf.Do(da.ID(), fn)

    return err
    }
    7 changes: 7 additions & 0 deletions executor.go
    Original file line number Diff line number Diff line change
    @@ -13,3 +13,10 @@ type Executor interface {
    // provided ctx is cancelled.
    Execute(ctx context.Context, actions []Action) error
    }

    // ActionFunc permits using a standalone function as an Action.
    type ActionFunc func(context.Context) error

    // Execute satisfies the Action interface, delegating the call to the
    // underlying function.
    func (fn ActionFunc) Execute(ctx context.Context) error { return fn(ctx) }
    26 changes: 26 additions & 0 deletions named.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,26 @@
    // A NamedAction describes an Action that also has a unique identifier. This
    // interface is used by the Debounce Executor to prevent duplicate actions from
    // running concurrently.
    type NamedAction interface {
    Action

    // ID returns the name for this Action. Identical actions
    // should return the same ID value.
    ID() string
    }

    type namedAction struct {
    ActionFunc
    name string
    }

    func (a namedAction) ID() string { return a.name }

    // Named creates a NamedAction from fn, with n as its name. This function is
    // just a helper to simplify creating NamedActions.
    func Named(n string, fn ActionFunc) NamedAction {
    return namedAction{
    ActionFunc: fn,
    name: n,
    }
    }
  16. @rodaine rodaine revised this gist Jan 26, 2018. 3 changed files with 0 additions and 21 deletions.
    4 changes: 0 additions & 4 deletions executor.go
    Original file line number Diff line number Diff line change
    @@ -1,7 +1,3 @@
    package cmd

    import "context"

    // An Action performs a single arbitrary task.
    type Action interface {
    // Execute performs the work of an Action. This method should make a best
    9 changes: 0 additions & 9 deletions flow.go
    Original file line number Diff line number Diff line change
    @@ -1,12 +1,3 @@
    package cmd

    import (
    "context"
    "fmt"

    "golang.org/x/sync/semaphore"
    )

    type flow struct {
    maxActions int64
    actions *semaphore.Weighted
    8 changes: 0 additions & 8 deletions parallel.go
    Original file line number Diff line number Diff line change
    @@ -1,11 +1,3 @@
    package cmd

    import (
    "context"

    "golang.org/x/sync/errgroup"
    )

    // Parallel is a concurrent implementation of Executor
    type Parallel struct{}

  17. @rodaine rodaine revised this gist Sep 3, 2017. 1 changed file with 2 additions and 1 deletion.
    3 changes: 2 additions & 1 deletion flow.go
    Original file line number Diff line number Diff line change
    @@ -14,7 +14,8 @@ type flow struct {
    ex Executor
    }

    // ControlFlow an Executor, limiting it to a maximum concurrent number of calls and actions.
    // ControlFlow decorates an Executor, limiting it to a maximum concurrent
    // number of calls and actions.
    func ControlFlow(e Executor, maxCalls, maxActions int64) Executor {
    return &flow{
    maxActions: maxActions,
  18. @rodaine rodaine created this gist Sep 2, 2017.
    19 changes: 19 additions & 0 deletions executor.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,19 @@
    package cmd

    import "context"

    // An Action performs a single arbitrary task.
    type Action interface {
    // Execute performs the work of an Action. This method should make a best
    // effort to be cancelled if the provided ctx is cancelled.
    Execute(ctx context.Context) error
    }

    // An Executor performs a set of Actions. It is up to the implementing type
    // the concurrency and open/closed failure behavior of the actions.
    type Executor interface {
    // Execute performs all provided actions by calling their Execute method.
    // This method should make a best-effort to cancel outstanding actions if the
    // provided ctx is cancelled.
    Execute(ctx context.Context, actions []Action) error
    }
    47 changes: 47 additions & 0 deletions flow.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,47 @@
    package cmd

    import (
    "context"
    "fmt"

    "golang.org/x/sync/semaphore"
    )

    type flow struct {
    maxActions int64
    actions *semaphore.Weighted
    calls *semaphore.Weighted
    ex Executor
    }

    // ControlFlow an Executor, limiting it to a maximum concurrent number of calls and actions.
    func ControlFlow(e Executor, maxCalls, maxActions int64) Executor {
    return &flow{
    maxActions: maxActions,
    calls: semaphore.NewWeighted(maxCalls),
    actions: semaphore.NewWeighted(maxActions),
    }
    }

    // Execute attempts to acquire the semaphores for the concurrent calls and
    // actions before delegating to the decorated Executor. If Execute is called
    // with more actions than maxActions, an error is returned.
    func (f *flow) Execute(ctx context.Context, actions []Action) error {
    qty := int64(len(actions))

    if qty > f.maxActions {
    return fmt.Errorf("maximum %d actions allowed", f.maxActions)
    }

    if err := f.calls.Acquire(ctx, 1); err != nil {
    return err
    }
    defer f.calls.Release(1)

    if err := f.actions.Acquire(ctx, qty); err != nil {
    return err
    }
    defer f.calls.Release(qty)

    return f.ex.Execute(ctx, actions)
    }
    28 changes: 28 additions & 0 deletions parallel.go
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,28 @@
    package cmd

    import (
    "context"

    "golang.org/x/sync/errgroup"
    )

    // Parallel is a concurrent implementation of Executor
    type Parallel struct{}

    // Execute performs all provided actions in concurrently, failing closed on the
    // first error or if ctx is cancelled.
    func (p Parallel) Execute(ctx context.Context, actions []Action) error {
    grp, ctx := errgroup.WithContext(ctx)

    for _, a := range actions {
    grp.Go(p.execFn(ctx, a))
    }

    return grp.Wait()
    }

    // execFn binds the Context and Action to the proper function signature for the
    // errgroup.Group.
    func (p Parallel) execFn(ctx context.Context, a Action) func() error {
    return func() error { return a.Execute(ctx) }
    }