Skip to content

Instantly share code, notes, and snippets.

@bgromov
Last active March 30, 2021 06:50
Show Gist options
  • Save bgromov/1cc84a62e5ac2363c4583e8b69ecac0a to your computer and use it in GitHub Desktop.
Save bgromov/1cc84a62e5ac2363c4583e8b69ecac0a to your computer and use it in GitHub Desktop.

Revisions

  1. bgromov revised this gist Jul 25, 2020. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion _README.md
    Original file line number Diff line number Diff line change
    @@ -54,7 +54,7 @@ let pub = (0..<10).publisher

    ### Credits

    The underlying [RingBuffer](https://github.com/raywenderlich/swift-algorithm-club/tree/dd1ed39fca150d4fa2905b902736f12a49f3efb1/Ring%20Buffer) structure used in this implementation is taken from Ray Wenderlich's Swift Algorithm Club written by Matthijs Hollemans.
    The underlying [RingBuffer](https://github.com/raywenderlich/swift-algorithm-club/tree/dd1ed39fca150d4fa2905b902736f12a49f3efb1/Ring%20Buffer) structure used in this implementation is taken from Ray Wenderlich's Swift Algorithm Club and is written by Matthijs Hollemans.

    ### Useful Links

  2. bgromov revised this gist Jul 25, 2020. 1 changed file with 2 additions and 2 deletions.
    4 changes: 2 additions & 2 deletions _README.md
    Original file line number Diff line number Diff line change
    @@ -1,4 +1,4 @@
    ### RingBuffer Publisher
    ## RingBuffer Publisher

    Implementation of a ring buffer publisher for Apple's Combine framework.

    @@ -56,7 +56,7 @@ let pub = (0..<10).publisher

    The underlying [RingBuffer](https://github.com/raywenderlich/swift-algorithm-club/tree/dd1ed39fca150d4fa2905b902736f12a49f3efb1/Ring%20Buffer) structure used in this implementation is taken from Ray Wenderlich's Swift Algorithm Club written by Matthijs Hollemans.

    ### Useful links
    ### Useful Links

    - [OpenCombine](https://github.com/OpenCombine) &mdash; Open-source implementation of Apple's Combine framework
    - [How to create custom Publisher in Combine](https://medium.com/flawless-app-stories/swift-combine-custom-publisher-6d1cc3dc248f)
  3. bgromov revised this gist Jul 25, 2020. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion _README.md
    Original file line number Diff line number Diff line change
    @@ -58,6 +58,6 @@ The underlying [RingBuffer](https://github.com/raywenderlich/swift-algorithm-clu

    ### Useful links

    - [OpenCombine](https://github.com/OpenCombine) --- Open-source implementation of Apple's Combine framework
    - [OpenCombine](https://github.com/OpenCombine) &mdash; Open-source implementation of Apple's Combine framework
    - [How to create custom Publisher in Combine](https://medium.com/flawless-app-stories/swift-combine-custom-publisher-6d1cc3dc248f)
    - [Extending Combine with a custom ShareReplay operator](https://www.onswiftwings.com/posts/share-replay-operator/)
  4. bgromov revised this gist Jul 25, 2020. 1 changed file with 6 additions and 2 deletions.
    8 changes: 6 additions & 2 deletions _README.md
    Original file line number Diff line number Diff line change
    @@ -4,7 +4,7 @@ Implementation of a ring buffer publisher for Apple's Combine framework.

    The publisher produces the first output based on a specified strategy (defaults to `.always`). See examples below.

    ### Example
    ### Examples

    ```swift
    import Foundation
    @@ -52,8 +52,12 @@ let pub = (0..<10).publisher
    [7, 8, 9]
    ```

    ### Credits

    The underlying [RingBuffer](https://github.com/raywenderlich/swift-algorithm-club/tree/dd1ed39fca150d4fa2905b902736f12a49f3efb1/Ring%20Buffer) structure used in this implementation is taken from Ray Wenderlich's Swift Algorithm Club written by Matthijs Hollemans.

    ### Useful links

    - [OpenCombine](https://github.com/OpenCombine) --- Open-source implementation of Apple's Combibe framework
    - [OpenCombine](https://github.com/OpenCombine) --- Open-source implementation of Apple's Combine framework
    - [How to create custom Publisher in Combine](https://medium.com/flawless-app-stories/swift-combine-custom-publisher-6d1cc3dc248f)
    - [Extending Combine with a custom ShareReplay operator](https://www.onswiftwings.com/posts/share-replay-operator/)
  5. bgromov revised this gist Jul 25, 2020. 1 changed file with 16 additions and 1 deletion.
    17 changes: 16 additions & 1 deletion _README.md
    Original file line number Diff line number Diff line change
    @@ -32,6 +32,15 @@ The above snippet produces the following output:

    Alternatively, when strategy is set to `.whenFull`:

    ```swift
    import Foundation
    import Combine

    let pub = (0..<10).publisher
    .ringBuffer(size: 3, strategy: .whenFull)
    .sink { print($0) }
    ```

    ```
    [0, 1, 2]
    [1, 2, 3]
    @@ -41,4 +50,10 @@ Alternatively, when strategy is set to `.whenFull`:
    [5, 6, 7]
    [6, 7, 8]
    [7, 8, 9]
    ```
    ```

    ### Useful links

    - [OpenCombine](https://github.com/OpenCombine) --- Open-source implementation of Apple's Combibe framework
    - [How to create custom Publisher in Combine](https://medium.com/flawless-app-stories/swift-combine-custom-publisher-6d1cc3dc248f)
    - [Extending Combine with a custom ShareReplay operator](https://www.onswiftwings.com/posts/share-replay-operator/)
  6. bgromov created this gist Jul 25, 2020.
    86 changes: 86 additions & 0 deletions RingBuffer.swift
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,86 @@
    /*
    Source: https://github.com/raywenderlich/swift-algorithm-club/tree/dd1ed39fca150d4fa2905b902736f12a49f3efb1/Ring%20Buffer

    Fixed-length ring buffer
    In this implementation, the read and write pointers always increment and
    never wrap around. On a 64-bit platform that should not get you into trouble
    any time soon.
    Not thread-safe, so don't read and write from different threads at the same
    time! To make this thread-safe for one reader and one writer, it should be
    enough to change read/writeIndex += 1 to OSAtomicIncrement64(), but I haven't
    tested this...
    */

    public struct RingBuffer<T> {
    private var array: [T?]
    private var readIndex = 0
    private var writeIndex = 0

    public let size: Int

    public init(count: Int) {
    size = count
    array = [T?](repeating: nil, count: count)
    }

    /* Returns false if out of space. */
    @discardableResult
    public mutating func write(_ element: T) -> Bool {
    guard !isFull else { return false }
    defer {
    writeIndex += 1
    }
    array[wrapped: writeIndex] = element
    return true
    }

    /* Returns nil if the buffer is empty. */
    public mutating func read() -> T? {
    guard !isEmpty else { return nil }
    defer {
    array[wrapped: readIndex] = nil
    readIndex += 1
    }
    return array[wrapped: readIndex]
    }

    public var availableSpaceForReading: Int {
    return writeIndex - readIndex
    }

    public var isEmpty: Bool {
    return availableSpaceForReading == 0
    }

    public var availableSpaceForWriting: Int {
    return array.count - availableSpaceForReading
    }

    public var isFull: Bool {
    return availableSpaceForWriting == 0
    }
    }

    extension RingBuffer: Sequence {
    public func makeIterator() -> AnyIterator<T> {
    var index = readIndex
    return AnyIterator {
    guard index < self.writeIndex else { return nil }
    defer {
    index += 1
    }
    return self.array[wrapped: index]
    }
    }
    }

    private extension Array {
    subscript (wrapped index: Int) -> Element {
    get {
    return self[index % count]
    }
    set {
    self[index % count] = newValue
    }
    }
    }
    100 changes: 100 additions & 0 deletions RingBufferPublisher.swift
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,100 @@
    import Foundation
    import Combine

    public extension Publisher {
    /// A publisher that buffers elements from an upstream publisher in a ring buffer.
    /// - Parameters:
    /// - size: buffer size
    /// - strategy: when set to `.always` (default) generates output with first upstream element; when set to `.whenFull` generates output after the buffer is full
    func ringBuffer(size: Int, strategy: Publishers.RingBuffer<Self>.OutputStrategy = .always) -> Publishers.RingBuffer<Self> {
    return Publishers.RingBuffer(upstream: self, size: size, strategy: strategy)
    }
    }

    public extension Publishers {
    /// A publisher that buffers elements from an upstream publisher in a ring buffer.
    struct RingBuffer<Upstream> : Publisher where Upstream : Publisher {
    /// The kind of values published by this publisher.
    public typealias Output = [Upstream.Output]

    /// The kind of errors this publisher might publish.
    ///
    /// Use `Never` if this `Publisher` does not publish errors.
    public typealias Failure = Upstream.Failure

    /// The publisher from which this publisher receives elements.
    public let upstream: Upstream

    /// The maximum number of elements to store.
    public let size: Int

    /// Output strategy
    public let strategy: OutputStrategy

    public init(upstream: Upstream, size: Int, strategy: OutputStrategy) {
    self.upstream = upstream
    self.size = size
    self.strategy = strategy
    }

    public func receive<Downstream: Subscriber>(subscriber: Downstream)
    where Downstream.Input == Output, Downstream.Failure == Failure
    {
    upstream.subscribe(Inner(downstream: subscriber, size: size, strategy: strategy))
    }
    }
    }

    public extension Publishers.RingBuffer {
    enum OutputStrategy {
    case always
    case whenFull
    }
    }

    extension Publishers.RingBuffer {
    private final class Inner<Downstream: Subscriber> : Subscriber where Downstream.Input == Output, Downstream.Failure == Upstream.Failure {

    typealias Input = Upstream.Output
    typealias Failure = Upstream.Failure

    let downstream: Downstream
    let strategy: OutputStrategy
    var buf: RingBuffer<Input>

    init(downstream: Downstream, size: Int, strategy: OutputStrategy) {
    self.downstream = downstream
    self.strategy = strategy
    self.buf = RingBuffer<Input>(count: size)
    }

    func receive(subscription: Subscription) {
    downstream.receive(subscription: subscription)
    }

    func receive(_ input: Upstream.Output) -> Subscribers.Demand {
    buf.write(input)

    let output = Array(buf)

    if strategy == .always {
    _ = downstream.receive(output)
    }

    if buf.isFull {
    if strategy == .whenFull {
    _ = downstream.receive(output)
    }

    // Drop oldest
    _ = buf.read()
    }

    return .max(buf.availableSpaceForWriting)
    }

    func receive(completion: Subscribers.Completion<Upstream.Failure>) {
    downstream.receive(completion: completion)
    }
    }
    }
    44 changes: 44 additions & 0 deletions _README.md
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,44 @@
    ### RingBuffer Publisher

    Implementation of a ring buffer publisher for Apple's Combine framework.

    The publisher produces the first output based on a specified strategy (defaults to `.always`). See examples below.

    ### Example

    ```swift
    import Foundation
    import Combine

    let pub = (0..<10).publisher
    .ringBuffer(size: 3)
    .sink { print($0) }
    ```

    The above snippet produces the following output:

    ```
    [0]
    [0, 1]
    [0, 1, 2]
    [1, 2, 3]
    [2, 3, 4]
    [3, 4, 5]
    [4, 5, 6]
    [5, 6, 7]
    [6, 7, 8]
    [7, 8, 9]
    ```

    Alternatively, when strategy is set to `.whenFull`:

    ```
    [0, 1, 2]
    [1, 2, 3]
    [2, 3, 4]
    [3, 4, 5]
    [4, 5, 6]
    [5, 6, 7]
    [6, 7, 8]
    [7, 8, 9]
    ```