Skip to content

Instantly share code, notes, and snippets.

@akhansari
Last active March 3, 2022 11:50
Show Gist options
  • Save akhansari/d88812b742aa6be1c35b4f46bd9f8532 to your computer and use it in GitHub Desktop.
Save akhansari/d88812b742aa6be1c35b4f46bd9f8532 to your computer and use it in GitHub Desktop.

Revisions

  1. akhansari revised this gist Mar 3, 2022. 1 changed file with 28 additions and 18 deletions.
    46 changes: 28 additions & 18 deletions AsyncSeq.fs
    Original file line number Diff line number Diff line change
    @@ -1,6 +1,10 @@
    module AsyncSeq =
    open System.Collections.Generic
    open System.Threading.Tasks

    let cancelled (cancellationToken: CancellationToken) =
    Task.FromCanceled<bool> cancellationToken
    |> ValueTask<bool>

    let ofSeq (sq: Task<'T> seq) = {
    new IAsyncEnumerable<'T> with
    @@ -14,15 +18,18 @@ module AsyncSeq =
    enumerator.Dispose()
    ValueTask.CompletedTask
    member _.MoveNextAsync() =
    Task.Run<bool>(fun () -> task {
    if enumerator.MoveNext() then
    let! res = enumerator.Current
    current <- res
    return true
    else
    return false
    }, cancellationToken)
    |> ValueTask<bool>
    if cancellationToken.IsCancellationRequested then
    cancelled cancellationToken
    else
    task {
    if enumerator.MoveNext() then
    let! res = enumerator.Current
    current <- res
    return true
    else
    return false
    }
    |> ValueTask<bool>
    }
    }

    @@ -38,15 +45,18 @@ module AsyncSeq =
    enumerator.Dispose()
    ValueTask.CompletedTask
    member _.MoveNextAsync() =
    Task.Run<bool>(fun () -> task {
    if enumerator.MoveNext() then
    let! res = mapping enumerator.Current
    current <- res
    return true
    else
    return false
    }, cancellationToken)
    |> ValueTask<bool>
    if cancellationToken.IsCancellationRequested then
    cancelled cancellationToken
    else
    task {
    if enumerator.MoveNext() then
    let! res = mapping enumerator.Current
    current <- res
    return true
    else
    return false
    }
    |> ValueTask<bool>
    }
    }

  2. akhansari revised this gist Feb 16, 2022. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion AsyncSeq.fs
    Original file line number Diff line number Diff line change
    @@ -56,7 +56,7 @@ module AsyncSeq =
    let! go = enumerator.MoveNextAsync()
    if go then
    do! action enumerator.Current
    do! iter enumerator
    return! iter enumerator
    }
    task {
    use enumerator = asyncSeq.GetAsyncEnumerator()
  3. akhansari renamed this gist Feb 16, 2022. 1 changed file with 14 additions and 1 deletion.
    15 changes: 14 additions & 1 deletion AsyncEnumerable.fs → AsyncSeq.fs
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,4 @@
    module AsyncEnumerable =
    module AsyncSeq =
    open System.Collections.Generic
    open System.Threading.Tasks

    @@ -48,4 +48,17 @@ module AsyncEnumerable =
    }, cancellationToken)
    |> ValueTask<bool>
    }
    }

    let iter action (asyncSeq: IAsyncEnumerable<'T>) =
    let rec iter (enumerator: IAsyncEnumerator<'T>) =
    task {
    let! go = enumerator.MoveNextAsync()
    if go then
    do! action enumerator.Current
    do! iter enumerator
    }
    task {
    use enumerator = asyncSeq.GetAsyncEnumerator()
    do! iter enumerator
    }
  4. akhansari revised this gist Feb 15, 2022. 1 changed file with 2 additions and 3 deletions.
    5 changes: 2 additions & 3 deletions AsyncEnumerable.fs
    Original file line number Diff line number Diff line change
    @@ -1,7 +1,6 @@
    open System.Collections.Generic
    open System.Threading.Tasks

    module AsyncEnumerable =
    open System.Collections.Generic
    open System.Threading.Tasks

    let ofSeq (sq: Task<'T> seq) = {
    new IAsyncEnumerable<'T> with
  5. akhansari revised this gist Feb 15, 2022. 1 changed file with 0 additions and 1 deletion.
    1 change: 0 additions & 1 deletion AsyncEnumerable.fs
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,3 @@
    open System
    open System.Collections.Generic
    open System.Threading.Tasks

  6. akhansari created this gist Feb 15, 2022.
    53 changes: 53 additions & 0 deletions AsyncEnumerable.fs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,53 @@
    open System
    open System.Collections.Generic
    open System.Threading.Tasks

    module AsyncEnumerable =

    let ofSeq (sq: Task<'T> seq) = {
    new IAsyncEnumerable<'T> with
    member _.GetAsyncEnumerator cancellationToken =
    let enumerator = sq.GetEnumerator()
    let mutable current = Unchecked.defaultof<_>
    { new IAsyncEnumerator<'T> with
    member _.Current =
    current
    member _.DisposeAsync() =
    enumerator.Dispose()
    ValueTask.CompletedTask
    member _.MoveNextAsync() =
    Task.Run<bool>(fun () -> task {
    if enumerator.MoveNext() then
    let! res = enumerator.Current
    current <- res
    return true
    else
    return false
    }, cancellationToken)
    |> ValueTask<bool>
    }
    }

    let ofSeqMap mapping (sq: 'T seq) = {
    new IAsyncEnumerable<'U> with
    member _.GetAsyncEnumerator cancellationToken =
    let enumerator = sq.GetEnumerator()
    let mutable current = Unchecked.defaultof<_>
    { new IAsyncEnumerator<'U> with
    member _.Current =
    current
    member _.DisposeAsync() =
    enumerator.Dispose()
    ValueTask.CompletedTask
    member _.MoveNextAsync() =
    Task.Run<bool>(fun () -> task {
    if enumerator.MoveNext() then
    let! res = mapping enumerator.Current
    current <- res
    return true
    else
    return false
    }, cancellationToken)
    |> ValueTask<bool>
    }
    }