Skip to content

Instantly share code, notes, and snippets.

@eulerfx
Created April 7, 2018 14:13
Show Gist options
  • Select an option

  • Save eulerfx/b26a840646d83835bbcd921a74c83fc5 to your computer and use it in GitHub Desktop.

Select an option

Save eulerfx/b26a840646d83835bbcd921a74c83fc5 to your computer and use it in GitHub Desktop.

Revisions

  1. eulerfx created this gist Apr 7, 2018.
    29 changes: 29 additions & 0 deletions Async.ParallelThrottledIgnore.fs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,29 @@
    let ParallelThrottledIgnore (startOnCallingThread:bool) (parallelism:int) (xs:seq<Async<_>>) = async {
    let! ct = Async.CancellationToken
    let sm = new SemaphoreSlim(parallelism)
    let count = ref 1
    let res = TaskCompletionSource<_>()
    let tryWait () =
    try sm.Wait () ; true
    with _ -> false
    let tryComplete () =
    if Interlocked.Decrement count = 0 then
    res.TrySetResult() |> ignore
    false
    else
    not res.Task.IsCompleted
    let ok _ =
    if tryComplete () then
    try sm.Release () |> ignore with _ -> ()
    let err (ex:exn) = res.TrySetException ex |> ignore
    let cnc (_:OperationCanceledException) = res.TryCancel() |> ignore
    let start = async {
    use en = xs.GetEnumerator()
    while not (res.Task.IsCompleted) && en.MoveNext() do
    if tryWait () then
    Interlocked.Increment count |> ignore
    if startOnCallingThread then Async.StartWithContinuations (en.Current, ok, err, cnc, ct)
    else startThreadPoolWithContinuations (en.Current, ok, err, cnc, ct)
    tryComplete () |> ignore }
    Async.Start (tryWith (err >> async.Return) start, ct)
    return! res.Task |> Async.AwaitTask }