Created
May 29, 2025 00:00
-
-
Save ihciah/57edbae2eeeda9443d6542952e4b4807 to your computer and use it in GitHub Desktop.
Revisions
-
ihciah created this gist
May 29, 2025 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,115 @@ use std::cell::RefCell; use std::pin::Pin; use std::task::Context; use std::task::Poll; use std::task::ready; use futures::Stream; use futures::TryFuture; use futures::stream::FuturesUnordered; use futures::stream::StreamExt; #[pin_project::pin_project] pub struct FutureRetry<F, Fut> where F: Fn() -> Fut, { #[pin] fut: Fut, f: F, max_retries: usize, } impl<F: Fn() -> Fut, Fut> FutureRetry<F, Fut> { pub fn new(f: F, max_retries: usize) -> Self { Self { fut: f(), f, max_retries, } } } impl<F, Fut> Future for FutureRetry<F, Fut> where F: Fn() -> Fut, Fut: TryFuture, { type Output = Result<Fut::Ok, Fut::Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let mut this = self.project(); loop { match ready!(this.fut.as_mut().try_poll(cx)) { Ok(item) => return Poll::Ready(Ok(item)), Err(e) => { if *this.max_retries > 0 { *this.max_retries -= 1; this.fut.set((this.f)()); continue; } else { return Poll::Ready(Err(e)); } } } } } } #[pin_project::pin_project] pub struct FuturesUnorderedRetry<const RETRY: usize, Fut> { #[pin] inner: FuturesUnordered<Fut>, } impl<const RETRY: usize, F: Fn() -> Fut, Fut> FromIterator<F> for FuturesUnorderedRetry<RETRY, FutureRetry<F, Fut>> { fn from_iter<I>(iter: I) -> Self where I: IntoIterator<Item = F>, { let inner = FuturesUnordered::new(); Self { inner: iter.into_iter().fold(inner, |inner, f| { inner.push(FutureRetry::new(f, RETRY)); inner }), } } } impl<const RETRY: usize, Fut: Future> Stream for FuturesUnorderedRetry<RETRY, Fut> { type Item = Fut::Output; #[inline] fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let this = self.project(); this.inner.poll_next(cx) } } // === example === async fn try_add_1(input: &RefCell<usize>) -> Result<String, ()> { let mut input = input.borrow_mut(); *input += 1; if *input >= 10 { Ok("success".to_string()) } else { Err(()) } } #[tokio::test] async fn test() { let first = RefCell::new(5); let second = RefCell::new(6); let mut results = [&first, &second] .into_iter() .map(|input| move || try_add_1(input)) .collect::<FuturesUnorderedRetry<3_usize, _>>(); while let Some(result) = results.next().await { println!("result: {:?}", result); } }