use futures::StreamExt; use std::error::Error; use tokio; use tokio::macros::support::Pin; use tokio::prelude::*; use tokio::time::{Duration, Instant}; pub fn main() -> Result<(), Box> { let mut multi_threaded_runtime = tokio::runtime::Builder::new() .threaded_scheduler() .enable_all() .core_threads(10) .max_threads(10) .thread_name("multi-threaded") .build()?; // multi_threaded_runtime.block_on(concurrent()); multi_threaded_runtime.block_on(concurrentAndParallel()); Ok(()) } // As can be seen by the output below, despite specifying parallelism of 3, we are still bound by // the fact that all of the future generated by this function execute within a single task. async fn concurrent() { let before = Instant::now(); let paths = (0..6).rev(); let fetches = futures::stream::iter(paths.into_iter().map(|path| make_request(path))) .buffer_unordered(3) .map(|r| println!("finished request: {}", r)) .collect::>(); fetches.await; println!("elapsed time: {:.2?}", before.elapsed()); } // started request // finished request: current thread ThreadId(1) | thread name main | request_duration 5 // started request // finished request: current thread ThreadId(1) | thread name main | request_duration 4 // started request // finished request: current thread ThreadId(1) | thread name main | request_duration 3 // started request // finished request: current thread ThreadId(1) | thread name main | request_duration 2 // started request // finished request: current thread ThreadId(1) | thread name main | request_duration 1 // started request // finished request: current thread ThreadId(1) | thread name main | request_duration 0 // elapsed time: 15.01s // Here, we wrap every future within its own task using tokio::spawn. This allows the "requests" to // execute in parallel (depending on how many threads the runtime is configured with; 10 in this // case) using the multiplexing that Tokio does between different tasks and threads. You can see // from the output how 3 threads with ids, 9, 10, and 11, are consistently used to execute all of // the 6 "requests". async fn concurrentAndParallel() { let before = Instant::now(); let paths = (0..6).rev(); let fetches = futures::stream::iter( paths .into_iter() .map(|path| tokio::spawn(make_request(path))), ) .buffer_unordered(3) .map(|r| { println!( "finished request: {}", match r { Ok(rr) => rr, Err(_) => String::from("Bad"), } ); }) .collect::>(); fetches.await; println!("elapsed time: {:.2?}", before.elapsed()); } // started request // started request // started request // finished request: current thread ThreadId(9) | thread name multi-threaded | request_duration 3 // started request // finished request: current thread ThreadId(10) | thread name multi-threaded | request_duration 4 // started request // finished request: current thread ThreadId(11) | thread name multi-threaded | request_duration 5 // started request // finished request: current thread ThreadId(11) | thread name multi-threaded | request_duration 0 // finished request: current thread ThreadId(10) | thread name multi-threaded | request_duration 1 // finished request: current thread ThreadId(9) | thread name multi-threaded | request_duration 2 // elapsed time: 5.01s async fn make_request(sleep: u64) -> String { println!("started request"); std::thread::sleep(Duration::from_secs(sleep)); format!( "current thread {:?} | thread name {} | request_duration {:?}", std::thread::current().id(), std::thread::current() .name() .get_or_insert("default_thread_name"), sleep ) }