Skip to content

Commit

Permalink
rt: internally split Handle into two structs (#4629)
Browse files Browse the repository at this point in the history
Previously, `runtime::Handle` was a single struct composed of the
internal handles for each runtime component. This patch splits the
`Handle` struct into a `HandleInner` which contains everything
**except** the task scheduler handle. Now, `HandleInner` is passed to
the task scheduler during creation and the task scheduler is responsible
for storing it. `Handle` only  needs to hold the scheduler handle and
can access the rest of the component handles by querying the task
scheduler.

The motivation for this change is it now enables the multi-threaded
scheduler to have direct access to the blocking spawner handle.
Previously, when spawning a new thread, the multi-threaded scheduler had
to access the blocking spawner by accessing a thread-local variable.
Now, in theory, the multi-threaded scheduler can use `HandleInner`
directly. However, this change hasn't been done in this PR yet.

Also, now the `Handle` struct is much smaller.

This change is intended to make it easier for the multi-threaded
scheduler to shutdown idle threads and respawn them on demand.
  • Loading branch information
carllerche committed Apr 20, 2022
1 parent d590a36 commit 911a0ef
Show file tree
Hide file tree
Showing 16 changed files with 233 additions and 164 deletions.
11 changes: 10 additions & 1 deletion tokio/src/runtime/basic_scheduler.rs
Expand Up @@ -5,7 +5,7 @@ use crate::park::{Park, Unpark};
use crate::runtime::context::EnterGuard;
use crate::runtime::driver::Driver;
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
use crate::runtime::Callback;
use crate::runtime::{Callback, HandleInner};
use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics};
use crate::sync::notify::Notify;
use crate::util::atomic_cell::AtomicCell;
Expand Down Expand Up @@ -78,6 +78,9 @@ struct Shared {
/// Indicates whether the blocked on thread was woken.
woken: AtomicBool,

/// Handle to I/O driver, timer, blocking pool, ...
handle_inner: HandleInner,

/// Callback for a worker parking itself
before_park: Option<Callback>,

Expand Down Expand Up @@ -119,6 +122,7 @@ scoped_thread_local!(static CURRENT: Context);
impl BasicScheduler {
pub(crate) fn new(
driver: Driver,
handle_inner: HandleInner,
before_park: Option<Callback>,
after_unpark: Option<Callback>,
) -> BasicScheduler {
Expand All @@ -130,6 +134,7 @@ impl BasicScheduler {
owned: OwnedTasks::new(),
unpark,
woken: AtomicBool::new(false),
handle_inner,
before_park,
after_unpark,
scheduler_metrics: SchedulerMetrics::new(),
Expand Down Expand Up @@ -397,6 +402,10 @@ impl Spawner {
pub(crate) fn reset_woken(&self) -> bool {
self.shared.woken.swap(false, AcqRel)
}

pub(crate) fn as_handle_inner(&self) -> &HandleInner {
&self.shared.handle_inner
}
}

cfg_metrics! {
Expand Down
25 changes: 0 additions & 25 deletions tokio/src/runtime/blocking/mod.rs
Expand Up @@ -21,28 +21,3 @@ use crate::runtime::Builder;
pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool {
BlockingPool::new(builder, thread_cap)
}

/*
cfg_not_blocking_impl! {
use crate::runtime::Builder;
use std::time::Duration;
#[derive(Debug, Clone)]
pub(crate) struct BlockingPool {}
pub(crate) use BlockingPool as Spawner;
pub(crate) fn create_blocking_pool(_builder: &Builder, _thread_cap: usize) -> BlockingPool {
BlockingPool {}
}
impl BlockingPool {
pub(crate) fn spawner(&self) -> &BlockingPool {
self
}
pub(crate) fn shutdown(&mut self, _duration: Option<Duration>) {
}
}
}
*/
12 changes: 6 additions & 6 deletions tokio/src/runtime/blocking/pool.rs
Expand Up @@ -7,7 +7,7 @@ use crate::runtime::blocking::shutdown;
use crate::runtime::builder::ThreadNameFn;
use crate::runtime::context;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Builder, Callback, Handle};
use crate::runtime::{Builder, Callback, ToHandle};

use std::collections::{HashMap, VecDeque};
use std::fmt;
Expand Down Expand Up @@ -129,7 +129,7 @@ cfg_fs! {
R: Send + 'static,
{
let rt = context::current();
rt.spawn_mandatory_blocking(func)
rt.as_inner().spawn_mandatory_blocking(&rt, func)
}
}

Expand Down Expand Up @@ -220,7 +220,7 @@ impl fmt::Debug for BlockingPool {
// ===== impl Spawner =====

impl Spawner {
pub(crate) fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> {
pub(crate) fn spawn(&self, task: Task, rt: &dyn ToHandle) -> Result<(), ()> {
let mut shared = self.inner.shared.lock();

if shared.shutdown {
Expand Down Expand Up @@ -283,7 +283,7 @@ impl Spawner {
fn spawn_thread(
&self,
shutdown_tx: shutdown::Sender,
rt: &Handle,
rt: &dyn ToHandle,
id: usize,
) -> std::io::Result<thread::JoinHandle<()>> {
let mut builder = thread::Builder::new().name((self.inner.thread_name)());
Expand All @@ -292,12 +292,12 @@ impl Spawner {
builder = builder.stack_size(stack_size);
}

let rt = rt.clone();
let rt = rt.to_handle();

builder.spawn(move || {
// Only the reference should be moved into the closure
let _enter = crate::runtime::context::enter(rt.clone());
rt.blocking_spawner.inner.run(id);
rt.as_inner().blocking_spawner.inner.run(id);
drop(shutdown_tx);
})
}
Expand Down
53 changes: 30 additions & 23 deletions tokio/src/runtime/builder.rs
Expand Up @@ -555,32 +555,37 @@ impl Builder {
}

fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::{BasicScheduler, Kind};
use crate::runtime::{BasicScheduler, HandleInner, Kind};

let (driver, resources) = driver::Driver::new(self.get_cfg())?;

// Blocking pool
let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
let blocking_spawner = blocking_pool.spawner().clone();

let handle_inner = HandleInner {
io_handle: resources.io_handle,
time_handle: resources.time_handle,
signal_handle: resources.signal_handle,
clock: resources.clock,
blocking_spawner,
};

// And now put a single-threaded scheduler on top of the timer. When
// there are no futures ready to do something, it'll let the timer or
// the reactor to generate some new stimuli for the futures to continue
// in their life.
let scheduler =
BasicScheduler::new(driver, self.before_park.clone(), self.after_unpark.clone());
let scheduler = BasicScheduler::new(
driver,
handle_inner,
self.before_park.clone(),
self.after_unpark.clone(),
);
let spawner = Spawner::Basic(scheduler.spawner().clone());

// Blocking pool
let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
let blocking_spawner = blocking_pool.spawner().clone();

Ok(Runtime {
kind: Kind::CurrentThread(scheduler),
handle: Handle {
spawner,
io_handle: resources.io_handle,
time_handle: resources.time_handle,
signal_handle: resources.signal_handle,
clock: resources.clock,
blocking_spawner,
},
handle: Handle { spawner },
blocking_pool,
})
}
Expand Down Expand Up @@ -662,30 +667,32 @@ cfg_rt_multi_thread! {
impl Builder {
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
use crate::loom::sys::num_cpus;
use crate::runtime::{Kind, ThreadPool};
use crate::runtime::park::Parker;
use crate::runtime::{Kind, HandleInner, ThreadPool};

let core_threads = self.worker_threads.unwrap_or_else(num_cpus);

let (driver, resources) = driver::Driver::new(self.get_cfg())?;

let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver), self.before_park.clone(), self.after_unpark.clone());
let spawner = Spawner::ThreadPool(scheduler.spawner().clone());

// Create the blocking pool
let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
let blocking_spawner = blocking_pool.spawner().clone();

// Create the runtime handle
let handle = Handle {
spawner,
let handle_inner = HandleInner {
io_handle: resources.io_handle,
time_handle: resources.time_handle,
signal_handle: resources.signal_handle,
clock: resources.clock,
blocking_spawner,
};

let (scheduler, launch) = ThreadPool::new(core_threads, driver, handle_inner, self.before_park.clone(), self.after_unpark.clone());
let spawner = Spawner::ThreadPool(scheduler.spawner().clone());

// Create the runtime handle
let handle = Handle {
spawner,
};

// Spawn the thread pool workers
let _enter = crate::runtime::context::enter(handle.clone());
launch.launch();
Expand Down
8 changes: 4 additions & 4 deletions tokio/src/runtime/context.rs
Expand Up @@ -26,7 +26,7 @@ cfg_io_driver! {
pub(crate) fn io_handle() -> crate::runtime::driver::IoHandle {
match CONTEXT.try_with(|ctx| {
let ctx = ctx.borrow();
ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).io_handle.clone()
ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).as_inner().io_handle.clone()
}) {
Ok(io_handle) => io_handle,
Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
Expand All @@ -39,7 +39,7 @@ cfg_signal_internal! {
pub(crate) fn signal_handle() -> crate::runtime::driver::SignalHandle {
match CONTEXT.try_with(|ctx| {
let ctx = ctx.borrow();
ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).signal_handle.clone()
ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).as_inner().signal_handle.clone()
}) {
Ok(signal_handle) => signal_handle,
Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
Expand All @@ -51,7 +51,7 @@ cfg_time! {
pub(crate) fn time_handle() -> crate::runtime::driver::TimeHandle {
match CONTEXT.try_with(|ctx| {
let ctx = ctx.borrow();
ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).time_handle.clone()
ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).as_inner().time_handle.clone()
}) {
Ok(time_handle) => time_handle,
Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
Expand All @@ -60,7 +60,7 @@ cfg_time! {

cfg_test_util! {
pub(crate) fn clock() -> Option<crate::runtime::driver::Clock> {
match CONTEXT.try_with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.clock.clone())) {
match CONTEXT.try_with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.as_inner().clock.clone())) {
Ok(clock) => clock,
Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
}
Expand Down

0 comments on commit 911a0ef

Please sign in to comment.