Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save tkersey/b6c83aeb8e20787e9eb8ba4d0d54dba5 to your computer and use it in GitHub Desktop.
Save tkersey/b6c83aeb8e20787e9eb8ba4d0d54dba5 to your computer and use it in GitHub Desktop.

Revisions

  1. @ABridoux ABridoux revised this gist Jan 29, 2022. No changes.
  2. @ABridoux ABridoux created this gist Jan 29, 2022.
    82 changes: 82 additions & 0 deletions ReducedReplayAsyncStream.swift
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,82 @@
    struct ReducedReplayAsyncStream<Element> {

    typealias Reduce = (_ partialResult: inout Element, _ nextResult: Element) -> Void

    private let storage: _Storage
    private var originalStream: AsyncStream<Element>

    init(
    bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded,
    initialResult: Element,
    reduce: @escaping Reduce,
    build: (AsyncStream<Element>.Continuation) -> Void
    ) {
    originalStream = AsyncStream(Element.self, bufferingPolicy: limit, build)
    storage = _Storage(stored: initialResult, reduce: reduce)
    }

    private func makeStream() -> AsyncStream<Element> {
    AsyncStream<Element> { continuation in
    Task {
    var isFirst = false
    if await !storage.didStart {
    await storage.setDidStart(true)
    isFirst = true
    startConsumingOriginalStream()
    }

    if !isFirst {
    await continuation.yield(storage.stored)
    }
    await storage.appendContinuation(continuation)
    }
    }
    }

    private func startConsumingOriginalStream () {
    Task {
    for await value in originalStream {
    await storage.updateWith(value: value)
    }
    await storage.continuations.forEach { $0.finish() }
    }
    }
    }

    extension ReducedReplayAsyncStream {

    private actor _Storage {
    private let reduce: ReducedReplayAsyncStream.Reduce

    var didStart = false
    var stored: Element
    var continuations: [AsyncStream<Element>.Continuation] = []

    init(stored: Element, reduce: @escaping Reduce) {
    self.stored = stored
    self.reduce = reduce
    }

    func updateWith(value: Element) {
    reduce(&stored, value)
    continuations.forEach { $0.yield(value) }
    }

    func setDidStart(_ value: Bool) {
    didStart = value
    }

    func appendContinuation(_ continuation: AsyncStream<Element>.Continuation) {
    continuations.append(continuation)
    }
    }
    }

    extension ReducedReplayAsyncStream: AsyncSequence {
    typealias AsyncIterator = AsyncStream<Element>.AsyncIterator

    func makeAsyncIterator() -> AsyncIterator {
    let stream = makeStream()
    return stream.makeAsyncIterator()
    }
    }