Skip to content

Instantly share code, notes, and snippets.

@ihciah
Created May 29, 2025 00:00
Show Gist options
  • Select an option

  • Save ihciah/57edbae2eeeda9443d6542952e4b4807 to your computer and use it in GitHub Desktop.

Select an option

Save ihciah/57edbae2eeeda9443d6542952e4b4807 to your computer and use it in GitHub Desktop.

Revisions

  1. ihciah created this gist May 29, 2025.
    115 changes: 115 additions & 0 deletions lib.rs
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,115 @@
    use std::cell::RefCell;
    use std::pin::Pin;
    use std::task::Context;
    use std::task::Poll;
    use std::task::ready;

    use futures::Stream;
    use futures::TryFuture;
    use futures::stream::FuturesUnordered;
    use futures::stream::StreamExt;

    #[pin_project::pin_project]
    pub struct FutureRetry<F, Fut>
    where
    F: Fn() -> Fut,
    {
    #[pin]
    fut: Fut,
    f: F,
    max_retries: usize,
    }

    impl<F: Fn() -> Fut, Fut> FutureRetry<F, Fut> {
    pub fn new(f: F, max_retries: usize) -> Self {
    Self {
    fut: f(),
    f,
    max_retries,
    }
    }
    }

    impl<F, Fut> Future for FutureRetry<F, Fut>
    where
    F: Fn() -> Fut,
    Fut: TryFuture,
    {
    type Output = Result<Fut::Ok, Fut::Error>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    let mut this = self.project();
    loop {
    match ready!(this.fut.as_mut().try_poll(cx)) {
    Ok(item) => return Poll::Ready(Ok(item)),
    Err(e) => {
    if *this.max_retries > 0 {
    *this.max_retries -= 1;
    this.fut.set((this.f)());
    continue;
    } else {
    return Poll::Ready(Err(e));
    }
    }
    }
    }
    }
    }

    #[pin_project::pin_project]
    pub struct FuturesUnorderedRetry<const RETRY: usize, Fut> {
    #[pin]
    inner: FuturesUnordered<Fut>,
    }

    impl<const RETRY: usize, F: Fn() -> Fut, Fut> FromIterator<F>
    for FuturesUnorderedRetry<RETRY, FutureRetry<F, Fut>>
    {
    fn from_iter<I>(iter: I) -> Self
    where
    I: IntoIterator<Item = F>,
    {
    let inner = FuturesUnordered::new();
    Self {
    inner: iter.into_iter().fold(inner, |inner, f| {
    inner.push(FutureRetry::new(f, RETRY));
    inner
    }),
    }
    }
    }

    impl<const RETRY: usize, Fut: Future> Stream for FuturesUnorderedRetry<RETRY, Fut> {
    type Item = Fut::Output;

    #[inline]
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
    let this = self.project();
    this.inner.poll_next(cx)
    }
    }

    // === example ===

    async fn try_add_1(input: &RefCell<usize>) -> Result<String, ()> {
    let mut input = input.borrow_mut();
    *input += 1;
    if *input >= 10 {
    Ok("success".to_string())
    } else {
    Err(())
    }
    }

    #[tokio::test]
    async fn test() {
    let first = RefCell::new(5);
    let second = RefCell::new(6);
    let mut results = [&first, &second]
    .into_iter()
    .map(|input| move || try_add_1(input))
    .collect::<FuturesUnorderedRetry<3_usize, _>>();
    while let Some(result) = results.next().await {
    println!("result: {:?}", result);
    }
    }