import Combine

/// Runs publishers through one shared pipeline so that only a limited number
/// of them are connected at the same time.
///
/// Publishers wrapped with `queuedPublisher(_:)` are made connectable and are
/// sent into `operationQueue` when the wrapped publisher receives a
/// subscription. The pipeline built in `init` buffers those operations and
/// connects them through `flatMap(maxPublishers:)`.
///
/// NOTE(review): the generic argument lists appear to have been stripped from
/// this text (see the stray `>` after `Publishers.MakeConnectable`, the bare
/// `PassthroughSubject` / `Set` / `AnyPublisher`, and the undeclared `P`).
/// Restore them from the original file before compiling.
public final class PublisherQueue {
    // A connectable wrapper around a queued publisher's event stream.
    private typealias Operation = Publishers.MakeConnectable>
    // Entry point of the pipeline; `queuedPublisher(_:)` sends operations here.
    private let operationQueue: PassthroughSubject
    // Holds the pipeline subscription created in `init`.
    private var subscriptions: Set

    /// Builds the pipeline that buffers and connects queued operations.
    ///
    /// - Parameters:
    ///   - size: Forwarded to `buffer(size:prefetch:whenFull:)` as the buffer size.
    ///   - maxConcurrentPublishers: Forwarded to `flatMap(maxPublishers:)`;
    ///     defaults to `.max(1)`.
    public init(size: Int, maxConcurrentPublishers: Subscribers.Demand = .max(1)) {
        self.operationQueue = PassthroughSubject()
        self.subscriptions = []
        operationQueue
            // NOTE(review): with `.dropNewest`, an operation sent while the buffer
            // is full is discarded and never connected; the publisher returned by
            // `queuedPublisher(_:)` would then presumably never emit or complete.
            // Confirm this is intended.
            .buffer(size: size, prefetch: .keepFull, whenFull: .dropNewest)
            .flatMap(maxPublishers: maxConcurrentPublishers) { operation in
                // `autoconnect()` connects the operation once flatMap subscribes to it.
                operation.autoconnect()
            }
            // Values are ignored here; this subscription only drives the pipeline.
            .sink { _ in }
            .store(in: &subscriptions)
    }

    /// Returns a queued version of a publisher that cannot fail
    /// (`P.Failure == Never`).
    ///
    /// The operation is sent to the queue from `receiveSubscription`, i.e. when
    /// the returned publisher is subscribed to.
    /// NOTE(review): the same `operation` is sent on every subscription, so
    /// subscribing more than once presumably enqueues it more than once — verify.
    ///
    /// - Parameter publisher: The publisher to run through the queue.
    /// - Returns: A type-erased publisher that re-emits `publisher`'s output.
    public func queuedPublisher(_ publisher: P) -> AnyPublisher where P.Failure == Never {
        // `eraseToAnyEventPublisher()` is defined outside this file; it presumably
        // turns output and completion into type-erased event values — TODO confirm.
        let operation: Operation = publisher
            .eraseToAnyEventPublisher()
            .makeConnectable()
        return operation.flatMap { event -> AnyPublisher in
            switch event {
            case let .output(value):
                // Force cast back to the concrete output type; crashes if the
                // event carries a value that is not a `P.Output`.
                return Result.success(value as! P.Output).publisher.eraseToAnyPublisher()
            case .completion(.finished):
                // A finished completion contributes no output.
                return Empty().eraseToAnyPublisher()
            case .completion(.failure(.failure)):
                // This overload is constrained to `P.Failure == Never`.
                fatalError("unreachable")
            }
        }
        // Capture only the subject (not `self`) and enqueue on subscription.
        .handleEvents(receiveSubscription: { [operationQueue] _ in
            operationQueue.send(operation)
        })
        .eraseToAnyPublisher()
    }

    /// Returns a queued version of a publisher that may fail with `P.Failure`.
    ///
    /// Same flow as the non-failing overload, except that the operation's
    /// failure type is set to `P.Failure` and a failure event is re-emitted as a
    /// failing publisher.
    /// NOTE(review): the same `operation` is sent on every subscription, so
    /// subscribing more than once presumably enqueues it more than once — verify.
    ///
    /// - Parameter publisher: The publisher to run through the queue.
    /// - Returns: A type-erased publisher that re-emits `publisher`'s output and failure.
    public func queuedPublisher(_ publisher: P) -> AnyPublisher {
        // `eraseToAnyEventPublisher()` is defined outside this file; it presumably
        // turns output and completion into type-erased event values — TODO confirm.
        let operation: Operation = publisher
            .eraseToAnyEventPublisher()
            .makeConnectable()
        // `setFailureType` lets the inner publishers below fail with `P.Failure`.
        return operation.setFailureType(to: P.Failure.self).flatMap { event -> AnyPublisher in
            switch event {
            case let .output(value):
                // Force cast back to the concrete output type; crashes if the
                // event carries a value that is not a `P.Output`.
                return Result.success(value as! P.Output).publisher.eraseToAnyPublisher()
            case .completion(.finished):
                // A finished completion contributes no output.
                return Empty().eraseToAnyPublisher()
            case let .completion(.failure(.failure(error))):
                // Force cast back to the concrete failure type; crashes if the
                // wrapped error is not a `P.Failure`.
                return Result.failure(error as! P.Failure).publisher.eraseToAnyPublisher()
            }
        }
        // Capture only the subject (not `self`) and enqueue on subscription.
        .handleEvents(receiveSubscription: { [operationQueue] _ in
            operationQueue.send(operation)
        })
        .eraseToAnyPublisher()
    }
}