diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index daf1f63fac3..ef366239072 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -240,17 +240,29 @@ impl Spawner { if shared.num_th == self.inner.thread_cap { // At max number of threads } else { - shared.num_th += 1; assert!(shared.shutdown_tx.is_some()); let shutdown_tx = shared.shutdown_tx.clone(); if let Some(shutdown_tx) = shutdown_tx { let id = shared.worker_thread_index; - shared.worker_thread_index += 1; - let handle = self.spawn_thread(shutdown_tx, rt, id); - - shared.worker_threads.insert(id, handle); + match self.spawn_thread(shutdown_tx, rt, id) { + Ok(handle) => { + shared.num_th += 1; + shared.worker_thread_index += 1; + shared.worker_threads.insert(id, handle); + } + Err(ref e) if is_temporary_os_thread_error(e) && shared.num_th > 0 => { + // OS temporarily failed to spawn a new thread. + // The task will be picked up eventually by a currently + // busy thread. + } + Err(e) => { + // The OS refused to spawn the thread and there is no thread + // to pick up the task that has just been pushed to the queue. + panic!("OS can't spawn worker thread: {}", e) + } + } } } } else { @@ -272,7 +284,7 @@ impl Spawner { shutdown_tx: shutdown::Sender, rt: &Handle, id: usize, - ) -> thread::JoinHandle<()> { + ) -> std::io::Result> { let mut builder = thread::Builder::new().name((self.inner.thread_name)()); if let Some(stack_size) = self.inner.stack_size { @@ -281,17 +293,21 @@ impl Spawner { let rt = rt.clone(); - builder - .spawn(move || { - // Only the reference should be moved into the closure - let _enter = crate::runtime::context::enter(rt.clone()); - rt.blocking_spawner.inner.run(id); - drop(shutdown_tx); - }) - .expect("OS can't spawn a new worker thread") + builder.spawn(move || { + // Only the reference should be moved into the closure + let _enter = crate::runtime::context::enter(rt.clone()); + rt.blocking_spawner.inner.run(id); + drop(shutdown_tx); + }) } } +// Tells whether the error when spawning a thread is temporary. +#[inline] +fn is_temporary_os_thread_error(error: &std::io::Error) -> bool { + matches!(error.kind(), std::io::ErrorKind::WouldBlock) +} + impl Inner { fn run(&self, worker_thread_id: usize) { if let Some(f) = &self.after_start {