Skip to content

Commit

Permalink
runtime: recover when OS fails to spawn a new thread (#4485)
Browse files Browse the repository at this point in the history
  • Loading branch information
gwik committed Feb 25, 2022
1 parent 8e0e56f commit 70c10ba
Showing 1 changed file with 30 additions and 14 deletions.
44 changes: 30 additions & 14 deletions tokio/src/runtime/blocking/pool.rs
Expand Up @@ -240,17 +240,29 @@ impl Spawner {
if shared.num_th == self.inner.thread_cap {
// At max number of threads
} else {
shared.num_th += 1;
assert!(shared.shutdown_tx.is_some());
let shutdown_tx = shared.shutdown_tx.clone();

if let Some(shutdown_tx) = shutdown_tx {
let id = shared.worker_thread_index;
shared.worker_thread_index += 1;

let handle = self.spawn_thread(shutdown_tx, rt, id);

shared.worker_threads.insert(id, handle);
match self.spawn_thread(shutdown_tx, rt, id) {
Ok(handle) => {
shared.num_th += 1;
shared.worker_thread_index += 1;
shared.worker_threads.insert(id, handle);
}
Err(ref e) if is_temporary_os_thread_error(e) && shared.num_th > 0 => {
// OS temporarily failed to spawn a new thread.
// The task will be picked up eventually by a currently
// busy thread.
}
Err(e) => {
// The OS refused to spawn the thread and there is no thread
// to pick up the task that has just been pushed to the queue.
panic!("OS can't spawn worker thread: {}", e)
}
}
}
}
} else {
Expand All @@ -272,7 +284,7 @@ impl Spawner {
shutdown_tx: shutdown::Sender,
rt: &Handle,
id: usize,
) -> thread::JoinHandle<()> {
) -> std::io::Result<thread::JoinHandle<()>> {
let mut builder = thread::Builder::new().name((self.inner.thread_name)());

if let Some(stack_size) = self.inner.stack_size {
Expand All @@ -281,17 +293,21 @@ impl Spawner {

let rt = rt.clone();

builder
.spawn(move || {
// Only the reference should be moved into the closure
let _enter = crate::runtime::context::enter(rt.clone());
rt.blocking_spawner.inner.run(id);
drop(shutdown_tx);
})
.expect("OS can't spawn a new worker thread")
builder.spawn(move || {
// Only the reference should be moved into the closure
let _enter = crate::runtime::context::enter(rt.clone());
rt.blocking_spawner.inner.run(id);
drop(shutdown_tx);
})
}
}

// Tells whether the error when spawning a thread is temporary.
#[inline]
fn is_temporary_os_thread_error(error: &std::io::Error) -> bool {
matches!(error.kind(), std::io::ErrorKind::WouldBlock)
}

impl Inner {
fn run(&self, worker_thread_id: usize) {
if let Some(f) = &self.after_start {
Expand Down

0 comments on commit 70c10ba

Please sign in to comment.