Skip to content

Instantly share code, notes, and snippets.

@mnadel
Last active May 25, 2020 17:47
Show Gist options
  • Select an option

  • Save mnadel/df2ec09fe7eae9ba8938 to your computer and use it in GitHub Desktop.

Select an option

Save mnadel/df2ec09fe7eae9ba8938 to your computer and use it in GitHub Desktop.

Revisions

  1. mnadel revised this gist Nov 8, 2015. 1 changed file with 1 addition and 3 deletions.
    4 changes: 1 addition & 3 deletions ThrottledTplDataflowPipelne.cs
    Original file line number Diff line number Diff line change
    @@ -22,9 +22,7 @@ class Pipeline<T>
    }
    });

    _buffer.LinkTo(actionBlock, new DataflowLinkOptions {
    PropagateCompletion = true
    });
    _buffer.LinkTo(actionBlock);
    }

    void Post(T message)
  2. mnadel revised this gist Nov 7, 2015. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion ThrottledTplDataflowPipelne.cs
    Original file line number Diff line number Diff line change
    @@ -22,7 +22,7 @@ class Pipeline<T>
    }
    });

    _buffer.LinkTo (actionBlock, new DataflowLinkOptions {
    _buffer.LinkTo(actionBlock, new DataflowLinkOptions {
    PropagateCompletion = true
    });
    }
  3. mnadel revised this gist Nov 7, 2015. No changes.
  4. mnadel created this gist Nov 7, 2015.
    35 changes: 35 additions & 0 deletions ThrottledTplDataflowPipelne.cs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,35 @@
    using System.Threading;
    using System.Threading.Tasks.Dataflow;

    class Pipeline<T>
    {
    private readonly SemaphoreSlim _semaphore;
    private readonly BufferBlock<T> _buffer;

    Pipeline(Action<T> action, int capacity)
    {
    _semaphore = new SemaphoreSlim(capacity, capacity);
    _buffer = new BufferBlock<T>();

    var actionBlock = new ActionBlock<T>(t => {
    try
    {
    action(t);
    }
    finally
    {
    _semaphore.Release();
    }
    });

    _buffer.LinkTo (actionBlock, new DataflowLinkOptions {
    PropagateCompletion = true
    });
    }

    void Post(T message)
    {
    _semaphore.Wait();
    _buffer.Post(message);
    }
    }