Skip to content

Commit

Permalink
rt: remove last slab dependency (#2917)
Browse files Browse the repository at this point in the history
This removes the last slab dependency by replacing the current slab-based
JoinHandle tracking with one based on HashMap instead.

Co-authored-by: Bryan Donlan <bdonlan@amazon.com>
  • Loading branch information
bdonlan and Bryan Donlan committed Nov 5, 2020
1 parent 0b3918b commit d7e3fcb
Showing 1 changed file with 45 additions and 14 deletions.
59 changes: 45 additions & 14 deletions tokio/src/runtime/blocking/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ use crate::runtime::context;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Builder, Callback, Handle};

use slab::Slab;

use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::time::Duration;

Expand Down Expand Up @@ -59,7 +57,18 @@ struct Shared {
num_notify: u32,
shutdown: bool,
shutdown_tx: Option<shutdown::Sender>,
worker_threads: Slab<thread::JoinHandle<()>>,
/// Prior to shutdown, we clean up JoinHandles by having each timed-out
/// thread join on the previous timed-out thread. This is not strictly
/// necessary but helps avoid Valgrind false positives, see
/// https://github.com/tokio-rs/tokio/commit/646fbae76535e397ef79dbcaacb945d4c829f666
/// for more information.
last_exiting_thread: Option<thread::JoinHandle<()>>,
/// This holds the JoinHandles for all running threads; on shutdown, the thread
/// calling shutdown handles joining on these.
worker_threads: HashMap<usize, thread::JoinHandle<()>>,
/// This is a counter used to iterate worker_threads in a consistent order (for loom's
/// benefit)
worker_thread_index: usize,
}

type Task = task::Notified<NoopSchedule>;
Expand Down Expand Up @@ -105,7 +114,9 @@ impl BlockingPool {
num_notify: 0,
shutdown: false,
shutdown_tx: Some(shutdown_tx),
worker_threads: Slab::new(),
last_exiting_thread: None,
worker_threads: HashMap::new(),
worker_thread_index: 0,
}),
condvar: Condvar::new(),
thread_name: builder.thread_name.clone(),
Expand Down Expand Up @@ -137,12 +148,21 @@ impl BlockingPool {
shared.shutdown = true;
shared.shutdown_tx = None;
self.spawner.inner.condvar.notify_all();
let mut workers = std::mem::replace(&mut shared.worker_threads, Slab::new());

let last_exited_thread = std::mem::take(&mut shared.last_exiting_thread);
let workers = std::mem::replace(&mut shared.worker_threads, HashMap::new());

drop(shared);

if self.shutdown_rx.wait(timeout) {
for handle in workers.drain() {
let _ = last_exited_thread.map(|th| th.join());

// Loom requires that execution be deterministic, so sort by thread ID before joining.
// (HashMaps use a randomly-seeded hash function, so the order is nondeterministic)
let mut workers: Vec<(usize, thread::JoinHandle<()>)> = workers.into_iter().collect();
workers.sort_by_key(|(id, _)| *id);

for (_id, handle) in workers.into_iter() {
let _ = handle.join();
}
}
Expand Down Expand Up @@ -204,11 +224,13 @@ impl Spawner {

if let Some(shutdown_tx) = shutdown_tx {
let mut shared = self.inner.shared.lock();
let entry = shared.worker_threads.vacant_entry();

let handle = self.spawn_thread(shutdown_tx, rt, entry.key());
let id = shared.worker_thread_index;
shared.worker_thread_index += 1;

entry.insert(handle);
let handle = self.spawn_thread(shutdown_tx, rt, id);

shared.worker_threads.insert(id, handle);
}

Ok(())
Expand All @@ -218,7 +240,7 @@ impl Spawner {
&self,
shutdown_tx: shutdown::Sender,
rt: &Handle,
worker_id: usize,
id: usize,
) -> thread::JoinHandle<()> {
let mut builder = thread::Builder::new().name((self.inner.thread_name)());

Expand All @@ -232,20 +254,21 @@ impl Spawner {
.spawn(move || {
// Only the reference should be moved into the closure
let _enter = crate::runtime::context::enter(rt.clone());
rt.blocking_spawner.inner.run(worker_id);
rt.blocking_spawner.inner.run(id);
drop(shutdown_tx);
})
.unwrap()
}
}

impl Inner {
fn run(&self, worker_id: usize) {
fn run(&self, worker_thread_id: usize) {
if let Some(f) = &self.after_start {
f()
}

let mut shared = self.shared.lock();
let mut join_on_thread = None;

'main: loop {
// BUSY
Expand Down Expand Up @@ -276,7 +299,11 @@ impl Inner {
// Even if the condvar "timed out", if the pool is entering the
// shutdown phase, we want to perform the cleanup logic.
if !shared.shutdown && timeout_result.timed_out() {
shared.worker_threads.remove(worker_id);
// We'll join the prior timed-out thread's JoinHandle after dropping the lock.
// This isn't done when shutting down, because the thread calling shutdown will
// handle joining everything.
let my_handle = shared.worker_threads.remove(&worker_thread_id);
join_on_thread = std::mem::replace(&mut shared.last_exiting_thread, my_handle);

break 'main;
}
Expand Down Expand Up @@ -323,6 +350,10 @@ impl Inner {
if let Some(f) = &self.before_stop {
f()
}

if let Some(handle) = join_on_thread {
let _ = handle.join();
}
}
}

Expand Down

0 comments on commit d7e3fcb

Please sign in to comment.