Skip to content

Instantly share code, notes, and snippets.

@gtrak
Created July 29, 2025 18:50
Show Gist options
  • Save gtrak/15c573a39a926cb2fb397813ef171032 to your computer and use it in GitHub Desktop.
Save gtrak/15c573a39a926cb2fb397813ef171032 to your computer and use it in GitHub Desktop.
manually-driven-async.rs
use std::collections::{HashMap, VecDeque};
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use futures::channel::oneshot::{channel, Receiver, Sender};
// === Simulated HTTP Future Token ===
/// A pending simulated HTTP request: the target URL plus the one-shot
/// sender through which the response body is delivered.
#[derive(Debug)]
pub struct HttpRequestToken {
// URL the request was issued for.
url: String,
// Sending half of the response channel; held in an `Option` so that
// `respond` can move it out through `&mut self`.
response_tx: Option<Sender<String>>,
}
impl HttpRequestToken {
pub fn new(url: &str, tx: Sender<String>) -> Self {
HttpRequestToken {
url: url.to_string(),
response_tx: Some(tx),
// response_rx: Some(rx),
}
}
pub fn respond(&mut self, body: String) {
println!("Responding {}", body);
self.response_tx
.take()
.expect("Missing sender")
.send(body)
.expect("Unable to send");
}
pub fn url(&self) -> &str {
&self.url
}
}
// === Driver that polls the async fn manually ===
/// Holds a boxed, pinned future together with the queue of request tokens
/// that the future's request closure pushes onto.
struct HttpDriver<T> {
// The future being driven by hand via `poll`.
fut: Pin<Box<dyn Future<Output = T>>>,
// Queue shared with the request closure handed to the future's factory.
pending_tokens: Arc<Mutex<VecDeque<HttpRequestToken>>>,
}
impl<T> HttpDriver<T> {
    /// Creates a driver around the future produced by `f`.
    ///
    /// `f` is invoked exactly once with a request closure (built by
    /// `make_request_closure`) that shares this driver's token queue. It only
    /// needs to be `FnOnce`; any `FnMut` closure is still accepted.
    fn new(f: impl FnOnce(HttpRequestFn) -> Box<dyn Future<Output = T>>) -> Self {
        let pending: Arc<Mutex<VecDeque<HttpRequestToken>>> =
            Arc::new(Mutex::new(VecDeque::new()));
        // The closure gets its own handle to the queue; the driver keeps the other.
        let fut = f(make_request_closure(Arc::clone(&pending)));
        HttpDriver {
            fut: Pin::from(fut),
            pending_tokens: pending,
        }
    }

    /// Polls the wrapped future once and returns the result of that poll.
    ///
    /// A no-op waker is used, so nothing reschedules the future on its own;
    /// the caller must call `poll` again to make further progress.
    fn poll(&mut self) -> Poll<T> {
        let waker = futures::task::noop_waker();
        let mut cx = Context::from_waker(&waker);
        self.fut.as_mut().poll(&mut cx)
    }

    /// Removes and returns the oldest queued request token, if any.
    ///
    /// # Panics
    ///
    /// Panics with "could not lock mutex" if the queue's mutex is poisoned.
    fn take_next_request(&mut self) -> Option<HttpRequestToken> {
        // Lock through the existing handle; cloning the `Arc` first is unnecessary.
        self.pending_tokens
            .lock()
            .expect("could not lock mutex")
            .pop_front()
    }
}
// === Helper closure used in async fn ===
/// Boxed request callback: takes a URL and returns the receiver on which the
/// response body for that URL will arrive.
type HttpRequestFn = Box<dyn FnMut(&str) -> Receiver<String>>;
/// Builds the request callback that is handed to the driven async code.
///
/// Each call of the returned closure opens a fresh one-shot channel, pushes a
/// token holding the URL and the sending half onto the back of `queue`, and
/// returns the receiving half.
///
/// # Panics
///
/// The returned closure panics with "could not lock mutex" if the queue's
/// mutex is poisoned.
fn make_request_closure(
    queue: Arc<Mutex<VecDeque<HttpRequestToken>>>,
) -> Box<dyn FnMut(&str) -> Receiver<String>> {
    Box::new(move |url: &str| {
        let (sender, receiver) = channel::<String>();
        let pending_request = HttpRequestToken::new(url, sender);
        // The guard is a temporary, so the lock is released at the end of this statement.
        queue
            .lock()
            .expect("could not lock mutex")
            .push_back(pending_request);
        receiver
    })
}
// === Async function to be manually driven ===
/* TODO: does this have to be owned? */
/// Requests three fixed URLs strictly one after another and formats the three
/// response bodies into a single summary string.
///
/// Each request is issued only after the previous response has been awaited.
///
/// # Panics
///
/// Panics with "Error" if awaiting any of the receivers yields an error.
async fn fetch_sequence(mut make_request: impl FnMut(&str) -> Receiver<String>) -> String {
    let pending_a = make_request("https://a.com");
    let body_a = pending_a.await.expect("Error");

    let pending_b = make_request("https://b.com");
    let body_b = pending_b.await.expect("Error");

    let pending_c = make_request("https://c.com");
    let body_c = pending_c.await.expect("Error");

    format!("Results: A='{}', B='{}', C='{}'", body_a, body_b, body_c)
}
// === Main Simulation ===
pub fn main() {
let responses = HashMap::from([
("https://a.com", "Response A"),
("https://b.com", "Response B"),
("https://c.com", "Response C"),
]);
let mut driver = HttpDriver::new(|http: Box<dyn FnMut(&str) -> Receiver<String>>| {
Box::new(fetch_sequence(http))
});
loop {
// Step 1: Check if future is ready
match driver.poll() {
Poll::Ready(result) => {
println!("Finished: {}", result);
break;
}
Poll::Pending => {
// Step 2: See if there is a request waiting
if let Some(mut req) = driver.take_next_request() {
let url = req.url().to_string();
let response = responses
.get(req.url())
.unwrap_or(&"Unknown URL")
.to_string();
println!("Handling request to {}, {}", url, response);
req.respond(response);
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment