/// A multicast wrapper around an `AsyncStream` that folds every upstream value
/// into a single reduced result and replays that result to late subscribers.
///
/// Behaviour:
/// - The upstream stream is consumed lazily, starting when the first subscriber
///   begins iterating.
/// - The first subscriber receives only live values (no replay of `initialResult`).
/// - Every later subscriber first receives the current reduced value, then the
///   live values that follow it.
/// - When the upstream finishes, all subscribers are finished; a subscriber that
///   arrives afterwards receives the final reduced value and finishes immediately.
struct ReducedReplayAsyncStream<Element> {
    /// Folds `nextResult` into the accumulated `partialResult`.
    typealias Reduce = (_ partialResult: inout Element, _ nextResult: Element) -> Void

    private let storage: _Storage
    private let originalStream: AsyncStream<Element>

    /// Creates a replaying stream.
    ///
    /// - Parameters:
    ///   - limit: Buffering policy applied to the upstream `AsyncStream`.
    ///   - initialResult: Seed value for the reduction.
    ///   - reduce: Combines each upstream value into the accumulated result.
    ///   - build: Receives the upstream continuation used to produce values.
    init(
        bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded,
        initialResult: Element,
        reduce: @escaping Reduce,
        build: (AsyncStream<Element>.Continuation) -> Void
    ) {
        originalStream = AsyncStream(Element.self, bufferingPolicy: limit, build)
        storage = _Storage(stored: initialResult, reduce: reduce)
    }

    /// Builds the per-subscriber stream and registers it with the shared storage.
    private func makeStream() -> AsyncStream<Element> {
        AsyncStream<Element> { continuation in
            Task {
                // Replay, registration and the "am I first?" decision happen in a
                // single actor call, so no upstream value can slip in between them
                // and only one subscriber can ever start the upstream consumption.
                let isFirstSubscriber = await storage.subscribe(continuation)
                if isFirstSubscriber {
                    // Started only after the first continuation is registered, so
                    // the first subscriber cannot miss early values.
                    startConsumingOriginalStream()
                }
            }
        }
    }

    /// Drains the upstream stream into the storage, then finishes all subscribers.
    private func startConsumingOriginalStream() {
        Task {
            for await value in originalStream {
                await storage.updateWith(value: value)
            }
            await storage.finish()
        }
    }
}

extension ReducedReplayAsyncStream {
    /// Actor holding the reduced value and the set of live subscriber continuations.
    private actor _Storage {
        private let reduce: ReducedReplayAsyncStream.Reduce
        private var stored: Element
        private var didStart = false
        private var didFinish = false
        private var nextID = 0
        private var continuations: [Int: AsyncStream<Element>.Continuation] = [:]

        init(stored: Element, reduce: @escaping Reduce) {
            self.stored = stored
            self.reduce = reduce
        }

        /// Registers a subscriber.
        ///
        /// Subscribers other than the first are sent the current reduced value
        /// before being registered. If the upstream has already finished, the
        /// subscriber is finished right after that replay instead of being kept.
        ///
        /// - Returns: `true` when this is the first subscriber, i.e. the caller
        ///   must start consuming the upstream stream.
        func subscribe(_ continuation: AsyncStream<Element>.Continuation) -> Bool {
            let isFirst = !didStart
            didStart = true

            if !isFirst {
                continuation.yield(stored)
            }

            guard !didFinish else {
                // Upstream is done: nothing more will ever arrive.
                continuation.finish()
                return false
            }

            let id = nextID
            nextID += 1
            continuations[id] = continuation

            // Drop the continuation once its consumer stops iterating or is
            // cancelled, so the dictionary does not grow without bound.
            continuation.onTermination = { [weak self] _ in
                guard let storage = self else { return }
                Task { await storage.removeContinuation(id: id) }
            }

            return isFirst
        }

        /// Folds `value` into the stored result and forwards it to every subscriber.
        func updateWith(value: Element) {
            reduce(&stored, value)
            for continuation in continuations.values {
                continuation.yield(value)
            }
        }

        /// Marks the upstream as finished and finishes every registered subscriber.
        func finish() {
            didFinish = true
            let active = continuations
            // Cleared first: each `finish()` below fires `onTermination`, whose
            // removal then becomes a harmless no-op.
            continuations.removeAll()
            for continuation in active.values {
                continuation.finish()
            }
        }

        /// Forgets the subscriber registered under `id`, if it is still present.
        func removeContinuation(id: Int) {
            continuations[id] = nil
        }
    }
}

extension ReducedReplayAsyncStream: AsyncSequence {
    typealias AsyncIterator = AsyncStream<Element>.AsyncIterator

    /// Each iterator is backed by its own subscriber stream.
    func makeAsyncIterator() -> AsyncIterator {
        makeStream().makeAsyncIterator()
    }
}