Skip to content

Instantly share code, notes, and snippets.

@ollieatkinson
Created August 1, 2022 08:40
Show Gist options
  • Select an option

  • Save ollieatkinson/cf7dfb8b0023a6a3baded2f56da674f1 to your computer and use it in GitHub Desktop.

Select an option

Save ollieatkinson/cf7dfb8b0023a6a3baded2f56da674f1 to your computer and use it in GitHub Desktop.

Revisions

  1. ollieatkinson created this gist Aug 1, 2022.
    61 changes: 61 additions & 0 deletions CombineLatest.swift
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,61 @@
    public func combineLatest<C>(
    _ collection: C,
    bufferingPolicy limit: AsyncStream<[C.Element.Element]>.Continuation.BufferingPolicy = .unbounded
    ) -> AsyncStream<[C.Element.Element]> where C: Collection, C.Element: AsyncSequence {
    AsyncStream(bufferingPolicy: limit) { continuation in
    let stream = CombineLatestActor<C.Element.Element>(collection.count)
    continuation.onTermination = { @Sendable termination in
    switch termination {
    case .cancelled: Task { await stream.cancel() }
    case .finished: break
    }
    }
    for (i, sequence) in collection.enumerated() {
    Task {
    for try await value in sequence {
    if await stream.isCancelled {
    throw CancellationError()
    }
    if let values = await stream.insert(value, at: i) {
    continuation.yield(values)
    }
    }
    if await stream.complete(i) {
    continuation.finish()
    }
    }
    }
    }
    }

    private actor CombineLatestActor<Element> {

    var values: Array<Element?>
    var seen, completed: Set<Int>
    var isCancelled: Bool = false

    init(_ count: Int) {
    values = [Element?](repeating: nil, count: count)
    seen = .init(minimumCapacity: count)
    completed = .init(minimumCapacity: count)
    }

    @discardableResult
    func insert(_ value: Element, at index: Int) -> [Element]? {
    seen.insert(index)
    values[index] = value
    return seen.count == values.count
    ? values.map { $0.unsafelyUnwrapped }
    : nil
    }

    @discardableResult
    func complete(_ index: Int) -> Bool {
    completed.insert(index)
    return completed.count == values.count
    }

    func cancel() {
    isCancelled = true
    }
    }