Skip to content

Instantly share code, notes, and snippets.

@thomsmed
Created March 16, 2025 14:11
Show Gist options
  • Save thomsmed/4b12820f22d70dbb0367e64b5b12860a to your computer and use it in GitHub Desktop.
Save thomsmed/4b12820f22d70dbb0367e64b5b12860a to your computer and use it in GitHub Desktop.
A utility type for letting multiple caller Tasks wait for a future value produced by an asynchronous operation.
//
// PendingOperation.swift
//
import Foundation
/// A utility type for letting multiple caller Tasks wait for a future value produced by an asynchronous operation.
///
/// The operation is started in a detached task as soon as the instance is created. Its outcome
/// (value or error) is stored once, and every current and future call to ``value()`` receives it.
///
/// All mutable state is protected by `lock`, which is what backs the `@unchecked Sendable` conformance.
public final class PendingOperation<T: Sendable>: @unchecked Sendable {
    /// Guards `result`, `operationTask` and `waitingContinuations`.
    private let lock = NSLock()

    /// Outcome of the operation; `nil` until the operation has finished.
    private var result: Result<T, any Error>?

    /// Handle to the detached task running the operation.
    private var operationTask: Task<Void, any Error>?

    /// Continuations of callers currently suspended in ``value()``, keyed by a per-call identifier.
    private var waitingContinuations: [UUID: CheckedContinuation<T, any Error>] = [:]

    /// Creates the pending operation and immediately starts `operation` in a detached task.
    ///
    /// - Parameter operation: The asynchronous work producing the value. A thrown error is
    ///   delivered to every waiter.
    public init(_ operation: @escaping @Sendable () async throws -> T) {
        operationTask = .detached {
            let outcome: Result<T, any Error>
            do {
                outcome = .success(try await operation())
            } catch {
                outcome = .failure(error)
            }
            self.resolve(with: outcome)
        }
    }

    deinit {
        assert(waitingContinuations.isEmpty, "Expecting all pending continuations to be fulfilled")
    }

    /// Resolve this ``PendingOperation`` by storing `result`. Waiting Tasks will be notified immediately.
    ///
    /// Only the first call has any effect; later calls are ignored.
    private func resolve(with result: Result<T, any Error>) {
        let waiters: [CheckedContinuation<T, any Error>] = lock.withLock {
            // Should only be allowed to resolve once.
            guard self.result == nil else {
                return []
            }

            self.result = result

            let pending = Array(self.waitingContinuations.values)
            self.waitingContinuations.removeAll()
            return pending
        }

        // Must not resume the continuations while holding the lock!
        for waiter in waiters {
            waiter.resume(with: result)
        }
    }

    /// `true` once the operation has produced a value or an error.
    public var isCompleted: Bool {
        lock.withLock { self.result != nil }
    }

    /// `true` once ``cancel()`` has been called, i.e. the underlying task is marked as cancelled.
    public var isCanceled: Bool {
        lock.withLock { self.operationTask?.isCancelled ?? false }
    }

    /// Wait for the future value.
    ///
    /// If the calling Task is cancelled (before or while waiting), this call throws
    /// `CancellationError`; the operation itself and other waiters are not affected.
    ///
    /// - Returns: The value produced by the operation.
    /// - Throws: The error thrown by the operation, or `CancellationError` if the calling Task is cancelled.
    public func value() async throws -> T {
        let id = UUID()

        return try await withTaskCancellationHandler {
            try await withCheckedThrowingContinuation { continuation in
                // Decide under the lock whether there is an immediate outcome for this caller.
                // `nil` means the continuation was registered and will be resumed later.
                let immediateOutcome: Result<T, any Error>? = lock.withLock {
                    if Task.isCancelled {
                        return .failure(CancellationError())
                    }

                    if let storedResult = self.result {
                        return storedResult
                    }

                    self.waitingContinuations[id] = continuation
                    return nil
                }

                // Must not resume the continuation while holding the lock!
                if let immediateOutcome {
                    continuation.resume(with: immediateOutcome)
                }
            }
        } onCancel: {
            // Take the continuation out under the lock so that it is resumed exactly once,
            // either here or by `resolve(with:)`.
            let cancelledWaiter = lock.withLock {
                self.waitingContinuations.removeValue(forKey: id)
            }

            // Must not resume the continuation while holding the lock!
            cancelledWaiter?.resume(throwing: CancellationError())
        }
    }

