import Foundation
import _Concurrency

extension AsyncStream {
    /// Creates a stream that re-emits every element of `sequence`.
    ///
    /// An unstructured `Task` iterates `sequence` and yields each element into
    /// the new stream. The stream is finished when `sequence` ends or throws,
    /// and the forwarding task is cancelled when the stream terminates.
    ///
    /// Because `S` is an ordinary generic parameter, this initializer also works
    /// as a type eraser: any async sequence of `Element` becomes a concrete
    /// `AsyncStream<Element>`.
    ///
    /// - Parameters:
    ///   - sequence: The upstream async sequence whose elements are forwarded.
    ///   - limit: Buffering policy of the new stream. Defaults to `.unbounded`.
    /// - Note: `AsyncStream` cannot throw, so an error thrown by `sequence` is
    ///   discarded and simply ends the stream.
    public init<S: AsyncSequence>(
        _ sequence: S,
        bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
    ) where S.Element == Element {
        self.init(bufferingPolicy: limit) { continuation in
            let forwardingTask = Task {
                do {
                    for try await element in sequence {
                        // Stop pulling from upstream once the stream has
                        // terminated; further yields would be dropped anyway.
                        if case .terminated = continuation.yield(element) {
                            break
                        }
                    }
                } catch {
                    // Deliberately ignored: a non-throwing stream has no way
                    // to surface the failure, so it just finishes below.
                }
                continuation.finish()
            }
            // Cancel the forwarding task when the stream terminates.
            continuation.onTermination = { @Sendable _ in
                forwardingTask.cancel()
            }
        }
    }
}

// MARK: - Demonstration

// Using a Swift 5.7 primary associated type.
protocol MyAsyncSequence<Element>: AsyncSequence {}
extension AsyncStream: MyAsyncSequence {}

let stream = AsyncStream<Int> { nil }
let anyMySequence: any MyAsyncSequence<Int> = stream

// ERROR (as reported by the original author for Swift 5.7): when iterating the
// existential directly, `x` is `Any`, not `Int`. The loop is kept here only as
// a commented-out reference so that the rest of the file still compiles.
// for try await x: Int in anyMySequence { }

// NOTE: This `init` performs the type erasure.
let mySeqStream = AsyncStream(anyMySequence)

// OK: x is Int. `AsyncStream` does not throw, so no `try` is needed.
for await x: Int in mySeqStream { }

"✅"