Last active
February 17, 2024 02:18
-
-
Save sharplet/b2f65b67a3761043ba675150cad59b51 to your computer and use it in GitHub Desktop.
Revisions
-
sharplet revised this gist
Oct 29, 2020 . 2 changed files with 3 additions and 3 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -19,7 +19,7 @@ public final class PublisherQueue { .store(in: &subscriptions) } public func queuedPublisher<P: Publisher>(_ publisher: P) -> AnyPublisher<P.Output, P.Failure> where P.Failure == Never { let operation: Operation = publisher @@ -42,7 +42,7 @@ public final class PublisherQueue { .eraseToAnyPublisher() } public func queuedPublisher<P: Publisher>(_ publisher: P) -> AnyPublisher<P.Output, P.Failure> { let operation: Operation = publisher .eraseToAnyEventPublisher() .makeConnectable() This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -18,7 +18,7 @@ let group = DispatchGroup() for _ in 1 ... 1000 { group.enter() q.queuedPublisher(randomInt()).sink(receiveCompletion: { _ in group.leave() }) { number in resultCount += 1 }.store(in: &subscriptions) } -
sharplet revised this gist
Oct 29, 2020 . 1 changed file with 10 additions and 6 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -26,8 +26,6 @@ public final class PublisherQueue { .eraseToAnyEventPublisher() .makeConnectable() return operation.flatMap { event -> AnyPublisher<P.Output, P.Failure> in switch event { case let .output(value): @@ -37,16 +35,18 @@ public final class PublisherQueue { case .completion(.failure(.failure)): fatalError("unreachable") } } .handleEvents(receiveSubscription: { [operationQueue] _ in operationQueue.send(operation) }) .eraseToAnyPublisher() } public func enqueue<P: Publisher>(_ publisher: P) -> AnyPublisher<P.Output, P.Failure> { let operation: Operation = publisher .eraseToAnyEventPublisher() .makeConnectable() return operation.setFailureType(to: P.Failure.self).flatMap { event -> AnyPublisher<P.Output, P.Failure> in switch event { case let .output(value): @@ -56,6 +56,10 @@ public final class PublisherQueue { case let .completion(.failure(.failure(error))): return Result.failure(error as! P.Failure).publisher.eraseToAnyPublisher() } } .handleEvents(receiveSubscription: { [operationQueue] _ in operationQueue.send(operation) }) .eraseToAnyPublisher() } } -
sharplet revised this gist
Oct 29, 2020 . 1 changed file with 10 additions and 0 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,6 +1,16 @@ import Combine import Dispatch func randomInt() -> Deferred<Future<Int, Never>> { Deferred { Future { promise in DispatchQueue.global().asyncAfter(deadline: .now() + 0.01) { promise(.success(.random(in: .min ... .max))) } } } } let q = PublisherQueue(size: .max) var subscriptions = Set<AnyCancellable>() var resultCount = 0 -
sharplet created this gist
Oct 29, 2020 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,27 @@ import Combine public enum AnyError: Error { case never(Never) case failure(Error) } public enum AnyEvent { case output(Any) case completion(Subscribers.Completion<AnyError>) } extension Publisher where Failure == Never { public func eraseToAnyEventPublisher() -> AnyPublisher<AnyEvent, Never> { map(AnyEvent.output) .catch { error in Just(.completion(.failure(.never(error)))) } .eraseToAnyPublisher() } } extension Publisher { public func eraseToAnyEventPublisher() -> AnyPublisher<AnyEvent, Never> { map(AnyEvent.output) .catch { error in Just(.completion(.failure(.failure(error)))) } .eraseToAnyPublisher() } } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,61 @@ import Combine public final class PublisherQueue { private typealias Operation = Publishers.MakeConnectable<AnyPublisher<AnyEvent, Never>> private let operationQueue: PassthroughSubject<Operation, Never> private var subscriptions: Set<AnyCancellable> public init(size: Int, maxConcurrentPublishers: Subscribers.Demand = .max(1)) { self.operationQueue = PassthroughSubject() self.subscriptions = [] operationQueue .buffer(size: size, prefetch: .keepFull, whenFull: .dropNewest) .flatMap(maxPublishers: maxConcurrentPublishers) { operation in operation.autoconnect() } .sink { _ in } .store(in: &subscriptions) } public func enqueue<P: Publisher>(_ publisher: P) -> AnyPublisher<P.Output, P.Failure> where P.Failure == Never { let operation: Operation = publisher .eraseToAnyEventPublisher() .makeConnectable() operationQueue.send(operation) return operation.flatMap { event -> AnyPublisher<P.Output, P.Failure> in switch event { case let .output(value): return Result.success(value as! P.Output).publisher.eraseToAnyPublisher() case .completion(.finished): return Empty().eraseToAnyPublisher() case .completion(.failure(.failure)): fatalError("unreachable") } }.eraseToAnyPublisher() } public func enqueue<P: Publisher>(_ publisher: P) -> AnyPublisher<P.Output, P.Failure> { let operation: Operation = publisher .eraseToAnyEventPublisher() .makeConnectable() operationQueue.send(operation) return operation.setFailureType(to: P.Failure.self).flatMap { event -> AnyPublisher<P.Output, P.Failure> in switch event { case let .output(value): return Result.success(value as! P.Output).publisher.eraseToAnyPublisher() case .completion(.finished): return Empty().eraseToAnyPublisher() case let .completion(.failure(.failure(error))): return Result.failure(error as! P.Failure).publisher.eraseToAnyPublisher() } }.eraseToAnyPublisher() } } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,17 @@ import Combine import Dispatch let q = PublisherQueue(size: .max) var subscriptions = Set<AnyCancellable>() var resultCount = 0 let group = DispatchGroup() for _ in 1 ... 1000 { group.enter() q.enqueue(randomInt()).sink(receiveCompletion: { _ in group.leave() }) { number in resultCount += 1 }.store(in: &subscriptions) } group.wait() print(resultCount, "results") // 1000 results