/// A class tracking a sequence/set of events and letting a caller wait until
/// a chosen target set of events has been observed.
///
/// `insertEvent(_:)`, `clear()` and `completion(of:timeout:)` serialise their
/// state changes through an internal lock, because the timeout path resumes
/// the waiter from a different task than the one inserting events. Direct reads
/// of the public properties are not synchronised.
///
/// Only one waiter is tracked at a time: a new `completion(of:timeout:)` call
/// replaces the target set and supersedes any waiter that is still pending.
// NOTE(review): the generic clauses in this file appear to have been lost
// (`CheckedContinuation?`, `withTimeout` using an undeclared `R`). The
// `Event: SetAlgebra` constraint below is reconstructed from how `Event` is
// used (`[]`, `union`, `isSubset(of:)`, `isEmpty`) - confirm against the
// original declaration.
public class EventCompleter<Event: SetAlgebra> {
    /// The set tracking all inserted events
    public private(set) var eventSet: Event = []

    /// The target set that will notify a completion
    public private(set) var targetSet: Event = []

    /// The waiter currently suspended in `completion(of:timeout:)`, if any.
    /// Always taken and cleared under `lock` before being resumed, so it is
    /// resumed exactly once.
    private var continuation: CheckedContinuation<Void, Error>?

    /// Guards `eventSet`, `targetSet` and `continuation` inside the mutating methods.
    private let lock = NSLock()

    /// `true` when the target set is non-empty and every target event has been inserted.
    public var hasCompleted: Bool {
        targetSet.isSubset(of: eventSet) && !targetSet.isEmpty
    }

    /// Creates a completer.
    /// - Parameters:
    ///   - initialSet: Events considered already inserted.
    ///   - targetSet: Events that must all be present for completion.
    public init(_ initialSet: Event = [], targetSet: Event = []) {
        self.eventSet = initialSet
        self.targetSet = targetSet
    }

    /// Clear event set by making the set empty
    public func clear() {
        lock.lock()
        defer { lock.unlock() }
        eventSet = []
    }

    /// Add a new event to the completion set and wake the pending waiter once
    /// the target set is satisfied.
    /// - Parameter event: The event(s) to merge into `eventSet`.
    public func insertEvent(_ event: Event) {
        lock.lock()
        eventSet.formUnion(event)
        // Take the waiter out of the slot before resuming it: resuming a
        // `CheckedContinuation` a second time (e.g. on a later insert while the
        // target is still satisfied) is a fatal misuse.
        var waiter: CheckedContinuation<Void, Error>?
        if hasCompleted {
            waiter = continuation
            continuation = nil
        }
        lock.unlock()

        // Resume outside the lock so the woken task never contends on it.
        waiter?.resume()
    }

    /// Wait for the set of events to be completed or complete after a timeout
    /// - Parameters:
    ///   - events: The new target set; replaces the current `targetSet`.
    ///   - timeout: Maximum time to wait in seconds, or `nil` to wait indefinitely.
    /// - Throws: `TimedOutError` when the timeout elapses first;
    ///   `CancellationError` when this waiter is superseded by a newer call.
    public func completion(of events: Event, timeout: TimeInterval? = nil) async throws {
        // Check for early exit
        if retarget(to: events) {
            return
        }

        if let timeout {
            try await withTimeout(
                timeout,
                // `self` is captured strongly on purpose: the closure only lives
                // for the duration of this call, which already keeps `self` alive.
                do: { try await self.waitForTarget() },
                // Without this the suspended child task is never resumed, the
                // task group cannot finish and the timeout is never reported.
                cleanup: { [weak self] in self?.failPendingWait(with: TimedOutError()) }
            )
        } else {
            try await waitForTarget()
        }
    }

    /// Installs the new target set and reports whether it is already satisfied.
    private func retarget(to events: Event) -> Bool {
        lock.lock()
        defer { lock.unlock() }
        targetSet = events
        return hasCompleted
    }

    /// Suspends the caller until the target is reached or the wait is failed.
    private func waitForTarget() async throws {
        try await withCheckedThrowingContinuation { (waiter: CheckedContinuation<Void, Error>) in
            register(waiter)
        }
    }

    /// Stores `waiter` as the pending continuation, or resumes it right away
    /// when the target was reached between the early-exit check and now.
    private func register(_ waiter: CheckedContinuation<Void, Error>) {
        lock.lock()
        let superseded = continuation
        let alreadyDone = hasCompleted
        continuation = alreadyDone ? nil : waiter
        lock.unlock()

        // A waiter that is overwritten would otherwise stay suspended forever.
        superseded?.resume(throwing: CancellationError())
        if alreadyDone {
            waiter.resume()
        }
    }

    /// Fails the pending waiter, if there is one, with `error`.
    private func failPendingWait(with error: Error) {
        lock.lock()
        let waiter = continuation
        continuation = nil
        lock.unlock()

        waiter?.resume(throwing: error)
    }
}

// MARK: Timeout

/// Error thrown when an operation does not finish within its timeout.
struct TimedOutError: Error, Equatable {}

/// Runs `work` and returns its result, or throws `TimedOutError` if `timeout`
/// seconds elapse first.
///
/// `work` and a timer run as two child tasks of one task group; whichever
/// finishes first decides the outcome and the other is cancelled.
/// - Parameters:
///   - timeout: Maximum time to wait, in seconds. Negative and NaN values are
///     treated as zero; very large values are clamped.
///   - work: The operation to run.
///   - cleanup: Called once, just before `TimedOutError` is thrown, when the
///     timer fires without having been cancelled.
/// - Returns: The value produced by `work`.
/// - Throws: `TimedOutError` on timeout, or whatever `work` throws.
func withTimeout<R>(
    _ timeout: TimeInterval,
    do work: @escaping () async throws -> R,
    cleanup: (() -> Void)? = nil
) async throws -> R {
    // `UInt64(_:)` traps on negative, NaN, infinite or out-of-range doubles, so
    // clamp the requested delay before converting it.
    let requestedNanoseconds = (timeout.isNaN ? 0 : max(0, timeout)) * 1_000_000_000
    let sleepNanoseconds: UInt64 = requestedNanoseconds >= Double(Int64.max)
        ? UInt64(Int64.max)
        : UInt64(requestedNanoseconds)

    return try await withThrowingTaskGroup(of: R.self) { group in
        // Stop the losing child as soon as an outcome is known.
        defer { group.cancelAll() }

        group.addTask {
            try Task.checkCancellation()
            return try await work()
        }

        group.addTask {
            try await Task.sleep(nanoseconds: sleepNanoseconds)
            try Task.checkCancellation()
            cleanup?()
            throw TimedOutError()
        }

        guard let result = try await group.next() else {
            throw TimedOutError()
        }
        return result
    }
}