Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: internally split Handle into two structs #4629

Merged
merged 2 commits into from Apr 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 10 additions & 1 deletion tokio/src/runtime/basic_scheduler.rs
Expand Up @@ -5,7 +5,7 @@ use crate::park::{Park, Unpark};
use crate::runtime::context::EnterGuard;
use crate::runtime::driver::Driver;
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
use crate::runtime::Callback;
use crate::runtime::{Callback, HandleInner};
use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics};
use crate::sync::notify::Notify;
use crate::util::atomic_cell::AtomicCell;
Expand Down Expand Up @@ -78,6 +78,9 @@ struct Shared {
/// Indicates whether the blocked on thread was woken.
woken: AtomicBool,

/// Handle to I/O driver, timer, blocking pool, ...
handle_inner: HandleInner,

/// Callback for a worker parking itself
before_park: Option<Callback>,

Expand Down Expand Up @@ -119,6 +122,7 @@ scoped_thread_local!(static CURRENT: Context);
impl BasicScheduler {
pub(crate) fn new(
driver: Driver,
handle_inner: HandleInner,
before_park: Option<Callback>,
after_unpark: Option<Callback>,
) -> BasicScheduler {
Expand All @@ -130,6 +134,7 @@ impl BasicScheduler {
owned: OwnedTasks::new(),
unpark,
woken: AtomicBool::new(false),
handle_inner,
before_park,
after_unpark,
scheduler_metrics: SchedulerMetrics::new(),
Expand Down Expand Up @@ -397,6 +402,10 @@ impl Spawner {
pub(crate) fn reset_woken(&self) -> bool {
self.shared.woken.swap(false, AcqRel)
}

pub(crate) fn as_handle_inner(&self) -> &HandleInner {
&self.shared.handle_inner
}
hawkw marked this conversation as resolved.
Show resolved Hide resolved
}

cfg_metrics! {
Expand Down
25 changes: 0 additions & 25 deletions tokio/src/runtime/blocking/mod.rs
Expand Up @@ -21,28 +21,3 @@ use crate::runtime::Builder;
pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool {
BlockingPool::new(builder, thread_cap)
}

/*
cfg_not_blocking_impl! {
use crate::runtime::Builder;
use std::time::Duration;

#[derive(Debug, Clone)]
pub(crate) struct BlockingPool {}

pub(crate) use BlockingPool as Spawner;

pub(crate) fn create_blocking_pool(_builder: &Builder, _thread_cap: usize) -> BlockingPool {
BlockingPool {}
}

impl BlockingPool {
pub(crate) fn spawner(&self) -> &BlockingPool {
self
}

pub(crate) fn shutdown(&mut self, _duration: Option<Duration>) {
}
}
}
*/
12 changes: 6 additions & 6 deletions tokio/src/runtime/blocking/pool.rs
Expand Up @@ -7,7 +7,7 @@ use crate::runtime::blocking::shutdown;
use crate::runtime::builder::ThreadNameFn;
use crate::runtime::context;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Builder, Callback, Handle};
use crate::runtime::{Builder, Callback, ToHandle};

use std::collections::{HashMap, VecDeque};
use std::fmt;
Expand Down Expand Up @@ -129,7 +129,7 @@ cfg_fs! {
R: Send + 'static,
{
let rt = context::current();
rt.spawn_mandatory_blocking(func)
rt.as_inner().spawn_mandatory_blocking(&rt, func)
}
}

Expand Down Expand Up @@ -220,7 +220,7 @@ impl fmt::Debug for BlockingPool {
// ===== impl Spawner =====

impl Spawner {
pub(crate) fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> {
pub(crate) fn spawn(&self, task: Task, rt: &dyn ToHandle) -> Result<(), ()> {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dyn because it isn't really performance critical so 🤷 less code gen.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

less codegen is good, but there's only two types that implement this, and a trait object ptr is an extra word on the stack vs a &impl ToHandle...honestly hard to say which is better, i guess. seems fine!

let mut shared = self.inner.shared.lock();

if shared.shutdown {
Expand Down Expand Up @@ -283,7 +283,7 @@ impl Spawner {
fn spawn_thread(
&self,
shutdown_tx: shutdown::Sender,
rt: &Handle,
rt: &dyn ToHandle,
id: usize,
) -> std::io::Result<thread::JoinHandle<()>> {
let mut builder = thread::Builder::new().name((self.inner.thread_name)());
Expand All @@ -292,12 +292,12 @@ impl Spawner {
builder = builder.stack_size(stack_size);
}

let rt = rt.clone();
let rt = rt.to_handle();

builder.spawn(move || {
// Only the reference should be moved into the closure
let _enter = crate::runtime::context::enter(rt.clone());
rt.blocking_spawner.inner.run(id);
rt.as_inner().blocking_spawner.inner.run(id);
drop(shutdown_tx);
})
}
Expand Down
53 changes: 30 additions & 23 deletions tokio/src/runtime/builder.rs
Expand Up @@ -555,32 +555,37 @@ impl Builder {
}

fn build_basic_runtime(&mut self) -> io::Result<Runtime> {
use crate::runtime::{BasicScheduler, Kind};
use crate::runtime::{BasicScheduler, HandleInner, Kind};

let (driver, resources) = driver::Driver::new(self.get_cfg())?;

// Blocking pool
let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
let blocking_spawner = blocking_pool.spawner().clone();

let handle_inner = HandleInner {
io_handle: resources.io_handle,
time_handle: resources.time_handle,
signal_handle: resources.signal_handle,
clock: resources.clock,
blocking_spawner,
};

// And now put a single-threaded scheduler on top of the timer. When
// there are no futures ready to do something, it'll let the timer or
// the reactor to generate some new stimuli for the futures to continue
// in their life.
let scheduler =
BasicScheduler::new(driver, self.before_park.clone(), self.after_unpark.clone());
let scheduler = BasicScheduler::new(
driver,
handle_inner,
self.before_park.clone(),
self.after_unpark.clone(),
);
let spawner = Spawner::Basic(scheduler.spawner().clone());

// Blocking pool
let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads);
let blocking_spawner = blocking_pool.spawner().clone();

Ok(Runtime {
kind: Kind::CurrentThread(scheduler),
handle: Handle {
spawner,
io_handle: resources.io_handle,
time_handle: resources.time_handle,
signal_handle: resources.signal_handle,
clock: resources.clock,
blocking_spawner,
},
handle: Handle { spawner },
blocking_pool,
})
}
Expand Down Expand Up @@ -662,30 +667,32 @@ cfg_rt_multi_thread! {
impl Builder {
fn build_threaded_runtime(&mut self) -> io::Result<Runtime> {
use crate::loom::sys::num_cpus;
use crate::runtime::{Kind, ThreadPool};
use crate::runtime::park::Parker;
use crate::runtime::{Kind, HandleInner, ThreadPool};

let core_threads = self.worker_threads.unwrap_or_else(num_cpus);

let (driver, resources) = driver::Driver::new(self.get_cfg())?;

let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver), self.before_park.clone(), self.after_unpark.clone());
let spawner = Spawner::ThreadPool(scheduler.spawner().clone());

// Create the blocking pool
let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads);
let blocking_spawner = blocking_pool.spawner().clone();

// Create the runtime handle
let handle = Handle {
spawner,
let handle_inner = HandleInner {
io_handle: resources.io_handle,
time_handle: resources.time_handle,
signal_handle: resources.signal_handle,
clock: resources.clock,
blocking_spawner,
};

let (scheduler, launch) = ThreadPool::new(core_threads, driver, handle_inner, self.before_park.clone(), self.after_unpark.clone());
let spawner = Spawner::ThreadPool(scheduler.spawner().clone());

// Create the runtime handle
let handle = Handle {
spawner,
};

// Spawn the thread pool workers
let _enter = crate::runtime::context::enter(handle.clone());
launch.launch();
Expand Down
8 changes: 4 additions & 4 deletions tokio/src/runtime/context.rs
Expand Up @@ -26,7 +26,7 @@ cfg_io_driver! {
pub(crate) fn io_handle() -> crate::runtime::driver::IoHandle {
match CONTEXT.try_with(|ctx| {
let ctx = ctx.borrow();
ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).io_handle.clone()
ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).as_inner().io_handle.clone()
}) {
Ok(io_handle) => io_handle,
Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
Expand All @@ -39,7 +39,7 @@ cfg_signal_internal! {
pub(crate) fn signal_handle() -> crate::runtime::driver::SignalHandle {
match CONTEXT.try_with(|ctx| {
let ctx = ctx.borrow();
ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).signal_handle.clone()
ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).as_inner().signal_handle.clone()
}) {
Ok(signal_handle) => signal_handle,
Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
Expand All @@ -51,7 +51,7 @@ cfg_time! {
pub(crate) fn time_handle() -> crate::runtime::driver::TimeHandle {
match CONTEXT.try_with(|ctx| {
let ctx = ctx.borrow();
ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).time_handle.clone()
ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).as_inner().time_handle.clone()
}) {
Ok(time_handle) => time_handle,
Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
Expand All @@ -60,7 +60,7 @@ cfg_time! {

cfg_test_util! {
pub(crate) fn clock() -> Option<crate::runtime::driver::Clock> {
match CONTEXT.try_with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.clock.clone())) {
match CONTEXT.try_with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.as_inner().clock.clone())) {
Ok(clock) => clock,
Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR),
}
Expand Down