Last active
May 23, 2025 02:48
-
-
Save miguelraz/645db0fc17ecc4abb0c7e89f7b4f4d11 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use core::future::Future; | |
use core::pin::Pin; | |
use core::time::Duration; | |
use std::future::poll_fn; | |
use std::task::Poll; | |
pub async fn try_join<'a>( | |
futs: impl IntoIterator<Item = Pin<Box<dyn Future<Output = Result<(), &'a str>>>>>, | |
) -> Result<(), &'a str> { | |
// Define the iterator outside of the loop so we can repeatedly call `futs_iter.next()` without ownership issues | |
let futs_iter = futs.into_iter(); | |
// Notice the `mut pinned_elem` here, that unlocks the `.as_mut()` API later. | |
for mut pinned_elem in futs_iter { | |
// Call the poll_fn method by "reaching" into the `cx`, since it's been pinned | |
poll_fn(|cx| match pinned_elem.as_mut().poll(cx) { | |
Poll::Ready(r) => Poll::Ready(r), | |
Poll::Pending => Poll::Pending, | |
}) | |
.await? | |
// ^ .await? it at the end, since futures are lazy and we want to "drive" their forward progress. | |
} | |
Ok(()) | |
} | |
pub async fn run_test() { | |
assert_eq!( | |
try_join([Box::pin(async { Ok(()) }) as Pin<Box<dyn Future<Output = _>>>,]).await, | |
Ok(()) | |
); | |
assert_eq!( | |
try_join([Box::pin(async { Err("test") }) as Pin<Box<dyn Future<Output = _>>>,]).await, | |
Err("test") | |
); | |
assert_eq!( | |
try_join([ | |
Box::pin(async { | |
tokio::time::sleep(Duration::from_millis(2)).await; | |
Ok(()) | |
}) as Pin<Box<dyn Future<Output = _>>>, | |
Box::pin(async { Err("test") }), | |
]) | |
.await, | |
Err("test") | |
); | |
assert_eq!( | |
try_join([ | |
Box::pin(async { | |
tokio::time::sleep(Duration::from_millis(2)).await; | |
Ok(()) | |
}) as Pin<Box<dyn Future<Output = _>>>, | |
Box::pin(async { | |
tokio::time::sleep(Duration::from_millis(1)).await; | |
Ok(()) | |
}), | |
Box::pin(async { Ok(()) }), | |
]) | |
.await, | |
Ok(()) | |
); | |
assert_eq!( | |
try_join([ | |
Box::pin(async { | |
tokio::time::sleep(Duration::from_millis(2)).await; | |
Ok(()) | |
}) as Pin<Box<dyn Future<Output = _>>>, | |
Box::pin(async { | |
tokio::time::sleep(Duration::from_millis(1)).await; | |
Err("test") | |
}), | |
Box::pin(async { Ok(()) }), | |
]) | |
.await, | |
Err("test") | |
); | |
/* | |
assert_eq!( | |
try_join([ | |
Box::pin(core::future::pending()) as Pin<Box<dyn Future<Output = _>>>, | |
Box::pin(async { Err("test") }), | |
]) | |
.await, | |
Err("test") | |
); | |
*/ | |
} | |
#[tokio::main] | |
async fn main() { | |
run_test().await; | |
eprintln!("success") | |
} | |
#[cfg(test)] | |
mod tests { | |
use super::*; | |
#[tokio::test] | |
async fn test_try_join() { | |
run_test().await | |
} | |
} | |
// What follows now are my notes during my working of the problem. | |
// There is no expecation of reading them, but I do like to organize my thoughts by writing them into a "detective journal", if you will, | |
// since I find it keeps me more focused with the actual progress I'm making and less likely to backtrack. | |
// ---------------- | |
// | |
// Thought process. | |
// 1. Ok, I can call `futs.into_iter` because it implements `IntoIter`. Let's do that with a `while let Some(x) = futs.into_iter().next() {...}` | |
// 2. The `x` implements `Pin<Box<dyn Future<Output = Result<(), &str>> + 'static, Global>>`, so I need to figure how to get it out of a `Pin`. | |
// ...(10 mins later on std.rs)... | |
// - Aha, `Pin` is more of a promise to the compiler, if I'm in safe Rust I can just worry about `x.as_mut()`. | |
// - I'm inside a while loop, so that's probably gonna need some subtlety. | |
// - Hmm. I couldn't get out of "reborrowing" something that was mut inside of a loop. My rusty senses say that's not a good path | |
// - to go down. | |
// 3. Well, Roman mentioned I should use `poll_fn`, so perhaps let's look at that? | |
// - I tried to do something like | |
// ``` | |
/* if let Some(mut elem) = futs.into_iter().next() { | |
let out_of_pin = future::poll_fn(move |cx| { | |
if let Poll::Ready(r) = elem.as_mut().poll(cx) { | |
Poll::Ready(r) | |
} else { | |
Poll::Pending | |
} | |
}) | |
.await; | |
match out_of_pin { | |
Ok(res) => return Ok(res), | |
Err(e) => return Err(e), | |
} | |
} | |
Ok(()) | |
/ */ | |
// But that didn't seem to be getting me further ahead. | |
// 4. Derp, I forget that `async` functions return Futures by default! | |
// ... I will now try to start with a regular for loop over the collection and then use `poll_fn`. | |
// -> If I have a future that is `Poll::Ready(r)`, I'll return that thing I reached into. | |
// 5. I'm pretty sure I have to define the iterator itself at the function start and then use a `loop {}` construct that actually polls inside of it. | |
// - To advance the state, because otherwise I'm just running into the old | |
/* ``` | |
--> src/lib.rs:12:39 | |
| | |
12 | if let Poll::Ready(r) = e.as_mut().poll(cx) { | |
| ^^^^^^ method cannot be called on `Pin<&dyn Future<Output = Result<(), &str>>>` due to unsatisfied trait bounds | |
| | |
= note: the following trait bounds were not satisfied: | |
`&dyn Future<Output = Result<(), &'a str>>: DerefMut` | |
*/ | |
// chestnut again. | |
// Let's use the loop {} and try again. | |
// (After about 15mins...) | |
// 6. Good news and bad news! This version of the code compiles! | |
/* | |
pub async fn try_join<'a>( | |
futs: impl IntoIterator<Item = Pin<Box<dyn Future<Output = Result<(), &'a str>>>>>, | |
) -> Result<(), &'a str> { | |
let mut fut_iter = futs.into_iter(); | |
loop { | |
let mut f = fut_iter.next(); | |
if let Some(pinned_elem) = f.as_mut() { | |
let res = future::poll_fn(move |cx| { | |
if let Poll::Ready(r) = pinned_elem.as_mut().poll(cx) { | |
Poll::Ready(r) | |
} else { | |
Poll::Pending | |
} | |
}); | |
} | |
} | |
} | |
*/ | |
// ... but the tests hang forever. Well, at least I'm holding the proper tool I think. | |
// 7. OBS: It's very *odd* that this hangs indefinitely, since the very first test is just an async block pinning an `Ok(())`. | |
// -> OH!! Well, I'm never returning from the loop!, that makes sense. Derp. | |
// 8. So I my gut feeling is that I *want* to return the innermost `Poll::Ready(r)`, but `break Poll::Ready(r)` isn't allowed because it's inside of a `move`d closure. | |
// -> Maybe I can make it non-move? | |
// 9. Aha! This version of the code | |
/* | |
pub async fn try_join<'a>( | |
futs: impl IntoIterator<Item = Pin<Box<dyn Future<Output = Result<(), &'a str>>>>>, | |
) -> Result<(), &'a str> { | |
let mut fut_iter = futs.into_iter(); | |
loop { | |
let mut f = fut_iter.next(); | |
if let Some(pinned_elem) = f.as_mut() { | |
break future::poll_fn(|cx| { | |
if let Poll::Ready(r) = pinned_elem.as_mut().poll(cx) { | |
Poll::Ready(r) | |
} else { | |
Poll::Pending | |
} | |
}) | |
.await; | |
} | |
} | |
} | |
*/ | |
// Is passing the first 2 tests: | |
/* | |
assert_eq!( | |
try_join([Box::pin(async { Ok(()) }) as Pin<Box<dyn Future<Output = _>>>,]).await, | |
Ok(()) | |
); | |
assert_eq!( | |
try_join([Box::pin(async { Err("test") }) as Pin<Box<dyn Future<Output = _>>>,]).await, | |
Err("test") | |
); | |
assert_eq!( | |
try_join([ | |
Box::pin(async { | |
tokio::time::sleep(Duration::from_millis(2)).await; | |
Ok(()) | |
}) as Pin<Box<dyn Future<Output = _>>>, | |
Box::pin(async { Err("test") }), | |
]) | |
.await, | |
Err("test") | |
); | |
*/ | |
// I think I've sniffed out what I'm doing wrong here: | |
// I'm kinda just starting the loop, and then checking the element in the collection and returning that (since the first 2 tests passed.) | |
// I'm not actually "going through the list", in a sense, so I should find a way to not short circuit the loop. | |
// Dang it I thought I had it when I began to write this down but now I'm doubting myself. | |
// 10. (2 mins later) Well, I'm unconditionally starting the loop and then `break`ing on the first element, that makes sense for it to bomb on the first collection case! | |
// | |
// -------- Intermission ------ | |
// About 1.5 hours of real time have passed (minus a few errands) and I've at least managed to | |
// a) Get *some* tests passing | |
// b) read the source code and reviewed basic definitions. | |
// This is about the point where I'd feel comfortable reaching out to a more experienced colleague and have them set me straight on something silly I'm doing. | |
// ----- Intermission Ends ---- | |
// | |
// 11. I think this is the last point where if I get frustrated more, I'd be happy to ask a colleague for help (since I haven't asked for assistance yet, other than looking at the docs.) | |
// -> (I should probably also look at the `poll_fn` source code, just try and get a clearer picture of what's going on.) | |
// -> Ok, so `poll_fn` just builds a `PollFn` that places my type T into a `Poll<T>`, and the intermediate `Pin<&mut Self>` lets me (safely) know that | |
// said field is not being moved. That makes sense, because it's calling `(unsafe {&mut self.get_unchecked_mut().f}))(cx)`, | |
// 12. Let's try and come back with that idea in mind and then go for a break, perhaps I'm doing something silly with this new pattern that I'm not used to and it will click later on. | |
// 13. I took a nap and had lunch, but then I came back and tried googling for the problem to find blogs or tokio documentaiton that was closer to my examples. | |
// -> I gave up after a few minutes, but I did find the `if let Some(...) = while fut_iter.next() {}` in the second snippet of code here: https://rust-trends.com/posts/rust-s-if-let-while-chains/ | |
// -> I came back to rewrite the iterator in that fashion and YAY! it works! | |
// 14. Code that passes tests: | |
/* | |
pub async fn try_join<'a>( | |
futs: impl IntoIterator<Item = Pin<Box<dyn Future<Output = Result<(), &'a str>>>>>, | |
) -> Result<(), &'a str> { | |
let mut futs_iter = futs.into_iter(); | |
while let Some(mut pinned_elem) = futs_iter.next() { | |
poll_fn(|cx| match pinned_elem.as_mut().poll(cx) { | |
Poll::Ready(r) => return Poll::Ready(r), | |
Poll::Pending => Poll::Pending, | |
}) | |
.await? | |
} | |
Ok(()) | |
} | |
*/ | |
// 15. I ran it through clippy and changed the `while` loop to a for loop. | |
// 16. Time for the extra credit... Perhaps I should just add a simple counter in the `Pending` case and if it exceeds 1000 then I just should bail, no? | |
// -> Let's try it. | |
// 17. I tried it with Atomics, but it just hangs, oh well. Sleeping is the best debugger, so maybe I'll try it tomorrow. | |
/* | |
pub async fn try_join<'a>( | |
futs: impl IntoIterator<Item = Pin<Box<dyn Future<Output = Result<(), &'a str>>>>>, | |
) -> Result<(), &'a str> { | |
// Define the iterator outside of the loop so we can repeatedly call `futs_iter.next()` without ownership issues | |
let futs_iter = futs.into_iter(); | |
// Notice the `mut pinned_elem` here, that unlocks the `.as_mut()` API later. | |
let limit = 100; | |
let mut counter = AtomicUsize::new(0); | |
for mut pinned_elem in futs_iter { | |
// Call the poll_fn method by "reaching" into the `cx`, since it's been pinned | |
poll_fn(|cx| match pinned_elem.as_mut().poll(cx) { | |
Poll::Ready(r) => Poll::Ready(r), | |
Poll::Pending => { | |
let current_progress = counter.load(std::sync::atomic::Ordering::Acquire); | |
if current_progress < limit { | |
counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst); | |
Poll::Pending | |
} else { | |
Poll::Ready(Err("test")) | |
} | |
} | |
}) | |
.await? | |
// ^ .await? it at the end, since futures are lazy and we want to "drive" their forward progress. | |
} | |
Ok(()) | |
} | |
*/ | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment