Skip to content

Instantly share code, notes, and snippets.

@simme
Created December 6, 2022 08:33
Show Gist options
  • Save simme/3d790dbb89e5bbcf74d7603f509ecda5 to your computer and use it in GitHub Desktop.
Save simme/3d790dbb89e5bbcf74d7603f509ecda5 to your computer and use it in GitHub Desktop.

Revisions

  1. simme created this gist Dec 6, 2022.
    246 changes: 246 additions & 0 deletions FetchRecordZoneChangesPublisher.swift
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,246 @@
    import CloudKit
    import Combine

    /// A publisher that wraps a `CKFetchRecordZoneChangesOperation` and emits events as the operation completes.
    ///
    /// The `FetchRecordZoneChangesPublisher` fetches changes from the given record zones. New and deleted records are
    /// posted individually via the `.recordChanged` and `.recordDeleted` actions.
    ///
    /// Errors are automatically retried if possible. Resetting the change token in case it expired is also automatically
    /// handled. Because emitting errors fails a publisher all errors are posted as actions. There may still be running
    /// operations, even if an error occurs.
    ///
    /// The publisher emits a completion event once all running operations have finished.
    struct FetchRecordZoneChangesPublisher: Publisher {

    // MARK: Types

    /// The events emitted by the publisher.
    enum Action {
    /// Posted when the record zone change token is updated.
    case recordZoneChangeTokenUpdated(zoneID: CKRecordZone.ID, token: CKServerChangeToken?, clientToken: Data?)

    /// Posted when an updated record is received from the server.
    case recordChanged(CKRecord)

    /// Posted when a record has been deleted on the server.
    case recordDeleted(CKRecord.ID, CKRecord.RecordType)

    /// Posted when the fetch is complete for a specific zone.
    case zoneFetchComplete(CKRecordZone.ID, CKServerChangeToken?, Data?, Bool)

    /// An error occured and changes for the associated zone will be fetched again.
    case retryingZone(CKRecordZone.ID)

    /// An unrecoverable error occured.
    case unrecoverableError(CKRecordZone.ID?, Error)
    }

    // MARK: Properties

    /// The CloudKit database to perform the operatoin on.
    private let database: CKDatabase

    /// A queue for the operation.
    private let queue: OperationQueue

    /// A dictionary of zones and their respective change tokens.
    private let zoneTokens: [CKRecordZone.ID: CKServerChangeToken?]

    // MARK: Initialization

    /// Creates a new `FetchRecordZoneChangesPublisher`.
    ///
    /// - Parameter database: The cloud kit database to run the operation on.
    /// - Parameter queue: The operation queue responsible for executing the operation.
    /// - Parameter zoneTokens: A dictionary of zones and their respective change tokens.
    ///
    /// - Returns: A new `FetchRecordZoneChangesPublisher`.
    init(in database: CKDatabase, on queue: OperationQueue, zoneTokens: [CKRecordZone.ID: CKServerChangeToken?]) {
    self.database = database
    self.queue = queue
    self.zoneTokens = zoneTokens
    }

    // MARK: Publisher Implementation

    func receive<S>(subscriber: S) where S: Subscriber, Never == S.Failure, Action == S.Input {
    let subscription = Subscription(
    subscriber: subscriber,
    zoneTokens: zoneTokens,
    in: database,
    on: queue
    )
    subscriber.receive(subscription: subscription)
    }

    typealias Output = Action
    typealias Failure = Never

    }

    // MARK: - Subscription

    private extension FetchRecordZoneChangesPublisher {

    /// The subscription wraps the actual operation execution and emits actions to its subscriber.
    final class Subscription<S: Subscriber> where S.Input == Output, S.Failure == Failure {

    // MARK: Properties

    /// The active subscriber receiving input, if any.
    private var subscriber: S?

    /// The operation queue to execute the operations on.
    private let queue: OperationQueue

    /// The cloud kit database to run the operations against.
    private let database: CKDatabase

    /// All in-flight operations.
    private var operations: [CKFetchRecordZoneChangesOperation] = []

    /// A list of zones and their respective tokens. May change as zones are retried.
    private var zoneTokens: [CKRecordZone.ID: CKServerChangeToken?]

    /// `true` if all changes should be fetched. The consumer is responsible for kicking off more operations if
    /// necessary to collect all data.
    private let fetchAllChanges: Bool

    /// Internal state, makes sure we only start one operation initially.
    private var didStart: Bool = false

    // MARK: Initialization

    /// Creates a new `FetchRecordZoneChangesPublisher.Subscription`.
    ///
    /// - Parameter subscriber: The subscriber to notify.
    /// - Parameter zoneTkens: A list of zones and their respective tokens. May change as zones are retried.
    /// - Parameter fetchAllChanges: `true` if all changes should be fetched. The consumer is responsible for kicking
    /// off more operations if necessary to collect all data.
    /// - Parameter database: The CloudKit database to perform the operation on.
    /// - Parameter queue: An operation queue to run the operation.
    ///
    /// - Returns: A new `FetchRecordZoneChangesPublisher.Subscriber`.
    init(
    subscriber: S,
    zoneTokens: [CKRecordZone.ID: CKServerChangeToken?],
    fetchAllChanges: Bool = true,
    in database: CKDatabase,
    on queue: OperationQueue
    ) {
    self.subscriber = subscriber
    self.database = database
    self.zoneTokens = zoneTokens
    self.fetchAllChanges = fetchAllChanges
    self.queue = queue
    }

    /// Configures an operation and sets up the callbacks to send events.
    ///
    /// - Parameter zoneTokens: A list of zones and their respective tokens.
    ///
    /// - Returns: A new `CKFetchRecordZoneChangesOperation`.
    private func configureOperation(
    zoneTokens: [CKRecordZone.ID: CKServerChangeToken?]
    ) -> CKFetchRecordZoneChangesOperation {
    let configurations = Dictionary(uniqueKeysWithValues: zoneTokens.map { id, token in
    (id, CKFetchRecordZoneChangesOperation.ZoneConfiguration(
    previousServerChangeToken: token,
    resultsLimit: nil,
    desiredKeys: nil
    ))
    })

    let operation = CKFetchRecordZoneChangesOperation(
    recordZoneIDs: Array(zoneTokens.keys),
    configurationsByRecordZoneID: configurations
    )
    operation.fetchAllChanges = fetchAllChanges
    operation.database = database
    operation.qualityOfService = .userInitiated
    operation.recordZoneChangeTokensUpdatedBlock = { [weak self] zoneID, token, clientToken in
    self?.zoneTokens[zoneID] = token
    _ = self?.subscriber?.receive(.recordZoneChangeTokenUpdated(
    zoneID: zoneID,
    token: token,
    clientToken: clientToken
    ))
    }

    operation.recordChangedBlock = { [weak self] record in
    _ = self?.subscriber?.receive(.recordChanged(record))
    }

    operation.recordWithIDWasDeletedBlock = { [weak self] id, type in
    _ = self?.subscriber?.receive(.recordDeleted(id, type))
    }

    operation.recordZoneFetchCompletionBlock = { [weak self] zoneID, changeToken, clientToken, moreComing, error in
    if let error = error {
    self?.handleError(error, for: zoneID)
    } else {
    _ = self?.subscriber?.receive(.zoneFetchComplete(zoneID, changeToken, clientToken, moreComing))
    }
    }

    operation.fetchRecordZoneChangesCompletionBlock = { [weak self] error in
    guard let strongSelf = self else { return }
    _ = strongSelf.operations.firstIndex(of: operation).map { strongSelf.operations.remove(at: $0) }
    if let error = error {
    strongSelf.handleError(error, for: nil)
    } else {
    if strongSelf.operations.isEmpty {
    self?.subscriber?.receive(completion: .finished)
    }
    }
    }

    self.operations.append(operation)
    return operation
    }

    /// Handle CloudKit errors.
    ///
    /// - Parameter error: The error to handle.
    /// - Parameter zoneID: The ID of the zone in which the error occured.
    private func handleError(_ error: Error, for zoneID: CKRecordZone.ID?) {
    if error.isCloudKitTokenExpiredError, let zoneID = zoneID {
    _ = subscriber?.receive(.recordZoneChangeTokenUpdated(zoneID: zoneID, token: nil, clientToken: nil))
    _ = subscriber?.receive(.retryingZone(zoneID))
    let newOperation = configureOperation(zoneTokens: [zoneID: nil])
    queue.addOperation(newOperation)
    } else if let retryDelay = error.delayIfRetryPossible(), let zoneID = zoneID {
    queue.schedule(after: .init(Date() + retryDelay)) { [weak self] in
    let token = self?.zoneTokens[zoneID] ?? nil
    guard let newOperation = self?.configureOperation(zoneTokens: [zoneID: token]) else { return }
    self?.queue.addOperation(newOperation)
    }
    } else {
    _ = subscriber?.receive(.unrecoverableError(zoneID, error))
    }
    }
    }
    }

    // MARK: -

    extension FetchRecordZoneChangesPublisher.Subscription: Cancellable {
    func cancel() {
    subscriber = nil
    for operation in operations {
    operation.cancel()
    }
    }
    }

    extension FetchRecordZoneChangesPublisher.Subscription: Subscription {
    func request(_ demand: Subscribers.Demand) {
    guard subscriber != nil else { return }
    if demand > 0 && !didStart {
    let operation = configureOperation(zoneTokens: zoneTokens)
    queue.addOperation(operation)
    didStart = true
    }
    }
    }