type flow struct { maxActions int64 actions *semaphore.Weighted calls *semaphore.Weighted ex Executor } // ControlFlow decorates an Executor, limiting it to a maximum concurrent // number of calls and actions. func ControlFlow(e Executor, maxCalls, maxActions int64) Executor { return &flow{ maxActions: maxActions, calls: semaphore.NewWeighted(maxCalls), actions: semaphore.NewWeighted(maxActions), ex: e, } } // Execute attempts to acquire the semaphores for the concurrent calls and // actions before delegating to the decorated Executor. If Execute is called // with more actions than maxActions, an error is returned. func (f *flow) Execute(ctx context.Context, actions []Action) error { qty := int64(len(actions)) if qty > f.maxActions { return fmt.Errorf("maximum %d actions allowed", f.maxActions) } // limit concurrent calls to Executor.Execute if err := f.calls.Acquire(ctx, 1); err != nil { return err } defer f.calls.Release(1) // limit total in-flight Actions, independent of Execute calls if err := f.actions.Acquire(ctx, qty); err != nil { return err } defer f.actions.Release(qty) // delegate Actions to decorated Executor return f.ex.Execute(ctx, actions) }