@dehesa
Last active February 24, 2025 20:47
Experimental mechanism to track progressions on Swift 6 concurrency.
/// Gathers progression information of all computations within the `operation` block.
///
/// This free function sets the `@TaskLocal` value `Task.unsafeProgress`, which serves as an
/// indicator to any task in the structured concurrency tree to start gathering progression information.
/// Functions supporting _progress_ will forward such data into the `progress` handler.
///
/// ## Features
/// - Progress data forwarding to the `progress` handler.
/// - No locks (and no thread hopping).
/// - Supports concurrent operations.
/// - Supports hierarchy of child operations (recursive tree).
/// - (Almost) zero cost when operations are not interested in progress information.
///
/// It is worth noting that progress information requires structured concurrency;
/// therefore any operation outside of it (e.g. `Task.detached`) won't be able to participate,
/// at least not without some manual labor. Structured concurrency does still support parallel operations
/// through _task groups_ and `async let`.
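///
/// A minimal sketch of that manual labor, assuming the caller awaits the detached task before
/// returning (the helper name `processInBackground` is hypothetical): capture the task-local
/// handle while still inside the structured tree and rebind it inside the detached task.
///
/// ```swift
/// func processInBackground() async {
///   // Read the handle while the task-local value is still visible.
///   let progress = Task.unsafeProgress
///   let task = Task.detached {
///     // Detached tasks do not inherit task-local values, so rebind it manually.
///     await Task.$unsafeProgress.withValue(progress) {
///       await Task.unsafeProgress?.progressed()
///     }
///   }
///   // Wait here so the detached work cannot outlive `withTaskProgression`.
///   await task.value
/// }
/// ```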
///
/// ## Usage
///
/// Call `withTaskProgression` when you are interested in progression information.
/// Any operation internally checking for `Task.unsafeProgress` can forward such information and
/// you will receive it in the `progress` handler.
///
/// ```swift
/// try await withTaskProgression { _ in
/// let data = try await downloadLargeImage()
/// return try await Image(data: data)
/// } progress: { progression in
/// switch progression.status {
/// case .running(let workUnits): print("Completed units of work:", workUnits.completed)
/// case .finished: print("Image has been successfully generated")
/// case .failed: print("Failed to process image")
/// }
/// }
/// ```
///
/// Not all functions will support progression natively; however, you can quickly retrofit them. For example:
///
/// ```swift
/// try await withTaskProgression { progression in
/// let (bytes, _) = try await URLSession.shared.bytes(from: url)
/// for try await _ in bytes {
/// await progression.progressed()
/// }
/// } progress: {
/// guard case .running(let units) = $0 else { return }
/// print("Downloaded bytes:", units.completed)
/// }
/// ```
///
/// Task progression also supports hierarchical work and parallel subtasks.
/// This means you can divide the work to be done between logical operations and
/// monitor the progress of each task (even when they run concurrently).
///
/// ```swift
/// try await withTaskProgression { progression in
/// async let imageDataA = downloadImage(url: urlA)
/// async let imageDataB = downloadImage(url: urlB)
/// let (dataA, dataB) = try await (imageDataA, imageDataB)
/// } progress: {
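/// // Children only appear once launched, and `status` is an enum, so unwrap defensively.
/// for (index, child) in $0.children.enumerated() {
/// guard case .running(let units) = child.status else { continue }
/// print("Downloaded bytes in image \(index):", units.completed)
/// }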
/// }
/// ```
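///
/// Task groups should behave the same way, since child tasks inherit task-local values.
/// A sketch, assuming `urls` is a hypothetical `[URL]` and `downloadImage(url:)` forwards
/// its progress through `addChild`:
///
/// ```swift
/// try await withTaskProgression { _ in
///   try await withThrowingTaskGroup(of: Data.self) { group in
///     for url in urls {
///       group.addTask { try await downloadImage(url: url) }
///     }
///     // Wait for every download before leaving the group.
///     try await group.waitForAll()
///   }
/// } progress: {
///   print("Child operations started so far:", $0.children.count)
/// }
/// ```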
///
/// If you are implementing progression data forwarding, you should call one of the following
/// two methods:
/// - `UnsafeTaskProgress.progressed(isolation:_:)` lets you specify how many _units of work_ have been completed.
/// - `Task.$unsafeProgress.addChild(totalUnits:metadata:operation:)` creates a logical child progress.
///
/// ```swift
/// func downloadImage(url: URL) async throws -> Data {
/// try await Task.$unsafeProgress.addChild { progression in
/// for await twoBytes in mySequence {
/// // Store bytes
/// await progression?.progressed { $0.increment(by: 2) }
/// }
/// }
/// }
/// ```
///
/// - parameter totalUnits: Optional total number of _units of work_ that this operation will perform.
/// This is forwarded to the `progress` block and doesn't have any other function. If you aren't sure
/// how much _work_ will be performed, leave it at `nil`.
/// - parameter metadata: Any `Sendable` type identifying the _root_ logical task.
/// - parameter operation: The operations being monitored for progress data.
/// - parameter progress: Closure forwarding any change in progress data.
/// - parameter isolation: The actor on which the progress data will be synchronized and the `operation` block will start.
/// If `nil` (i.e. the caller is not isolated to any actor), a new simple actor is created to serialize progression data computation.
/// - returns: The result of the `operation` block.
/// - todo: Remove `Sendable` marker from `Success` (use `sending` instead).
/// - todo: Make `TaskProgress` `~Escapable` and only compute values if queried.
public func withTaskProgression<Success: Sendable, Failure: Error>(
totalUnits: Int? = nil,
metadata: TaskProgress.Metadata? = nil,
operation: @isolated(any) (UnsafeTaskProgress) async throws(Failure) -> Success, // TODO: make sending Success
progress: @Sendable (TaskProgress) -> Void,
isolation: isolated (any Actor)? = #isolation
) async throws(Failure) -> sending Success {
try await withoutActuallyEscaping(progress) { (forward) async throws(Failure) -> Success in
let rootProgress = UnsafeTaskProgress(totalUnits: totalUnits, metadata: metadata, isolation: isolation ?? UnsafeTaskProgress.Serializer())
rootProgress.handler = { [unowned rootProgress] in
let immutableProgress = TaskProgress(unsafe: rootProgress)
forward(immutableProgress)
}
do {
let result = try await Task.$unsafeProgress.withValue(rootProgress) {
try await operation(rootProgress)
}
await rootProgress.finished(parent: .none, isolation: rootProgress.isolation)
return result
} catch let error as Failure {
await rootProgress.failed(error: error, isolation: rootProgress.isolation)
throw error
} catch { fatalError() } // TODO: Remove when concrete thrown errors are better supported
}
}
// MARK: -
/// Immutable progress information.
///
/// This object is a snapshot: the _progression state_ is read when the object is created and
/// any later progression is not reflected in it.
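///
/// Since children are nested recursively, a handler can walk the whole tree. A minimal sketch
/// (the helper name `runningUnits` is hypothetical) that sums the completed units of every
/// node still running; finished or failed nodes no longer carry a tally and count as zero:
///
/// ```swift
/// func runningUnits(in progress: TaskProgress) -> Int {
///   var own = 0
///   if case .running(let units) = progress.status {
///     own = units.completed
///   }
///   // Recurse into every logical child launched so far.
///   return progress.children.reduce(own) { $0 + runningUnits(in: $1) }
/// }
/// ```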
public struct TaskProgress: Sendable {
/// The status of the task (i.e. `running`, `failed`, `finished`) and
/// the number of units of work performed by the progression task.
///
/// The units of work tracked here are independent of the units of work tracked in the child tasks.
public let status: Status
/// Progress of all logical child tasks (recursive).
///
/// If a child task hasn't been launched, it doesn't appear here.
public let children: [TaskProgress]
/// Optional data identifying this _logical_ task.
public let metadata: Metadata?
/// - precondition: Isolated to `UnsafeTaskProgress.isolation`.
fileprivate init(unsafe progress: UnsafeTaskProgress) {
progress.isolation.assertIsolated()
self.status = progress.status
self.children = progress.children.map { TaskProgress(unsafe: $0) }
self.metadata = progress.metadata
}
}
public extension TaskProgress {
/// Data type stored with the receiving ``TaskProgress``.
typealias Metadata = any Sendable
/// The state of the task gathering progress information.
enum Status: Sendable {
case running(TaskProgress.Units)
case finished
case failed(any Swift.Error)
}
/// The number of _units of work_ completed by the task and, optionally,
/// the expected amount of _units of work_.
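///
/// A short illustration of how `increment(by:)` treats the optional total, based on the
/// implementation in this file: when the completed count overshoots a known total,
/// the total is raised to match.
///
/// ```swift
/// let units = TaskProgress.Units(completed: 9, total: 10).increment(by: 2)
/// print(units.completed)        // 11
/// print(units.debugDescription) // 11 of 11
///
/// // Without a total, only the completed count is tracked.
/// let open = TaskProgress.Units().increment()
/// print(open.debugDescription)  // 1
/// ```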
struct Units: Sendable, Hashable, CustomDebugStringConvertible {
public var completed: Int
public var total: Int?
public init() {
self.completed = .zero
self.total = nil
}
public init(completed: Int, total: Int? = nil) {
self.completed = completed
self.total = total
}
}
}
public extension TaskProgress.Units {
var debugDescription: String {
var result = String(self.completed)
if let total {
result.append(" of \(total)")
}
return result
}
/// Returns a copy with one more completed _unit of work_.
func increment() -> Self {
self.increment(by: 1)
}
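/// Returns a copy with `unit` more completed _units of work_.
/// A known `total` is raised if needed so that it never falls below `completed`.
func increment(by unit: Int) -> Self {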
let completed = self.completed + unit
let total: Int? = self.total.map { Swift.max(completed, $0) }
return Self(completed: completed, total: total)
}
}
public extension Task where Success == Never, Failure == Never {
/// Unsafe handle to the ongoing task progression.
///
/// If `nil`, no one is interested in task progression information, and
/// therefore there is no need to waste CPU cycles gathering it.
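///
/// A sketch of a function that supports progress at little cost, assuming one unit of work
/// per input line (the function `parse(lines:)` is hypothetical):
///
/// ```swift
/// func parse(lines: [String]) async -> [Int] {
///   // `nil` unless a caller up the task tree used `withTaskProgression`.
///   let progress = Task.unsafeProgress
///   var numbers: [Int] = []
///   for line in lines {
///     if let number = Int(line) { numbers.append(number) }
///     // Optional chaining skips the call entirely when nobody is listening.
///     await progress?.progressed()
///   }
///   return numbers
/// }
/// ```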
@TaskLocal static var unsafeProgress: UnsafeTaskProgress?
}
public extension TaskLocal<UnsafeTaskProgress?> {
/// Creates a _logical_ task to track progression.
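///
/// A sketch of a function reporting itself as a child operation, assuming one unit of work
/// per chunk (the function `checksum(of:)` and the `"checksum"` metadata are hypothetical).
/// The closure receives `nil` when no one is gathering progress, hence the optional chaining.
///
/// ```swift
/// func checksum(of chunks: [[UInt8]]) async throws -> Int {
///   try await Task.$unsafeProgress.addChild(totalUnits: chunks.count, metadata: "checksum") { progress in
///     var sum = 0
///     for chunk in chunks {
///       try Task.checkCancellation()
///       sum = chunk.reduce(sum) { $0 &+ Int($1) }
///       // One chunk processed, one unit of work completed.
///       await progress?.progressed()
///     }
///     return sum
///   }
/// }
/// ```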
func addChild<Success: Sendable, Failure: Error>(
totalUnits: Int? = nil,
metadata: TaskProgress.Metadata? = nil,
operation: (UnsafeTaskProgress?) async throws(Failure) -> Success
) async throws(Failure) -> Success {
guard let parentProgress = self.get(),
let childProgress = await parentProgress.makeChild(totalUnits: totalUnits, metadata: metadata, isolation: parentProgress.isolation) else {
return try await operation(nil)
}
do {
let result = try await self.withValue(childProgress) {
try await operation(childProgress)
}
await childProgress.finished(parent: parentProgress, isolation: childProgress.isolation)
return result
} catch let error as Failure {
await childProgress.failed(error: error, isolation: childProgress.isolation)
throw error
} catch { fatalError() } // TODO: Remove when concrete thrown errors are better supported
}
}
// MARK: -
public final class UnsafeTaskProgress: @unchecked Sendable {
var status: TaskProgress.Status
var children: ContiguousArray<UnsafeTaskProgress> = []
let isolation: any Actor
let metadata: TaskProgress.Metadata?
var handler: Handler!
init(
totalUnits: Int?,
metadata: TaskProgress.Metadata?,
isolation: any Actor
) {
self.status = .running(TaskProgress.Units(completed: 0, total: totalUnits))
self.children = ContiguousArray()
self.isolation = isolation
self.metadata = metadata
}
typealias Handler = () -> Void
}
public extension UnsafeTaskProgress {
/// Records one completed _unit of work_ and signals the progress handler.
func progressed(isolation: isolated (any Actor)? = #isolation) async {
await self.progressed(isolation: isolation) { $0.increment() }
}
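/// Updates the tally of _units of work_ through the `tally` closure and signals the progress handler.
///
/// The call hops to `UnsafeTaskProgress.isolation` if needed, and
/// it is ignored once the task has finished or failed.
func progressed(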
isolation: isolated (any Actor)? = #isolation,
_ tally: @Sendable (_ previously: TaskProgress.Units) -> TaskProgress.Units
) async {
guard self.isolation === isolation else {
return await self.progressed(isolation: self.isolation, tally)
}
self.isolation.assertIsolated()
guard case .running(let previousTally) = self.status else { return }
self.status = .running(tally(previousTally))
self.handler()
}
/// Signals the progress handler without modifying the status.
func signalHandler(isolation: isolated (any Actor)? = #isolation) async {
guard self.isolation === isolation else {
return await self.signalHandler(isolation: self.isolation)
}
self.isolation.assertIsolated()
self.handler()
}
}
internal extension UnsafeTaskProgress {
/// - precondition: Isolated to `UnsafeTaskProgress.isolation`.
func failed(error: any Swift.Error, isolation: isolated any Actor) {
self.isolation.preconditionIsolated()
guard case .running = self.status else { fatalError("Unexpected progression status (on failure)") }
self.status = .failed(error)
self.handler()
}
/// - precondition: Isolated to `UnsafeTaskProgress.isolation`.
func finished(parent: UnsafeTaskProgress?, isolation: isolated any Actor) {
self.isolation.preconditionIsolated()
guard case .running = self.status else { fatalError("Unexpected progression status (on successful termination)") }
self.status = .finished
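// A finished child counts as one completed unit of work on its parent (if the parent is still running).
if let parent, case .running(let units) = parent.status {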
parent.status = .running(TaskProgress.Units(completed: units.completed + 1, total: units.total))
}
self.handler()
}
actor Serializer {}
}
fileprivate extension UnsafeTaskProgress {
/// - precondition: Isolated to `UnsafeTaskProgress.isolation`.
func makeChild(totalUnits: Int?, metadata: TaskProgress.Metadata?, isolation: isolated any Actor) -> UnsafeTaskProgress? {
self.isolation.preconditionIsolated()
guard case .running = self.status else { return nil }
let childProgress = UnsafeTaskProgress(totalUnits: totalUnits, metadata: metadata, isolation: self.isolation)
childProgress.handler = { [unowned self] in self.handler() }
self.children.append(childProgress)
return childProgress
}
}