use async_std::sync::{channel, Receiver}; use futures::future::FutureExt; use rand::rngs::SmallRng; use rand::FromEntropy; use rand::Rng; use std::thread; use std::time; use tokio::time::delay_for; #[derive(PartialEq, Clone, Debug)] enum HaveToWait { Complete(String), NewRequest(usize), } async fn do_something2(x: usize) -> String { let mut rng = SmallRng::from_entropy(); delay_for(time::Duration::from_millis(rng.gen_range(10, 50))).await; if x == 999 { "".to_owned() } else { format!("Wow, I consumed {}", x) } } fn smart_consume(recv: Receiver) { let mut pool = Vec::new(); let mut rt = tokio::runtime::Runtime::new().unwrap(); let mut popped = true; loop { if popped || pool.is_empty() { pool.push( recv.recv() .map(|x| HaveToWait::NewRequest(x.unwrap())) .boxed(), ); popped = false; } let (r, _, mut new_pool) = rt.block_on(futures::future::select_all(pool)); match r { HaveToWait::Complete(msg) => { println!("{:?}", msg); if msg.is_empty() { return; } } HaveToWait::NewRequest(arg) => { // new request new_pool.push(do_something2(arg).map(|x| HaveToWait::Complete(x)).boxed()); popped = true; } } pool = new_pool; } } pub fn complex() { let mut rt = tokio::runtime::Runtime::new().unwrap(); let (send, recv) = channel(1000); let t = thread::spawn(move || smart_consume(recv)); let mut rng = SmallRng::from_entropy(); for i in 0..1000 { rt.block_on(send.send(i)); thread::sleep(time::Duration::from_millis(rng.gen_range(1, 10))); } t.join().unwrap(); }