Skip to content

Instantly share code, notes, and snippets.

@sharplet
Last active February 17, 2024 02:18
Show Gist options
  • Save sharplet/b2f65b67a3761043ba675150cad59b51 to your computer and use it in GitHub Desktop.
Save sharplet/b2f65b67a3761043ba675150cad59b51 to your computer and use it in GitHub Desktop.

Revisions

  1. sharplet revised this gist Oct 29, 2020. 2 changed files with 3 additions and 3 deletions.
    4 changes: 2 additions & 2 deletions PublisherQueue.swift
    Original file line number Diff line number Diff line change
    @@ -19,7 +19,7 @@ public final class PublisherQueue {
    .store(in: &subscriptions)
    }

    public func enqueue<P: Publisher>(_ publisher: P) -> AnyPublisher<P.Output, P.Failure>
    public func queuedPublisher<P: Publisher>(_ publisher: P) -> AnyPublisher<P.Output, P.Failure>
    where P.Failure == Never
    {
    let operation: Operation = publisher
    @@ -42,7 +42,7 @@ public final class PublisherQueue {
    .eraseToAnyPublisher()
    }

    public func enqueue<P: Publisher>(_ publisher: P) -> AnyPublisher<P.Output, P.Failure> {
    public func queuedPublisher<P: Publisher>(_ publisher: P) -> AnyPublisher<P.Output, P.Failure> {
    let operation: Operation = publisher
    .eraseToAnyEventPublisher()
    .makeConnectable()
    2 changes: 1 addition & 1 deletion Test.swift
    Original file line number Diff line number Diff line change
    @@ -18,7 +18,7 @@ let group = DispatchGroup()

    for _ in 1 ... 1000 {
    group.enter()
    q.enqueue(randomInt()).sink(receiveCompletion: { _ in group.leave() }) { number in
    q.queuedPublisher(randomInt()).sink(receiveCompletion: { _ in group.leave() }) { number in
    resultCount += 1
    }.store(in: &subscriptions)
    }
  2. sharplet revised this gist Oct 29, 2020. 1 changed file with 10 additions and 6 deletions.
    16 changes: 10 additions & 6 deletions PublisherQueue.swift
    Original file line number Diff line number Diff line change
    @@ -26,8 +26,6 @@ public final class PublisherQueue {
    .eraseToAnyEventPublisher()
    .makeConnectable()

    operationQueue.send(operation)

    return operation.flatMap { event -> AnyPublisher<P.Output, P.Failure> in
    switch event {
    case let .output(value):
    @@ -37,16 +35,18 @@ public final class PublisherQueue {
    case .completion(.failure(.failure)):
    fatalError("unreachable")
    }
    }.eraseToAnyPublisher()
    }
    .handleEvents(receiveSubscription: { [operationQueue] _ in
    operationQueue.send(operation)
    })
    .eraseToAnyPublisher()
    }

    public func enqueue<P: Publisher>(_ publisher: P) -> AnyPublisher<P.Output, P.Failure> {
    let operation: Operation = publisher
    .eraseToAnyEventPublisher()
    .makeConnectable()

    operationQueue.send(operation)

    return operation.setFailureType(to: P.Failure.self).flatMap { event -> AnyPublisher<P.Output, P.Failure> in
    switch event {
    case let .output(value):
    @@ -56,6 +56,10 @@ public final class PublisherQueue {
    case let .completion(.failure(.failure(error))):
    return Result.failure(error as! P.Failure).publisher.eraseToAnyPublisher()
    }
    }.eraseToAnyPublisher()
    }
    .handleEvents(receiveSubscription: { [operationQueue] _ in
    operationQueue.send(operation)
    })
    .eraseToAnyPublisher()
    }
    }
  3. sharplet revised this gist Oct 29, 2020. 1 changed file with 10 additions and 0 deletions.
    10 changes: 10 additions & 0 deletions Test.swift
    Original file line number Diff line number Diff line change
    @@ -1,6 +1,16 @@
    import Combine
    import Dispatch

    func randomInt() -> Deferred<Future<Int, Never>> {
    Deferred {
    Future { promise in
    DispatchQueue.global().asyncAfter(deadline: .now() + 0.01) {
    promise(.success(.random(in: .min ... .max)))
    }
    }
    }
    }

    let q = PublisherQueue(size: .max)
    var subscriptions = Set<AnyCancellable>()
    var resultCount = 0
  4. sharplet created this gist Oct 29, 2020.
    27 changes: 27 additions & 0 deletions AnyEventPublisher.swift
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,27 @@
    import Combine

    public enum AnyError: Error {
    case never(Never)
    case failure(Error)
    }

    public enum AnyEvent {
    case output(Any)
    case completion(Subscribers.Completion<AnyError>)
    }

    extension Publisher where Failure == Never {
    public func eraseToAnyEventPublisher() -> AnyPublisher<AnyEvent, Never> {
    map(AnyEvent.output)
    .catch { error in Just(.completion(.failure(.never(error)))) }
    .eraseToAnyPublisher()
    }
    }

    extension Publisher {
    public func eraseToAnyEventPublisher() -> AnyPublisher<AnyEvent, Never> {
    map(AnyEvent.output)
    .catch { error in Just(.completion(.failure(.failure(error)))) }
    .eraseToAnyPublisher()
    }
    }
    61 changes: 61 additions & 0 deletions PublisherQueue.swift
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,61 @@
    import Combine

    public final class PublisherQueue {
    private typealias Operation = Publishers.MakeConnectable<AnyPublisher<AnyEvent, Never>>

    private let operationQueue: PassthroughSubject<Operation, Never>
    private var subscriptions: Set<AnyCancellable>

    public init(size: Int, maxConcurrentPublishers: Subscribers.Demand = .max(1)) {
    self.operationQueue = PassthroughSubject()
    self.subscriptions = []

    operationQueue
    .buffer(size: size, prefetch: .keepFull, whenFull: .dropNewest)
    .flatMap(maxPublishers: maxConcurrentPublishers) { operation in
    operation.autoconnect()
    }
    .sink { _ in }
    .store(in: &subscriptions)
    }

    public func enqueue<P: Publisher>(_ publisher: P) -> AnyPublisher<P.Output, P.Failure>
    where P.Failure == Never
    {
    let operation: Operation = publisher
    .eraseToAnyEventPublisher()
    .makeConnectable()

    operationQueue.send(operation)

    return operation.flatMap { event -> AnyPublisher<P.Output, P.Failure> in
    switch event {
    case let .output(value):
    return Result.success(value as! P.Output).publisher.eraseToAnyPublisher()
    case .completion(.finished):
    return Empty().eraseToAnyPublisher()
    case .completion(.failure(.failure)):
    fatalError("unreachable")
    }
    }.eraseToAnyPublisher()
    }

    public func enqueue<P: Publisher>(_ publisher: P) -> AnyPublisher<P.Output, P.Failure> {
    let operation: Operation = publisher
    .eraseToAnyEventPublisher()
    .makeConnectable()

    operationQueue.send(operation)

    return operation.setFailureType(to: P.Failure.self).flatMap { event -> AnyPublisher<P.Output, P.Failure> in
    switch event {
    case let .output(value):
    return Result.success(value as! P.Output).publisher.eraseToAnyPublisher()
    case .completion(.finished):
    return Empty().eraseToAnyPublisher()
    case let .completion(.failure(.failure(error))):
    return Result.failure(error as! P.Failure).publisher.eraseToAnyPublisher()
    }
    }.eraseToAnyPublisher()
    }
    }
    17 changes: 17 additions & 0 deletions Test.swift
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,17 @@
    import Combine
    import Dispatch

    let q = PublisherQueue(size: .max)
    var subscriptions = Set<AnyCancellable>()
    var resultCount = 0
    let group = DispatchGroup()

    for _ in 1 ... 1000 {
    group.enter()
    q.enqueue(randomInt()).sink(receiveCompletion: { _ in group.leave() }) { number in
    resultCount += 1
    }.store(in: &subscriptions)
    }

    group.wait()
    print(resultCount, "results") // 1000 results