Created
March 16, 2025 14:11
-
-
Save thomsmed/4b12820f22d70dbb0367e64b5b12860a to your computer and use it in GitHub Desktop.
A utility type for letting multiple caller Tasks wait for a future value produced by an asynchronous operation.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// PendingOperation.swift | |
// | |
import Foundation | |
/// A utility type for letting multiple caller Tasks wait for a future value produced by an asynchronous operation. | |
public final class PendingOperation<T: Sendable>: @unchecked Sendable { | |
private let lock = NSLock() | |
private var result: Result<T, any Error>? | |
private var operationTask: Task<Void, any Error>? | |
private var waitingContinuations: [UUID: CheckedContinuation<T, any Error>] = [:] | |
public init(_ operation: @escaping @Sendable () async throws -> T) { | |
operationTask = .detached { | |
do { | |
let value = try await operation() | |
self.resolve(with: .success(value)) | |
} catch { | |
self.resolve(with: .failure(error)) | |
} | |
} | |
} | |
deinit { | |
assert(waitingContinuations.isEmpty, "Expecting all pending continuations to be fulfilled") | |
} | |
/// Resolve this ``PendingOperation`` by returning `result`. Waiting Tasks will be notified immediately. | |
private func resolve(with result: Result<T, any Error>) { | |
let continuations = lock.withLock { | |
if self.result != nil { | |
// Should only be allowed to resolve once. | |
return [CheckedContinuation<T, any Error>]() | |
} | |
self.result = result | |
let continuations = self.waitingContinuations.map { $1 } | |
self.waitingContinuations.removeAll() | |
return continuations | |
} | |
// Must not resume the continuations while holding the lock! | |
for continuation in continuations { | |
continuation.resume(with: result) | |
} | |
} | |
public var isCompleted: Bool { | |
lock.withLock { | |
self.result != nil | |
} | |
} | |
public var isCanceled: Bool { | |
lock.withLock { | |
self.operationTask?.isCancelled ?? false | |
} | |
} | |
/// Wait for the future value. Task cancelation is handled as expected. | |
public func value() async throws -> T { | |
let id = UUID() | |
return try await withTaskCancellationHandler { | |
try await withCheckedThrowingContinuation { continuation in | |
do { | |
let readyResult: Result<T, any Error>? = try lock.withLock { | |
try Task.checkCancellation() | |
if let readyResult = self.result { | |
return readyResult | |
} else { | |
self.waitingContinuations[id] = continuation | |
return nil | |
} | |
} | |
if let readyResult { | |
// Must not resume the continuation while holding the lock! | |
continuation.resume(with: readyResult) | |
} | |
} catch { | |
continuation.resume(throwing: error) | |
} | |
} | |
} onCancel: { | |
let continuation = lock.withLock { | |
self.waitingContinuations.removeValue(forKey: id) | |
} | |
// Must not resume the continuation while holding the lock! | |
continuation?.resume(throwing: CancellationError()) | |
} | |
} | |
/// Cancel this ``PendingOperation``. | |
public func cancel() { | |
lock.withLock { | |
self.operationTask?.cancel() | |
} | |
} | |
} | |
// MARK: Usage/Tests | |
import Testing | |
struct PendingOperationTests { | |
@Test func test_instantCancelation() async throws { | |
let pendingOperation = PendingOperation<Void> { | |
try await Task.sleep(for: .seconds(1)) | |
} | |
#expect(!pendingOperation.isCompleted) | |
pendingOperation.cancel() | |
#expect(pendingOperation.isCanceled) | |
#expect(!pendingOperation.isCompleted) | |
await #expect(throws: CancellationError.self) { | |
_ = try await pendingOperation.value() | |
} | |
#expect(pendingOperation.isCompleted) | |
} | |
@Test func test_cancelation() async throws { | |
let pendingOperation = PendingOperation<Void> { | |
try await Task.sleep(for: .seconds(5)) | |
} | |
#expect(!pendingOperation.isCompleted) | |
try await Task.sleep(for: .seconds(1)) | |
pendingOperation.cancel() | |
#expect(pendingOperation.isCanceled) | |
#expect(!pendingOperation.isCompleted) | |
await #expect(throws: CancellationError.self) { | |
_ = try await pendingOperation.value() | |
} | |
#expect(pendingOperation.isCompleted) | |
} | |
@Test func test_cancelationAfterCompleted() async throws { | |
let pendingOperation = PendingOperation<Void> { | |
try Task.checkCancellation() | |
} | |
#expect(!pendingOperation.isCanceled) | |
#expect(!pendingOperation.isCompleted) | |
try await Task.sleep(for: .seconds(1)) | |
#expect(!pendingOperation.isCanceled) | |
#expect(pendingOperation.isCompleted) | |
pendingOperation.cancel() // Canceling has no effect after the PendingOperation has completed | |
#expect(pendingOperation.isCanceled) | |
#expect(pendingOperation.isCompleted) | |
_ = try await pendingOperation.value() | |
#expect(pendingOperation.isCanceled) | |
#expect(pendingOperation.isCompleted) | |
} | |
@Test func test_multipleCancelationAfterCompleted() async throws { | |
let pendingOperation = PendingOperation<Void> { | |
try Task.checkCancellation() | |
} | |
#expect(!pendingOperation.isCanceled) | |
#expect(!pendingOperation.isCompleted) | |
try await Task.sleep(for: .seconds(1)) | |
pendingOperation.cancel() // Canceling has no effect after the PendingOperation has completed | |
pendingOperation.cancel() | |
pendingOperation.cancel() | |
#expect(pendingOperation.isCanceled) | |
#expect(pendingOperation.isCompleted) | |
_ = try await pendingOperation.value() | |
#expect(pendingOperation.isCanceled) | |
#expect(pendingOperation.isCompleted) | |
} | |
@Test func test_immediateCancelationOnCancelationIgnoringOperation() async throws { | |
let pendingOperation = PendingOperation<Void> { | |
try? Task.checkCancellation() // Operation ignores cancelation | |
} | |
#expect(!pendingOperation.isCanceled) | |
#expect(!pendingOperation.isCompleted) | |
pendingOperation.cancel() | |
#expect(pendingOperation.isCanceled) | |
#expect(!pendingOperation.isCompleted) | |
_ = try await pendingOperation.value() | |
#expect(pendingOperation.isCanceled) | |
#expect(pendingOperation.isCompleted) | |
} | |
@Test func test_immediateCompletion() async throws { | |
let pendingOperation = PendingOperation<Void> { | |
try Task.checkCancellation() | |
} | |
#expect(!pendingOperation.isCanceled) | |
#expect(!pendingOperation.isCompleted) | |
_ = try await pendingOperation.value() | |
#expect(!pendingOperation.isCanceled) | |
#expect(pendingOperation.isCompleted) | |
} | |
@Test func test_immediateCompletionReturningString() async throws { | |
let pendingOperation = PendingOperation<String> { | |
"Hello World!" | |
} | |
#expect(!pendingOperation.isCanceled) | |
#expect(!pendingOperation.isCompleted) | |
let value = try await pendingOperation.value() | |
#expect(!pendingOperation.isCanceled) | |
#expect(pendingOperation.isCompleted) | |
#expect(value == "Hello World!") | |
} | |
@Test func test_resolveReturningString() async throws { | |
let pendingOperation = PendingOperation<String> { | |
"Hello World!" | |
} | |
#expect(!pendingOperation.isCanceled) | |
#expect(!pendingOperation.isCompleted) | |
try await Task.sleep(for: .seconds(1)) | |
#expect(!pendingOperation.isCanceled) | |
#expect(pendingOperation.isCompleted) | |
let value = try await pendingOperation.value() | |
#expect(!pendingOperation.isCanceled) | |
#expect(pendingOperation.isCompleted) | |
#expect(value == "Hello World!") | |
} | |
@Test func test_immediateCompletionThrowing() async throws { | |
struct MyError: Error {} | |
let pendingOperation = PendingOperation<String> { | |
throw MyError() | |
} | |
#expect(!pendingOperation.isCanceled) | |
#expect(!pendingOperation.isCompleted) | |
await #expect(throws: MyError.self) { | |
_ = try await pendingOperation.value() | |
} | |
#expect(!pendingOperation.isCanceled) | |
#expect(pendingOperation.isCompleted) | |
} | |
@Test func test_resolveThrowing() async throws { | |
struct MyError: Error {} | |
let pendingOperation = PendingOperation<String> { | |
throw MyError() | |
} | |
#expect(!pendingOperation.isCanceled) | |
#expect(!pendingOperation.isCompleted) | |
try await Task.sleep(for: .seconds(1)) | |
#expect(!pendingOperation.isCanceled) | |
#expect(pendingOperation.isCompleted) | |
await #expect(throws: MyError.self) { | |
_ = try await pendingOperation.value() | |
} | |
#expect(!pendingOperation.isCanceled) | |
#expect(pendingOperation.isCompleted) | |
} | |
@Test func test_parallelThrowing() async throws { | |
struct MyError: Error {} | |
let pendingOperation = PendingOperation<String> { | |
throw MyError() | |
} | |
#expect(!pendingOperation.isCanceled) | |
#expect(!pendingOperation.isCompleted) | |
await #expect(throws: MyError.self) { | |
try await withThrowingTaskGroup(of: Void.self) { group in | |
group.addTask { | |
_ = try await pendingOperation.value() | |
} | |
group.addTask { | |
_ = try await pendingOperation.value() | |
} | |
group.addTask { | |
_ = try await pendingOperation.value() | |
} | |
try await group.waitForAll() | |
} | |
} | |
#expect(!pendingOperation.isCanceled) | |
#expect(pendingOperation.isCompleted) | |
} | |
@Test func test_parallelReturningString() async throws { | |
struct MyError: Error {} | |
let pendingOperation = PendingOperation<String> { | |
"Hello World!" | |
} | |
try await withThrowingTaskGroup(of: Void.self) { group in | |
group.addTask { | |
let value = try await pendingOperation.value() | |
#expect(value == "Hello World!") | |
} | |
group.addTask { | |
let value = try await pendingOperation.value() | |
#expect(value == "Hello World!") | |
} | |
group.addTask { | |
let value = try await pendingOperation.value() | |
#expect(value == "Hello World!") | |
} | |
try await group.waitForAll() | |
} | |
} | |
@Test func test_parallelDelayedReturningString() async throws { | |
struct MyError: Error {} | |
let pendingOperation = PendingOperation<String> { | |
try await Task.sleep(for: .seconds(1)) | |
return "Hello World!" | |
} | |
try await withThrowingTaskGroup(of: Void.self) { group in | |
group.addTask { | |
let value = try await pendingOperation.value() | |
#expect(value == "Hello World!") | |
} | |
group.addTask { | |
let value = try await pendingOperation.value() | |
#expect(value == "Hello World!") | |
} | |
group.addTask { | |
let value = try await pendingOperation.value() | |
#expect(value == "Hello World!") | |
} | |
try await group.waitForAll() | |
} | |
} | |
@Test func test_parallelWaitingForString() async throws { | |
struct MyError: Error {} | |
let pendingOperation = PendingOperation<String> { | |
"Hello World!" | |
} | |
async let values = withThrowingTaskGroup(of: String.self, returning: [String].self) { group in | |
group.addTask { | |
try await pendingOperation.value() | |
} | |
group.addTask { | |
try await pendingOperation.value() | |
} | |
var values: [String] = [] | |
for try await value in group { | |
values.append(value) | |
} | |
return values | |
} | |
#expect(try await values == ["Hello World!", "Hello World!"]) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment