From 817f95a3c0a8dfa83ac304d2d58611f6ba4afed5 Mon Sep 17 00:00:00 2001 From: Antonin Amand Date: Wed, 9 Feb 2022 21:28:41 +0100 Subject: [PATCH 1/5] recover when OS fails to spawn a new thread Avoid panicking when the OS reaches the limit of the number of threads / processes and the error is temporary. Spawning a new thread is not mandatory to make progress as long as there is a least one thread in the pool already processing the task queue. Fixes: #2309 --- tokio/src/runtime/blocking/pool.rs | 38 +++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index daf1f63fac3..4b042b7a18e 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -240,17 +240,25 @@ impl Spawner { if shared.num_th == self.inner.thread_cap { // At max number of threads } else { - shared.num_th += 1; assert!(shared.shutdown_tx.is_some()); let shutdown_tx = shared.shutdown_tx.clone(); if let Some(shutdown_tx) = shutdown_tx { let id = shared.worker_thread_index; - shared.worker_thread_index += 1; - let handle = self.spawn_thread(shutdown_tx, rt, id); - - shared.worker_threads.insert(id, handle); + if let Some(handle) = self.spawn_thread(shutdown_tx, rt, id) { + shared.num_th += 1; + shared.worker_thread_index += 1; + shared.worker_threads.insert(id, handle); + } else if shared.num_th == 0 { + // The OS refused to spawn the thread and there is no thread + // to pick up the task that has just been pushed to the queue. + panic!("Could not spawn any thread"); + } else { + // OS temporarily failed to spawn a new thread. + // The task will be picked up eventually by a currently + // busy thread. + } } } } else { @@ -272,7 +280,7 @@ impl Spawner { shutdown_tx: shutdown::Sender, rt: &Handle, id: usize, - ) -> thread::JoinHandle<()> { + ) -> Option> { let mut builder = thread::Builder::new().name((self.inner.thread_name)()); if let Some(stack_size) = self.inner.stack_size { @@ -288,10 +296,28 @@ impl Spawner { rt.blocking_spawner.inner.run(id); drop(shutdown_tx); }) + .map(Some) + .or_else(|error| { + if is_temporary_os_thread_error(&error) { + Ok(None) + } else { + Err(error) + } + }) .expect("OS can't spawn a new worker thread") } } +// Tells whether the error when spawning a thread is temporary. +#[inline] +fn is_temporary_os_thread_error(error: &std::io::Error) -> bool { + // Most probably OS specific, only tested on linux. + matches!( + error.kind(), + std::io::ErrorKind::WouldBlock | std::io::ErrorKind::OutOfMemory + ) +} + impl Inner { fn run(&self, worker_thread_id: usize) { if let Some(f) = &self.after_start { From 004c5fd47b313db36be79ee6702729674bfa9638 Mon Sep 17 00:00:00 2001 From: Antonin Amand Date: Wed, 9 Feb 2022 23:26:22 +0100 Subject: [PATCH 2/5] contain the panicking logic to one place Co-authored-by: Eliza Weisman --- tokio/src/runtime/blocking/pool.rs | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 4b042b7a18e..69ee3f718c0 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -246,18 +246,22 @@ impl Spawner { if let Some(shutdown_tx) = shutdown_tx { let id = shared.worker_thread_index; - if let Some(handle) = self.spawn_thread(shutdown_tx, rt, id) { - shared.num_th += 1; - shared.worker_thread_index += 1; - shared.worker_threads.insert(id, handle); - } else if shared.num_th == 0 { - // The OS refused to spawn the thread and there is no thread - // to pick up the task that has just been pushed to the queue. - panic!("Could not spawn any thread"); - } else { - // OS temporarily failed to spawn a new thread. - // The task will be picked up eventually by a currently - // busy thread. + match self.spawn_thread(shutdown_tx, rt, id) { + Ok(handle) => { + shared.num_th += 1; + shared.worker_thread_index += 1; + shared.worker_threads.insert(id, handle); + }, + Err(err) if is_temporary_os_thread_error(error) && shared.num_th > 0 => { + // OS temporarily failed to spawn a new thread. + // The task will be picked up eventually by a currently + // busy thread. + }, + Err(e) => { + // The OS refused to spawn the thread and there is no thread + // to pick up the task that has just been pushed to the queue. + panic!("OS can't spawn worker thread: {}", e) + }, } } } From 11bbb65ab1d3a400a82ba2992842500f615a6dde Mon Sep 17 00:00:00 2001 From: Antonin Amand Date: Wed, 9 Feb 2022 23:59:48 +0100 Subject: [PATCH 3/5] fix after merging suggestion --- tokio/src/runtime/blocking/pool.rs | 32 ++++++++++-------------------- 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 69ee3f718c0..b0b96e641f0 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -251,17 +251,17 @@ impl Spawner { shared.num_th += 1; shared.worker_thread_index += 1; shared.worker_threads.insert(id, handle); - }, - Err(err) if is_temporary_os_thread_error(error) && shared.num_th > 0 => { + } + Err(ref e) if is_temporary_os_thread_error(e) && shared.num_th > 0 => { // OS temporarily failed to spawn a new thread. // The task will be picked up eventually by a currently // busy thread. - }, + } Err(e) => { // The OS refused to spawn the thread and there is no thread // to pick up the task that has just been pushed to the queue. panic!("OS can't spawn worker thread: {}", e) - }, + } } } } @@ -284,7 +284,7 @@ impl Spawner { shutdown_tx: shutdown::Sender, rt: &Handle, id: usize, - ) -> Option> { + ) -> Result, std::io::Error> { let mut builder = thread::Builder::new().name((self.inner.thread_name)()); if let Some(stack_size) = self.inner.stack_size { @@ -293,22 +293,12 @@ impl Spawner { let rt = rt.clone(); - builder - .spawn(move || { - // Only the reference should be moved into the closure - let _enter = crate::runtime::context::enter(rt.clone()); - rt.blocking_spawner.inner.run(id); - drop(shutdown_tx); - }) - .map(Some) - .or_else(|error| { - if is_temporary_os_thread_error(&error) { - Ok(None) - } else { - Err(error) - } - }) - .expect("OS can't spawn a new worker thread") + builder.spawn(move || { + // Only the reference should be moved into the closure + let _enter = crate::runtime::context::enter(rt.clone()); + rt.blocking_spawner.inner.run(id); + drop(shutdown_tx); + }) } } From 27d804aa75d208be167485758892e9e936e3f647 Mon Sep 17 00:00:00 2001 From: Antonin Amand Date: Thu, 10 Feb 2022 00:27:44 +0100 Subject: [PATCH 4/5] sugg std::io::Result Co-authored-by: Eliza Weisman --- tokio/src/runtime/blocking/pool.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index b0b96e641f0..690ee3fec72 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -284,7 +284,7 @@ impl Spawner { shutdown_tx: shutdown::Sender, rt: &Handle, id: usize, - ) -> Result, std::io::Error> { + ) -> std::io::Result> { let mut builder = thread::Builder::new().name((self.inner.thread_name)()); if let Some(stack_size) = self.inner.stack_size { From e9f284e7cd832fdd04e86cea21c34d477784a9aa Mon Sep 17 00:00:00 2001 From: Antonin Amand Date: Thu, 10 Feb 2022 16:51:38 +0100 Subject: [PATCH 5/5] remove OutOfMemory and comment OutOfMemory is not available in 1.49 and I can't find a reproducible scenario anyway. The fix works on Windows and mac OS so removing the comment. --- tokio/src/runtime/blocking/pool.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 690ee3fec72..ef366239072 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -305,11 +305,7 @@ impl Spawner { // Tells whether the error when spawning a thread is temporary. #[inline] fn is_temporary_os_thread_error(error: &std::io::Error) -> bool { - // Most probably OS specific, only tested on linux. - matches!( - error.kind(), - std::io::ErrorKind::WouldBlock | std::io::ErrorKind::OutOfMemory - ) + matches!(error.kind(), std::io::ErrorKind::WouldBlock) } impl Inner {