    /// Cancel this ``PendingOperation`` by cancelling the task running the operation.
    public func cancel() {
        lock.withLock {
            self.operationTask?.cancel()
        }
    }
}
// MARK: Usage/Tests
import Testing
/// Usage examples and behavioural tests for ``PendingOperation``.
///
/// `isCompleted` is only asserted at points where its value is deterministic: the operation runs
/// concurrently in a detached task, so checking `!isCompleted` right after creating an operation
/// that finishes without suspending, or right after `cancel()`, would race with that task.
/// Where a test needs the operation to have finished, it awaits `value()` instead of sleeping.
struct PendingOperationTests {
    /// Canceling right after creation makes waiters fail with `CancellationError`.
    @Test func test_instantCancelation() async throws {
        let pendingOperation = PendingOperation<Void> {
            try await Task.sleep(for: .seconds(1))
        }

        #expect(!pendingOperation.isCompleted)

        pendingOperation.cancel()

        #expect(pendingOperation.isCanceled)

        await #expect(throws: CancellationError.self) {
            _ = try await pendingOperation.value()
        }

        #expect(pendingOperation.isCompleted)
    }

    /// Canceling while the operation is in flight makes waiters fail with `CancellationError`.
    @Test func test_cancelation() async throws {
        let pendingOperation = PendingOperation<Void> {
            try await Task.sleep(for: .seconds(5))
        }

        #expect(!pendingOperation.isCompleted)

        // Give the operation time to actually start before canceling it.
        try await Task.sleep(for: .seconds(1))

        pendingOperation.cancel()

        #expect(pendingOperation.isCanceled)

        await #expect(throws: CancellationError.self) {
            _ = try await pendingOperation.value()
        }

        #expect(pendingOperation.isCompleted)
    }

    /// Canceling after completion flips `isCanceled` but does not change the stored result.
    @Test func test_cancelationAfterCompleted() async throws {
        let pendingOperation = PendingOperation<Void> {
            try Task.checkCancellation()
        }

        #expect(!pendingOperation.isCanceled)

        // Wait for the operation to finish.
        _ = try await pendingOperation.value()

        #expect(!pendingOperation.isCanceled)
        #expect(pendingOperation.isCompleted)

        pendingOperation.cancel() // Canceling has no effect after the PendingOperation has completed

        #expect(pendingOperation.isCanceled)
        #expect(pendingOperation.isCompleted)

        _ = try await pendingOperation.value()

        #expect(pendingOperation.isCanceled)
        #expect(pendingOperation.isCompleted)
    }

    /// Canceling several times after completion is harmless.
    @Test func test_multipleCancelationAfterCompleted() async throws {
        let pendingOperation = PendingOperation<Void> {
            try Task.checkCancellation()
        }

        #expect(!pendingOperation.isCanceled)

        // Wait for the operation to finish.
        _ = try await pendingOperation.value()

        pendingOperation.cancel() // Canceling has no effect after the PendingOperation has completed
        pendingOperation.cancel()
        pendingOperation.cancel()

        #expect(pendingOperation.isCanceled)
        #expect(pendingOperation.isCompleted)

        _ = try await pendingOperation.value()

        #expect(pendingOperation.isCanceled)
        #expect(pendingOperation.isCompleted)
    }

    /// An operation that ignores cancelation still delivers its value to waiters.
    @Test func test_immediateCancelationOnCancelationIgnoringOperation() async throws {
        let pendingOperation = PendingOperation<Void> {
            try? Task.checkCancellation() // Operation ignores cancelation
        }

        #expect(!pendingOperation.isCanceled)

        pendingOperation.cancel()

        #expect(pendingOperation.isCanceled)

        _ = try await pendingOperation.value()

        #expect(pendingOperation.isCanceled)
        #expect(pendingOperation.isCompleted)
    }

    /// Awaiting the value of an operation that finishes right away succeeds.
    @Test func test_immediateCompletion() async throws {
        let pendingOperation = PendingOperation<Void> {
            try Task.checkCancellation()
        }

        #expect(!pendingOperation.isCanceled)

        _ = try await pendingOperation.value()

        #expect(!pendingOperation.isCanceled)
        #expect(pendingOperation.isCompleted)
    }

    /// The value produced by the operation is handed to the waiter.
    @Test func test_immediateCompletionReturningString() async throws {
        let pendingOperation = PendingOperation<String> {
            "Hello World!"
        }

        #expect(!pendingOperation.isCanceled)

        let value = try await pendingOperation.value()

        #expect(!pendingOperation.isCanceled)
        #expect(pendingOperation.isCompleted)
        #expect(value == "Hello World!")
    }

    /// Asking for the value of an already resolved operation returns the stored value.
    @Test func test_resolveReturningString() async throws {
        let pendingOperation = PendingOperation<String> {
            "Hello World!"
        }

        #expect(!pendingOperation.isCanceled)

        // Wait for the operation to finish, so the call below hits the already-resolved path.
        _ = try await pendingOperation.value()

        #expect(!pendingOperation.isCanceled)
        #expect(pendingOperation.isCompleted)

        let value = try await pendingOperation.value()

        #expect(!pendingOperation.isCanceled)
        #expect(pendingOperation.isCompleted)
        #expect(value == "Hello World!")
    }

    /// An error thrown by the operation is rethrown to the waiter.
    @Test func test_immediateCompletionThrowing() async throws {
        struct MyError: Error {}

        let pendingOperation = PendingOperation<String> {
            throw MyError()
        }

        #expect(!pendingOperation.isCanceled)

        await #expect(throws: MyError.self) {
            _ = try await pendingOperation.value()
        }

        #expect(!pendingOperation.isCanceled)
        #expect(pendingOperation.isCompleted)
    }

    /// Asking for the value of an already failed operation rethrows the stored error.
    @Test func test_resolveThrowing() async throws {
        struct MyError: Error {}

        let pendingOperation = PendingOperation<String> {
            throw MyError()
        }

        #expect(!pendingOperation.isCanceled)

        // Wait for the operation to fail; the error itself is asserted below.
        _ = try? await pendingOperation.value()

        #expect(!pendingOperation.isCanceled)
        #expect(pendingOperation.isCompleted)

        await #expect(throws: MyError.self) {
            _ = try await pendingOperation.value()
        }

        #expect(!pendingOperation.isCanceled)
        #expect(pendingOperation.isCompleted)
    }

    /// Several concurrent waiters all observe the error thrown by the operation.
    @Test func test_parallelThrowing() async throws {
        struct MyError: Error {}

        let pendingOperation = PendingOperation<String> {
            throw MyError()
        }

        #expect(!pendingOperation.isCanceled)

        await #expect(throws: MyError.self) {
            try await withThrowingTaskGroup(of: Void.self) { group in
                group.addTask {
                    _ = try await pendingOperation.value()
                }
                group.addTask {
                    _ = try await pendingOperation.value()
                }
                group.addTask {
                    _ = try await pendingOperation.value()
                }

                try await group.waitForAll()
            }
        }

        #expect(!pendingOperation.isCanceled)
        #expect(pendingOperation.isCompleted)
    }

    /// Several concurrent waiters all receive the value of an operation that finishes right away.
    @Test func test_parallelReturningString() async throws {
        let pendingOperation = PendingOperation<String> {
            "Hello World!"
        }

        try await withThrowingTaskGroup(of: Void.self) { group in
            group.addTask {
                let value = try await pendingOperation.value()
                #expect(value == "Hello World!")
            }
            group.addTask {
                let value = try await pendingOperation.value()
                #expect(value == "Hello World!")
            }
            group.addTask {
                let value = try await pendingOperation.value()
                #expect(value == "Hello World!")
            }

            try await group.waitForAll()
        }
    }

    /// Several concurrent waiters all receive the value of an operation that takes a while.
    @Test func test_parallelDelayedReturningString() async throws {
        let pendingOperation = PendingOperation<String> {
            try await Task.sleep(for: .seconds(1))
            return "Hello World!"
        }

        try await withThrowingTaskGroup(of: Void.self) { group in
            group.addTask {
                let value = try await pendingOperation.value()
                #expect(value == "Hello World!")
            }
            group.addTask {
                let value = try await pendingOperation.value()
                #expect(value == "Hello World!")
            }
            group.addTask {
                let value = try await pendingOperation.value()
                #expect(value == "Hello World!")
            }

            try await group.waitForAll()
        }
    }

    /// Values collected from several concurrent waiters are all the operation's value.
    @Test func test_parallelWaitingForString() async throws {
        let pendingOperation = PendingOperation<String> {
            "Hello World!"
        }

        async let values = withThrowingTaskGroup(of: String.self, returning: [String].self) { group in
            group.addTask {
                try await pendingOperation.value()
            }
            group.addTask {
                try await pendingOperation.value()
            }

            var values: [String] = []
            for try await value in group {
                values.append(value)
            }
            return values
        }

        #expect(try await values == ["Hello World!", "Hello World!"])
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment