Skip to content

Instantly share code, notes, and snippets.

@miguelraz
Last active May 23, 2025 02:48
Show Gist options
  • Save miguelraz/645db0fc17ecc4abb0c7e89f7b4f4d11 to your computer and use it in GitHub Desktop.
Save miguelraz/645db0fc17ecc4abb0c7e89f7b4f4d11 to your computer and use it in GitHub Desktop.
use core::future::Future;
use core::pin::Pin;
use core::time::Duration;
use std::future::poll_fn;
use std::task::Poll;
pub async fn try_join<'a>(
futs: impl IntoIterator<Item = Pin<Box<dyn Future<Output = Result<(), &'a str>>>>>,
) -> Result<(), &'a str> {
// Define the iterator outside of the loop so we can repeatedly call `futs_iter.next()` without ownership issues
let futs_iter = futs.into_iter();
// Notice the `mut pinned_elem` here, that unlocks the `.as_mut()` API later.
for mut pinned_elem in futs_iter {
// Call the poll_fn method by "reaching" into the `cx`, since it's been pinned
poll_fn(|cx| match pinned_elem.as_mut().poll(cx) {
Poll::Ready(r) => Poll::Ready(r),
Poll::Pending => Poll::Pending,
})
.await?
// ^ .await? it at the end, since futures are lazy and we want to "drive" their forward progress.
}
Ok(())
}
pub async fn run_test() {
assert_eq!(
try_join([Box::pin(async { Ok(()) }) as Pin<Box<dyn Future<Output = _>>>,]).await,
Ok(())
);
assert_eq!(
try_join([Box::pin(async { Err("test") }) as Pin<Box<dyn Future<Output = _>>>,]).await,
Err("test")
);
assert_eq!(
try_join([
Box::pin(async {
tokio::time::sleep(Duration::from_millis(2)).await;
Ok(())
}) as Pin<Box<dyn Future<Output = _>>>,
Box::pin(async { Err("test") }),
])
.await,
Err("test")
);
assert_eq!(
try_join([
Box::pin(async {
tokio::time::sleep(Duration::from_millis(2)).await;
Ok(())
}) as Pin<Box<dyn Future<Output = _>>>,
Box::pin(async {
tokio::time::sleep(Duration::from_millis(1)).await;
Ok(())
}),
Box::pin(async { Ok(()) }),
])
.await,
Ok(())
);
assert_eq!(
try_join([
Box::pin(async {
tokio::time::sleep(Duration::from_millis(2)).await;
Ok(())
}) as Pin<Box<dyn Future<Output = _>>>,
Box::pin(async {
tokio::time::sleep(Duration::from_millis(1)).await;
Err("test")
}),
Box::pin(async { Ok(()) }),
])
.await,
Err("test")
);
/*
assert_eq!(
try_join([
Box::pin(core::future::pending()) as Pin<Box<dyn Future<Output = _>>>,
Box::pin(async { Err("test") }),
])
.await,
Err("test")
);
*/
}
#[tokio::main]
async fn main() {
run_test().await;
eprintln!("success")
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_try_join() {
run_test().await
}
}
// What follows now are my notes during my working of the problem.
// There is no expecation of reading them, but I do like to organize my thoughts by writing them into a "detective journal", if you will,
// since I find it keeps me more focused with the actual progress I'm making and less likely to backtrack.
// ----------------
//
// Thought process.
// 1. Ok, I can call `futs.into_iter` because it implements `IntoIter`. Let's do that with a `while let Some(x) = futs.into_iter().next() {...}`
// 2. The `x` implements `Pin<Box<dyn Future<Output = Result<(), &str>> + 'static, Global>>`, so I need to figure how to get it out of a `Pin`.
// ...(10 mins later on std.rs)...
// - Aha, `Pin` is more of a promise to the compiler, if I'm in safe Rust I can just worry about `x.as_mut()`.
// - I'm inside a while loop, so that's probably gonna need some subtlety.
// - Hmm. I couldn't get out of "reborrowing" something that was mut inside of a loop. My rusty senses say that's not a good path
// - to go down.
// 3. Well, Roman mentioned I should use `poll_fn`, so perhaps let's look at that?
// - I tried to do something like
// ```
/* if let Some(mut elem) = futs.into_iter().next() {
let out_of_pin = future::poll_fn(move |cx| {
if let Poll::Ready(r) = elem.as_mut().poll(cx) {
Poll::Ready(r)
} else {
Poll::Pending
}
})
.await;
match out_of_pin {
Ok(res) => return Ok(res),
Err(e) => return Err(e),
}
}
Ok(())
/ */
// But that didn't seem to be getting me further ahead.
// 4. Derp, I forget that `async` functions return Futures by default!
// ... I will now try to start with a regular for loop over the collection and then use `poll_fn`.
// -> If I have a future that is `Poll::Ready(r)`, I'll return that thing I reached into.
// 5. I'm pretty sure I have to define the iterator itself at the function start and then use a `loop {}` construct that actually polls inside of it.
// - To advance the state, because otherwise I'm just running into the old
/* ```
--> src/lib.rs:12:39
|
12 | if let Poll::Ready(r) = e.as_mut().poll(cx) {
| ^^^^^^ method cannot be called on `Pin<&dyn Future<Output = Result<(), &str>>>` due to unsatisfied trait bounds
|
= note: the following trait bounds were not satisfied:
`&dyn Future<Output = Result<(), &'a str>>: DerefMut`
*/
// chestnut again.
// Let's use the loop {} and try again.
// (After about 15mins...)
// 6. Good news and bad news! This version of the code compiles!
/*
pub async fn try_join<'a>(
futs: impl IntoIterator<Item = Pin<Box<dyn Future<Output = Result<(), &'a str>>>>>,
) -> Result<(), &'a str> {
let mut fut_iter = futs.into_iter();
loop {
let mut f = fut_iter.next();
if let Some(pinned_elem) = f.as_mut() {
let res = future::poll_fn(move |cx| {
if let Poll::Ready(r) = pinned_elem.as_mut().poll(cx) {
Poll::Ready(r)
} else {
Poll::Pending
}
});
}
}
}
*/
// ... but the tests hang forever. Well, at least I'm holding the proper tool I think.
// 7. OBS: It's very *odd* that this hangs indefinitely, since the very first test is just an async block pinning an `Ok(())`.
// -> OH!! Well, I'm never returning from the loop!, that makes sense. Derp.
// 8. So I my gut feeling is that I *want* to return the innermost `Poll::Ready(r)`, but `break Poll::Ready(r)` isn't allowed because it's inside of a `move`d closure.
// -> Maybe I can make it non-move?
// 9. Aha! This version of the code
/*
pub async fn try_join<'a>(
futs: impl IntoIterator<Item = Pin<Box<dyn Future<Output = Result<(), &'a str>>>>>,
) -> Result<(), &'a str> {
let mut fut_iter = futs.into_iter();
loop {
let mut f = fut_iter.next();
if let Some(pinned_elem) = f.as_mut() {
break future::poll_fn(|cx| {
if let Poll::Ready(r) = pinned_elem.as_mut().poll(cx) {
Poll::Ready(r)
} else {
Poll::Pending
}
})
.await;
}
}
}
*/
// Is passing the first 2 tests:
/*
assert_eq!(
try_join([Box::pin(async { Ok(()) }) as Pin<Box<dyn Future<Output = _>>>,]).await,
Ok(())
);
assert_eq!(
try_join([Box::pin(async { Err("test") }) as Pin<Box<dyn Future<Output = _>>>,]).await,
Err("test")
);
assert_eq!(
try_join([
Box::pin(async {
tokio::time::sleep(Duration::from_millis(2)).await;
Ok(())
}) as Pin<Box<dyn Future<Output = _>>>,
Box::pin(async { Err("test") }),
])
.await,
Err("test")
);
*/
// I think I've sniffed out what I'm doing wrong here:
// I'm kinda just starting the loop, and then checking the element in the collection and returning that (since the first 2 tests passed.)
// I'm not actually "going through the list", in a sense, so I should find a way to not short circuit the loop.
// Dang it I thought I had it when I began to write this down but now I'm doubting myself.
// 10. (2 mins later) Well, I'm unconditionally starting the loop and then `break`ing on the first element, that makes sense for it to bomb on the first collection case!
//
// -------- Intermission ------
// About 1.5 hours of real time have passed (minus a few errands) and I've at least managed to
// a) Get *some* tests passing
// b) read the source code and reviewed basic definitions.
// This is about the point where I'd feel comfortable reaching out to a more experienced colleague and have them set me straight on something silly I'm doing.
// ----- Intermission Ends ----
//
// 11. I think this is the last point where if I get frustrated more, I'd be happy to ask a colleague for help (since I haven't asked for assistance yet, other than looking at the docs.)
// -> (I should probably also look at the `poll_fn` source code, just try and get a clearer picture of what's going on.)
// -> Ok, so `poll_fn` just builds a `PollFn` that places my type T into a `Poll<T>`, and the intermediate `Pin<&mut Self>` lets me (safely) know that
// said field is not being moved. That makes sense, because it's calling `(unsafe {&mut self.get_unchecked_mut().f}))(cx)`,
// 12. Let's try and come back with that idea in mind and then go for a break, perhaps I'm doing something silly with this new pattern that I'm not used to and it will click later on.
// 13. I took a nap and had lunch, but then I came back and tried googling for the problem to find blogs or tokio documentaiton that was closer to my examples.
// -> I gave up after a few minutes, but I did find the `if let Some(...) = while fut_iter.next() {}` in the second snippet of code here: https://rust-trends.com/posts/rust-s-if-let-while-chains/
// -> I came back to rewrite the iterator in that fashion and YAY! it works!
// 14. Code that passes tests:
/*
pub async fn try_join<'a>(
futs: impl IntoIterator<Item = Pin<Box<dyn Future<Output = Result<(), &'a str>>>>>,
) -> Result<(), &'a str> {
let mut futs_iter = futs.into_iter();
while let Some(mut pinned_elem) = futs_iter.next() {
poll_fn(|cx| match pinned_elem.as_mut().poll(cx) {
Poll::Ready(r) => return Poll::Ready(r),
Poll::Pending => Poll::Pending,
})
.await?
}
Ok(())
}
*/
// 15. I ran it through clippy and changed the `while` loop to a for loop.
// 16. Time for the extra credit... Perhaps I should just add a simple counter in the `Pending` case and if it exceeds 1000 then I just should bail, no?
// -> Let's try it.
// 17. I tried it with Atomics, but it just hangs, oh well. Sleeping is the best debugger, so maybe I'll try it tomorrow.
/*
pub async fn try_join<'a>(
futs: impl IntoIterator<Item = Pin<Box<dyn Future<Output = Result<(), &'a str>>>>>,
) -> Result<(), &'a str> {
// Define the iterator outside of the loop so we can repeatedly call `futs_iter.next()` without ownership issues
let futs_iter = futs.into_iter();
// Notice the `mut pinned_elem` here, that unlocks the `.as_mut()` API later.
let limit = 100;
let mut counter = AtomicUsize::new(0);
for mut pinned_elem in futs_iter {
// Call the poll_fn method by "reaching" into the `cx`, since it's been pinned
poll_fn(|cx| match pinned_elem.as_mut().poll(cx) {
Poll::Ready(r) => Poll::Ready(r),
Poll::Pending => {
let current_progress = counter.load(std::sync::atomic::Ordering::Acquire);
if current_progress < limit {
counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Poll::Pending
} else {
Poll::Ready(Err("test"))
}
}
})
.await?
// ^ .await? it at the end, since futures are lazy and we want to "drive" their forward progress.
}
Ok(())
}
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment