-
-
Save macguru/9b9b42fabcd72c1613b060bb12ee0344 to your computer and use it in GitHub Desktop.
| import Synchronization | |
| func untilCancelled() async { | |
| print("1") | |
| let mutex: Mutex<CheckedContinuation<Void, Never>?> = .init(nil) | |
| await withTaskCancellationHandler { | |
| print("2") | |
| await withCheckedContinuation { continuation in | |
| print("3") | |
| mutex.withLock { | |
| if Task.isCancelled { | |
| print("abort") | |
| continuation.resume() | |
| return | |
| } | |
| print("4") | |
| $0 = continuation | |
| } | |
| } | |
| } onCancel: { | |
| let continuation = mutex.withLock { | |
| print("5") | |
| return $0.take() | |
| } | |
| continuation?.resume() | |
| } | |
| print("6") | |
| } | |
| func main() async throws { | |
| let hangsForever = Task { | |
| await untilCancelled() | |
| } | |
| // Comment out this line to test early cancellation. | |
| try await Task.sleep(for: .milliseconds(10)) | |
| print("will cancel") | |
| hangsForever.cancel() | |
| print("did cancel") | |
| try await Task.sleep(for: .milliseconds(10)) | |
| } | |
| try await main() |
Very interesting stuff, thanks for sharing. Cancellation is such a pain!
You got me curious about the necessity of the lock. (Yes, it is necessary).
func untilCancelledWithoutMutex() async {
print("1")
nonisolated(unsafe) var continuation: CheckedContinuation<Void, Never>?
// because of the isolated parameter, it is guaranteed there is no suspension between 1-2
await withTaskCancellationHandler {
print("2")
// again, same thing, guaranteed no suspension between 2-3
await withCheckedContinuation {
print("3")
if Task.isCancelled {
print("abort")
$0.resume()
return
}
// we actually make it all the way here without ever suspending, guaranteed.
print("4")
// So, you might be tempted, as I was, to wonder if the Mutex is even necessary.
// But there is a problem! Because the `withTaskCancellationHandler` function captures
// the `onCancel` function *first*, so it is callable even while the operation block is executing.
continuation = $0
}
} onCancel: {
print("5")
continuation?.resume()
}
print("6")
}It's also very hard to build a wrapper around withTaskCancellationHandler that does this, because for reasons I do not understand yet, it does not return a sending result like withCheckedContinuation does.
Yes, the concurrency part is really tricky.
In that way it's also really important to be lean on the presence of a continuation in the onCancel for other scenarios. Say the continuation does actually represent some completion status and some other Task/thread notifies it, maybe like so:
let continuation = mutex.withLock {
return $0.take()
}
continuation?.resume()Then the originally suspended Task will be unblocked but not yet continuing execution. The continuation will have been successfully resumed but the Task can still be cancelled.
await withTaskCancellationHandler {
await withCheckedContinuation { ... }
// We'll be waiting for further execution HERE; but the cancellation handler is not yet dismissed
} onCancel: { … }I'm sorry - being slow here. I don't fully understand but I really want to. What you mean by "not yet dismissed"?
The cancellation handler is registered as long as withTaskCancellationHandler's operation closure has not completed. So even if the checked continuation is effectively done, by the continuation being resumed, the cancellation handler is still in place. If we do not consume the continuation effectively in a mutex in all places that use it, we might get over-resumption.
Trying to make focussed example:
// INFO: Ignoring ~Copyable here, for brevity
let mutex: Mutex<CheckedContinuation<Void, Never>?> = .init(nil)
let task1 = Task {
await withTaskCancellationHandler {
await withCheckedContinuation { continuation in
mutex.withLock { $0 = continuation }
}
} onCancel: {
// If we can guarantee that task1 is never cancelled before being suspended, we could, theoretically force-unwrap the continuation here. But task2 causes cancellation after the continuation has been resumed. And so we get an error:
// ERROR: Fatal error: Unexpectedly found nil while unwrapping an Optional value
mutex.withLock { $0.take() }!.resume()
}
}
let task2 = Task {
try await Task.sleep(for: .milliseconds(10))
// SMELLS but works: We know our code, so we can assume the continuation to be in place already. Thus, we can force-unwrap the continuation here and resume it
mutex.withLock { $0.take() }!.resume()
// This will (very likely) cause task1 to be cancelled while it's still suspended inside withTaskCancellationHandler.
task1.cancel()
}
await task1.valueWhat I'm trying to say is that treating the continuation as optional in the cancellation handler (and not asserting it's always there), is required for two scenarios:
- Cancellation before the task has suspended on the continuation
- Cancellation after the task has been resumed
Ahh right of course! Yes, "continuation juggling" as I like to call it, is just the worse and so difficult to get right.
hi there, loved the blogpost, it was very helpful.
One question: Instead of doing this inside of mutex.withLock:
if Task.isCancelled {
print("abort")
continuation.resume()
return
}could you replace print("2") with try Task.checkCancellation()?
Edit: derp never mind saw this in your post:
Note how we do the cancellation check only inside the mutex access (3). This avoids race conditions where cancellation might happen concurrently with the continuation setup. When checking before withCheckedContinuation (1) or before withLock (2), in rare cases cancellation might happen right between the check and the mutex access, exposing the same lost cancellation behavior as before. It’s rare, but not impossible.
One other thing I noticed was that the code above that I've copied below:
await withTaskCancellationHandler {
print("2")
await withCheckedContinuation { continuation in
print("3")
mutex.withLock {
if Task.isCancelled {
print("abort")
continuation.resume()
return // THIS RETURN
}
print("4")
$0 = continuation
}
// Some code outside the mutex.withLock {} will still return after the marked return above
}
} onCancel: {
let continuation = mutex.withLock {
print("5")
return $0.take()
}
continuation?.resume()
}If we hit the "early return" marked with // THIS RETURN we'll still run any code that is outside the mutex.withLock {}.
@Noobish1 Glad you liked it. 😊
Yes, it's right, code inside the in the checked continuation setup closure will still be run, the return is just exiting the mutex. Tho I'd argue there is usually little to do there if you have this kind of setup.
Regarding the first question, about using try Task.checkCancellation() instead of the manual abort. This is not possible for two reasons: First, the closure to set up a continuation is non-throwing. Even if mutex.withLock supports rethrowing, it could not inside withCheckedContinuation. Second, at the point where withCheckedContinuation is being called, the continuation is already in place and the enclosing task will be suspended. Even if we could, throwing at that point would leak the continuation and keep the caller suspended forever.
@macguru ah ok. I had overlooked that your whole async function was non-throwing.
@Noobish1 even if it was, you could not throw there.
The body of even withCheckedThrowingContinuation is always non-throwing. It's defined as _ body: (CheckedContinuation<T, any Error>) -> Void.
I guess this makes sense, also keeps things simpler and the semantics cleaner. There already is a way to throw out of this continuation by calling .resume(throwing: CancellationError()). Also when trowing an error there that resumes the continuation, you'd need to make sure the continuation isn't already stored somewhere else and might be over-resumed. Just too many pitfalls, I guess.
@macguru yup that makes total sense, you wouldn't want to throw inside the continuation as you said, you are given one way of doing that.
I was thinking of this for a throwing version:
func untilCancelled() async throws {
print("1")
let mutex: Mutex<CheckedContinuation<Void, Error>?> = .init(nil)
try await withTaskCancellationHandler {
print("2")
try Task.checkCancellation() // Note this cancellation check here
try await withCheckedThrowingContinuation { continuation in
print("3")
mutex.withLock {
print("4")
$0 = continuation
}
doSomeAsyncThing { result in
let continuation = mutex.withLock { $0.take() }
continuation?.resume(with: result)
}
}
} onCancel: {
let continuation = mutex.withLock {
print("5")
return $0.take()
}
continuation?.resume(throwing: CancellationError())
}
print("6")
}@mattmassicotte / @macguru is this an abomination? xD
func cancellableContinuation<T: Sendable>(_ perform: (@escaping @Sendable (Result<T, Error & Sendable>) -> Void) -> Void) async throws -> T {
let continuationRef: Mutex<CheckedContinuation<T, Error>?> = .init(nil)
return try await withTaskCancellationHandler {
try Task.checkCancellation()
return try await withCheckedThrowingContinuation { continuation in
continuationRef.withLock {
$0 = continuation
}
perform({ result in
let continuation = continuationRef.withLock { $0.take() }
continuation?.resume(with: result)
})
}
} onCancel: {
let continuation = continuationRef.withLock { $0.take() }
continuation?.resume(throwing: CancellationError())
}
}I liked the article too, it is very insightful to be honest. I was actually thinking about creating something like what @Noobish1 created to handle the cancellation all over the app I am working on. But I wanted to ask a small question a found a small solution on swift forums that makes use share properties created in the function between the operation closure and the cancellation closure by doing so:
let onCancel = { dataTask?.cancel() }
return try await withTaskCancellationHandler {
// ..........
} onCancel: {
onCancel()
}And what about replacing mutex with this legacy mutex for app's that supports iOS 16+:
final class LegacyMutex<Wrapped>: @unchecked Sendable {
private var mutex = pthread_mutex_t()
private var wrapped: Wrapped
init(_ initialValue: Wrapped) {
pthread_mutex_init(&mutex, nil)
self.wrapped = initialValue
}
deinit {
pthread_mutex_destroy(&mutex)
}
func withLock<R>(_ body: @Sendable (inout Wrapped) throws -> R) rethrows -> R {
pthread_mutex_lock(&mutex); defer { pthread_mutex_unlock(&mutex) }
return try body(&wrapped)
}
}I think OSAllocatedUnfairLock is a great option if you cannot use Mutex. It's not as safe, but pretty good.
I'm also not 100% sure about that withLock implementation. I think you may need either R: Sendable or maybe a sending in there.
I think you may need either
R: Sendableor maybe asendingin there.
why should I do this ?
This is locking pattern known to be easy to use wrong in subtle ways. It can be used safely, but is very easy to use wrong. The core problem is as implemented LegacyMutex actually isn't fully Sendable. Here's an example:
class NonSendable {}
struct UsesLegacyMutex: Sendable {
private let locked = LegacyMutex(NonSendable())
var thisIsNotActuallySafe: NonSendable {
locked.withLock { $0 }
}
}@EngOmarElsayed we’ve been using this package for a legacy mutex: https://github.com/swhitty/swift-mutex
@Noobish1 About your example… I'm wondering what the perform() would be doing. Because for anything "heavy" you'd probably want an asynchronous context, so you'd need to make a task there. And then, you could leave out the continuation handling entirely and just wrap what you're doing in a cancellation handler.
@EngOmarElsayed I'm not entirely sure what you mean in the reference to the forum post you liked. But if I understand you correctly, then in those cases where you have a Task, you don't need any of the dance here. I talked about this in my post under The simple example.
@macguru perform would be some API that uses closure callbacks that can't be easily converted to async/await.
var thisIsNotActuallySafe: NonSendable { locked.withLock { $0 } }
But why this isn't safe, I think there is something I am missing but what is it 😅 ?
I'm not entirely sure what you mean in the reference to the forum post you liked. But if I understand you correctly, then in those cases where you have a
Task, you don't need any of the dance here. I talked about this in my post under The simple example.
You are right, I was sharing something I found with you 😅
A slightly expanded version of the sample code built in my Task Cancellation Handlers post. Paste it into a Swift command line tool (Playgrounds crashed for me). Commenting out line 39 will toggle between the two cancellation cases: early cancellation and regular (late) cancellation.
The regular case will print the following:
Early cancellation (task cancelled before the handler is registered) will never reach
4and5, but instead printabort: