Skip to content

Instantly share code, notes, and snippets.

@groue
Last active January 22, 2025 20:01

Revisions

  1. groue revised this gist Jan 21, 2025. 1 changed file with 4 additions and 5 deletions.
    9 changes: 4 additions & 5 deletions Observation+Utils.swift
    Original file line number Diff line number Diff line change
    @@ -30,14 +30,13 @@ extension LockedValue: @unchecked Sendable where T: Sendable { }
    /// closure, and calls that closure again after properties have been
    /// modified, until the returned cancellable is cancelled.
    ///
    /// For example, the following code prints all changes to the name
    /// of players:
    /// For example, the following code prints all changes to the name of the
    /// observed player:
    ///
    /// ```swift
    /// let player: Player = ...
    /// let cancellable = observe {
    /// for player in players {
    /// print(player.name)
    /// }
    /// print(player.name)
    /// }
    /// ```
    ///
  2. groue revised this gist Jan 21, 2025. 1 changed file with 0 additions and 3 deletions.
    3 changes: 0 additions & 3 deletions Observation+Utils.swift
    Original file line number Diff line number Diff line change
    @@ -105,9 +105,6 @@ extension LockedValue: @unchecked Sendable where T: Sendable { }
    return
    }
    DispatchQueue.main.async {
    if cancelled.withLock({ $0 }) {
    return
    }
    observe(until: cancelled, apply)
    }
    }
  3. groue created this gist Jan 21, 2025.
    114 changes: 114 additions & 0 deletions Observation+Utils.swift
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,114 @@
    import Combine
    import Dispatch
    import Foundation
    import Observation

    /// A LockedValue protects a value with an NSLock.
    private final class LockedValue<T> {
    private var value: T
    private var lock = NSLock()

    public init(_ value: T) {
    self.value = value
    }

    /// Runs the provided closure while holding a lock on the value.
    ///
    /// - parameter body: A closure that can modify the value.
    public func withLock<U>(_ body: (inout T) throws -> U) rethrows -> U {
    lock.lock()
    defer { lock.unlock() }
    return try body(&value)
    }
    }

    extension LockedValue: @unchecked Sendable where T: Sendable { }

    /// Tracks access to observable properties.
    ///
    /// This method tracks access to any observable property within the `apply`
    /// closure, and calls that closure again after properties have been
    /// modified, until the returned cancellable is cancelled.
    ///
    /// For example, the following code prints all changes to the name
    /// of players:
    ///
    /// ```swift
    /// let cancellable = observe {
    /// for player in players {
    /// print(player.name)
    /// }
    /// }
    /// ```
    ///
    /// - important: Modifications performed from the `apply` closure are
    /// not notified.
    ///
    /// - Parameter apply: A closure that contains properties to track.
    /// - Returns: A cancellable that stops the observation.
    @MainActor public func observe(_ apply: @escaping @MainActor () -> Void) -> AnyCancellable {
    let cancelled = LockedValue<Bool>(false)
    let cancellable = AnyCancellable {
    cancelled.withLock { $0 = true }
    }
    observe(until: cancelled, apply)
    return cancellable
    }

    /// Returns a stream of values built from observable properties.
    ///
    /// The returned stream immediately emits the result of the `value` closure,
    /// and tracks access to any observable property. It emits a new value after
    /// properties have been modified, until the iteration is ended.
    ///
    /// For example, the following code prints all changes to the name of the
    /// observed player:
    ///
    /// ```swift
    /// Task {
    /// let player: Player = ...
    /// let names = makeObservationStream { player.name }
    /// for await name in names {
    /// print(name)
    /// }
    /// }
    /// ```
    ///
    /// - important: Modifications performed from the `value` closure are
    /// not notified.
    ///
    /// - Parameter value: A closure that returns a value to track.
    /// - Returns: A stream of tracked values.
    @MainActor public func makeObservationStream<T>(_ value: @escaping @MainActor () -> sending T) -> AsyncStream<T> {
    let (stream, continuation) = AsyncStream.makeStream(of: T.self, bufferingPolicy: .bufferingNewest(1))
    let cancelled = LockedValue<Bool>(false)

    continuation.onTermination = { termination in
    cancelled.withLock { $0 = true }
    }

    observe(until: cancelled) {
    continuation.yield(value())
    }

    return stream
    }

    @MainActor private func observe(until cancelled: LockedValue<Bool>, _ apply: @escaping @MainActor () -> Void) {
    withObservationTracking {
    if cancelled.withLock({ $0 }) {
    return
    }
    apply()
    } onChange: {
    if cancelled.withLock({ $0 }) {
    return
    }
    DispatchQueue.main.async {
    if cancelled.withLock({ $0 }) {
    return
    }
    observe(until: cancelled, apply)
    }
    }
    }
    134 changes: 134 additions & 0 deletions ObservationTests.swift
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,134 @@
    import Combine
    import Dispatch
    import Observation
    import Testing

    @Observable private class Model {
    var value: Int

    init(value: Int) {
    self.value = value
    }
    }

    @MainActor @Suite struct ObservationTests {
    @Test func observationOfOneChange() async throws {
    await confirmation(expectedCount: 2) { modelIsObserved in
    var cancellable: AnyCancellable?

    await withCheckedContinuation { continuation in
    let model = Model(value: 0)

    cancellable = observe {
    modelIsObserved()

    if model.value == 1 {
    continuation.resume()
    }
    }

    // Perform the change
    model.value = 1
    }

    withExtendedLifetime(cancellable) { }
    }
    }

    @Test func observationOfTwoChanges() async throws {
    await confirmation(expectedCount: 3) { modelIsObserved in
    var cancellable: AnyCancellable?

    await withCheckedContinuation { continuation in
    let model = Model(value: 0)

    cancellable = observe {
    modelIsObserved()

    if model.value == 2 {
    continuation.resume()
    }
    }

    // Perform 1st change
    model.value = 1

    // Perform 2nd change
    DispatchQueue.main.async {
    model.value = 2
    }
    }

    withExtendedLifetime(cancellable) { }
    }
    }

    @Test func observationStream_no_change() async throws {
    await confirmation(expectedCount: 1) { modelIsObserved in
    let model = Model(value: 0)
    let stream = makeObservationStream {
    model.value
    }

    for await value in stream {
    modelIsObserved()

    switch value {
    case 0:
    return
    default:
    Issue.record("Unexpected value \(value)")
    return
    }
    }
    }
    }

    @Test func observationStream_one_change() async throws {
    await confirmation(expectedCount: 2) { modelIsObserved in
    let model = Model(value: 0)
    let stream = makeObservationStream {
    model.value
    }

    for await value in stream {
    modelIsObserved()

    switch value {
    case 0:
    model.value = 1
    case 1:
    return
    default:
    Issue.record("Unexpected value \(value)")
    return
    }
    }
    }
    }

    @Test func observationStream_two_changes() async throws {
    await confirmation(expectedCount: 3) { modelIsObserved in
    let model = Model(value: 0)
    let stream = makeObservationStream {
    model.value
    }

    for await value in stream {
    modelIsObserved()

    switch value {
    case 0:
    model.value = 1
    case 1:
    model.value = 2
    case 2:
    return
    default:
    Issue.record("Unexpected value \(value)")
    return
    }
    }
    }
    }
    }