From 2dcde0f6c47b62f77d513f6849fb56d657df55ba Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 20 Apr 2022 10:08:11 -0700 Subject: [PATCH 1/2] rt: internally split Handle into two structs Previously, `runtime::Handle` was a single struct composed of the internal handles for each runtime component. This patch splits the `Handle` struct into a `HandleInner` which contains everything **except** the task scheduler handle. Now, `HandleInner` is passed to the task scheduler during creation and the task scheduler is responsible for storing it. `Handle` only needs to hold the scheduler handle and can access the rest of the component handles by querying the task scheduler. The motivation for this change is it now enables the multi-threaded scheduler to have direct access to the blocking spawner handle. Previously, when spawning a new thread, the multi-threaded scheduler had to access the blocking spawner by accessing a thread-local variable. Now, in theory, the multi-threaded scheduler can use `HandleInner` directly. However, this change hasn't been done in this PR yet. Also, now the `Handle` struct is much smaller. This change is intended to make it easier for the multi-threaded scheduler to shutdown idle threads and respawn them on demand. --- tokio/src/runtime/basic_scheduler.rs | 11 +- tokio/src/runtime/blocking/mod.rs | 25 --- tokio/src/runtime/blocking/pool.rs | 12 +- tokio/src/runtime/builder.rs | 53 +++--- tokio/src/runtime/context.rs | 8 +- tokio/src/runtime/handle.rs | 188 +++++++++++-------- tokio/src/runtime/mod.rs | 11 +- tokio/src/runtime/spawner.rs | 10 +- tokio/src/runtime/tests/loom_queue.rs | 3 +- tokio/src/runtime/tests/queue.rs | 2 +- tokio/src/runtime/thread_pool/mod.rs | 18 +- tokio/src/runtime/{ => thread_pool}/park.rs | 0 tokio/src/runtime/{ => thread_pool}/queue.rs | 20 +- tokio/src/runtime/thread_pool/worker.rs | 27 ++- 14 files changed, 225 insertions(+), 163 deletions(-) rename tokio/src/runtime/{ => thread_pool}/park.rs (100%) rename tokio/src/runtime/{ => thread_pool}/queue.rs (97%) diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 401f55b3f2f..acebd0ab480 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -5,7 +5,7 @@ use crate::park::{Park, Unpark}; use crate::runtime::context::EnterGuard; use crate::runtime::driver::Driver; use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task}; -use crate::runtime::Callback; +use crate::runtime::{Callback, HandleInner}; use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics}; use crate::sync::notify::Notify; use crate::util::atomic_cell::AtomicCell; @@ -78,6 +78,9 @@ struct Shared { /// Indicates whether the blocked on thread was woken. woken: AtomicBool, + /// Handle to I/O driver, timer, blocking pool, ... + handle_inner: HandleInner, + /// Callback for a worker parking itself before_park: Option, @@ -119,6 +122,7 @@ scoped_thread_local!(static CURRENT: Context); impl BasicScheduler { pub(crate) fn new( driver: Driver, + handle_inner: HandleInner, before_park: Option, after_unpark: Option, ) -> BasicScheduler { @@ -130,6 +134,7 @@ impl BasicScheduler { owned: OwnedTasks::new(), unpark, woken: AtomicBool::new(false), + handle_inner, before_park, after_unpark, scheduler_metrics: SchedulerMetrics::new(), @@ -397,6 +402,10 @@ impl Spawner { pub(crate) fn reset_woken(&self) -> bool { self.shared.woken.swap(false, AcqRel) } + + pub(crate) fn as_handle_inner(&self) -> &HandleInner { + &self.shared.handle_inner + } } cfg_metrics! { diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs index 15fe05c9ade..88d5e6b6a99 100644 --- a/tokio/src/runtime/blocking/mod.rs +++ b/tokio/src/runtime/blocking/mod.rs @@ -21,28 +21,3 @@ use crate::runtime::Builder; pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool { BlockingPool::new(builder, thread_cap) } - -/* -cfg_not_blocking_impl! { - use crate::runtime::Builder; - use std::time::Duration; - - #[derive(Debug, Clone)] - pub(crate) struct BlockingPool {} - - pub(crate) use BlockingPool as Spawner; - - pub(crate) fn create_blocking_pool(_builder: &Builder, _thread_cap: usize) -> BlockingPool { - BlockingPool {} - } - - impl BlockingPool { - pub(crate) fn spawner(&self) -> &BlockingPool { - self - } - - pub(crate) fn shutdown(&mut self, _duration: Option) { - } - } -} -*/ diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index e47073ce2d9..f73868ee9e7 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -7,7 +7,7 @@ use crate::runtime::blocking::shutdown; use crate::runtime::builder::ThreadNameFn; use crate::runtime::context; use crate::runtime::task::{self, JoinHandle}; -use crate::runtime::{Builder, Callback, Handle}; +use crate::runtime::{Builder, Callback, ToHandle}; use std::collections::{HashMap, VecDeque}; use std::fmt; @@ -129,7 +129,7 @@ cfg_fs! { R: Send + 'static, { let rt = context::current(); - rt.spawn_mandatory_blocking(func) + rt.as_inner().spawn_mandatory_blocking(&rt, func) } } @@ -220,7 +220,7 @@ impl fmt::Debug for BlockingPool { // ===== impl Spawner ===== impl Spawner { - pub(crate) fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> { + pub(crate) fn spawn(&self, task: Task, rt: &dyn ToHandle) -> Result<(), ()> { let mut shared = self.inner.shared.lock(); if shared.shutdown { @@ -283,7 +283,7 @@ impl Spawner { fn spawn_thread( &self, shutdown_tx: shutdown::Sender, - rt: &Handle, + rt: &dyn ToHandle, id: usize, ) -> std::io::Result> { let mut builder = thread::Builder::new().name((self.inner.thread_name)()); @@ -292,12 +292,12 @@ impl Spawner { builder = builder.stack_size(stack_size); } - let rt = rt.clone(); + let rt = rt.to_handle(); builder.spawn(move || { // Only the reference should be moved into the closure let _enter = crate::runtime::context::enter(rt.clone()); - rt.blocking_spawner.inner.run(id); + rt.as_inner().blocking_spawner.inner.run(id); drop(shutdown_tx); }) } diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 91c365fd516..618474c05ce 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -555,32 +555,37 @@ impl Builder { } fn build_basic_runtime(&mut self) -> io::Result { - use crate::runtime::{BasicScheduler, Kind}; + use crate::runtime::{BasicScheduler, HandleInner, Kind}; let (driver, resources) = driver::Driver::new(self.get_cfg())?; + // Blocking pool + let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads); + let blocking_spawner = blocking_pool.spawner().clone(); + + let handle_inner = HandleInner { + io_handle: resources.io_handle, + time_handle: resources.time_handle, + signal_handle: resources.signal_handle, + clock: resources.clock, + blocking_spawner, + }; + // And now put a single-threaded scheduler on top of the timer. When // there are no futures ready to do something, it'll let the timer or // the reactor to generate some new stimuli for the futures to continue // in their life. - let scheduler = - BasicScheduler::new(driver, self.before_park.clone(), self.after_unpark.clone()); + let scheduler = BasicScheduler::new( + driver, + handle_inner, + self.before_park.clone(), + self.after_unpark.clone(), + ); let spawner = Spawner::Basic(scheduler.spawner().clone()); - // Blocking pool - let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads); - let blocking_spawner = blocking_pool.spawner().clone(); - Ok(Runtime { kind: Kind::CurrentThread(scheduler), - handle: Handle { - spawner, - io_handle: resources.io_handle, - time_handle: resources.time_handle, - signal_handle: resources.signal_handle, - clock: resources.clock, - blocking_spawner, - }, + handle: Handle { spawner }, blocking_pool, }) } @@ -662,23 +667,17 @@ cfg_rt_multi_thread! { impl Builder { fn build_threaded_runtime(&mut self) -> io::Result { use crate::loom::sys::num_cpus; - use crate::runtime::{Kind, ThreadPool}; - use crate::runtime::park::Parker; + use crate::runtime::{Kind, HandleInner, ThreadPool}; let core_threads = self.worker_threads.unwrap_or_else(num_cpus); let (driver, resources) = driver::Driver::new(self.get_cfg())?; - let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver), self.before_park.clone(), self.after_unpark.clone()); - let spawner = Spawner::ThreadPool(scheduler.spawner().clone()); - // Create the blocking pool let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads); let blocking_spawner = blocking_pool.spawner().clone(); - // Create the runtime handle - let handle = Handle { - spawner, + let handle_inner = HandleInner { io_handle: resources.io_handle, time_handle: resources.time_handle, signal_handle: resources.signal_handle, @@ -686,6 +685,14 @@ cfg_rt_multi_thread! { blocking_spawner, }; + let (scheduler, launch) = ThreadPool::new(core_threads, driver, handle_inner, self.before_park.clone(), self.after_unpark.clone()); + let spawner = Spawner::ThreadPool(scheduler.spawner().clone()); + + // Create the runtime handle + let handle = Handle { + spawner, + }; + // Spawn the thread pool workers let _enter = crate::runtime::context::enter(handle.clone()); launch.launch(); diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index 1f44a534026..aebbe18755a 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -26,7 +26,7 @@ cfg_io_driver! { pub(crate) fn io_handle() -> crate::runtime::driver::IoHandle { match CONTEXT.try_with(|ctx| { let ctx = ctx.borrow(); - ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).io_handle.clone() + ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).as_inner().io_handle.clone() }) { Ok(io_handle) => io_handle, Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR), @@ -39,7 +39,7 @@ cfg_signal_internal! { pub(crate) fn signal_handle() -> crate::runtime::driver::SignalHandle { match CONTEXT.try_with(|ctx| { let ctx = ctx.borrow(); - ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).signal_handle.clone() + ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).as_inner().signal_handle.clone() }) { Ok(signal_handle) => signal_handle, Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR), @@ -51,7 +51,7 @@ cfg_time! { pub(crate) fn time_handle() -> crate::runtime::driver::TimeHandle { match CONTEXT.try_with(|ctx| { let ctx = ctx.borrow(); - ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).time_handle.clone() + ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).as_inner().time_handle.clone() }) { Ok(time_handle) => time_handle, Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR), @@ -60,7 +60,7 @@ cfg_time! { cfg_test_util! { pub(crate) fn clock() -> Option { - match CONTEXT.try_with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.clock.clone())) { + match CONTEXT.try_with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.as_inner().clock.clone())) { Ok(clock) => clock, Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR), } diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 3365ab73b2a..180c3ab859e 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -16,7 +16,11 @@ use std::{error, fmt}; #[derive(Debug, Clone)] pub struct Handle { pub(super) spawner: Spawner, +} +/// All internal handles that are *not* the scheduler's spawner. +#[derive(Debug)] +pub(crate) struct HandleInner { /// Handles to the I/O drivers #[cfg_attr( not(any(feature = "net", feature = "process", all(unix, feature = "signal"))), @@ -47,6 +51,11 @@ pub struct Handle { pub(super) blocking_spawner: blocking::Spawner, } +/// Create a new runtime handle. +pub(crate) trait ToHandle { + fn to_handle(&self) -> Handle; +} + /// Runtime context guard. /// /// Returned by [`Runtime::enter`] and [`Handle::enter`], the context guard exits @@ -196,85 +205,11 @@ impl Handle { F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - let (join_handle, _was_spawned) = - if cfg!(debug_assertions) && std::mem::size_of::() > 2048 { - self.spawn_blocking_inner(Box::new(func), blocking::Mandatory::NonMandatory, None) - } else { - self.spawn_blocking_inner(func, blocking::Mandatory::NonMandatory, None) - }; - - join_handle + self.as_inner().spawn_blocking(self, func) } - cfg_fs! { - #[track_caller] - #[cfg_attr(any( - all(loom, not(test)), // the function is covered by loom tests - test - ), allow(dead_code))] - pub(crate) fn spawn_mandatory_blocking(&self, func: F) -> Option> - where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, - { - let (join_handle, was_spawned) = if cfg!(debug_assertions) && std::mem::size_of::() > 2048 { - self.spawn_blocking_inner( - Box::new(func), - blocking::Mandatory::Mandatory, - None - ) - } else { - self.spawn_blocking_inner( - func, - blocking::Mandatory::Mandatory, - None - ) - }; - - if was_spawned { - Some(join_handle) - } else { - None - } - } - } - - #[track_caller] - pub(crate) fn spawn_blocking_inner( - &self, - func: F, - is_mandatory: blocking::Mandatory, - name: Option<&str>, - ) -> (JoinHandle, bool) - where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, - { - let fut = BlockingTask::new(func); - - #[cfg(all(tokio_unstable, feature = "tracing"))] - let fut = { - use tracing::Instrument; - let location = std::panic::Location::caller(); - let span = tracing::trace_span!( - target: "tokio::task::blocking", - "runtime.spawn", - kind = %"blocking", - task.name = %name.unwrap_or_default(), - "fn" = %std::any::type_name::(), - spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()), - ); - fut.instrument(span) - }; - - #[cfg(not(all(tokio_unstable, feature = "tracing")))] - let _ = name; - - let (task, handle) = task::unowned(fut, NoopSchedule); - let spawned = self - .blocking_spawner - .spawn(blocking::Task::new(task, is_mandatory), self); - (handle, spawned.is_ok()) + pub(crate) fn as_inner(&self) -> &HandleInner { + self.spawner.as_handle_inner() } /// Runs a future to completion on this `Handle`'s associated `Runtime`. @@ -369,6 +304,12 @@ impl Handle { } } +impl ToHandle for Handle { + fn to_handle(&self) -> Handle { + self.clone() + } +} + cfg_metrics! { use crate::runtime::RuntimeMetrics; @@ -381,6 +322,99 @@ cfg_metrics! { } } +impl HandleInner { + #[track_caller] + pub(crate) fn spawn_blocking(&self, rt: &dyn ToHandle, func: F) -> JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let (join_handle, _was_spawned) = if cfg!(debug_assertions) + && std::mem::size_of::() > 2048 + { + self.spawn_blocking_inner(Box::new(func), blocking::Mandatory::NonMandatory, None, rt) + } else { + self.spawn_blocking_inner(func, blocking::Mandatory::NonMandatory, None, rt) + }; + + join_handle + } + + cfg_fs! { + #[track_caller] + #[cfg_attr(any( + all(loom, not(test)), // the function is covered by loom tests + test + ), allow(dead_code))] + pub(crate) fn spawn_mandatory_blocking(&self, rt: &dyn ToHandle, func: F) -> Option> + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let (join_handle, was_spawned) = if cfg!(debug_assertions) && std::mem::size_of::() > 2048 { + self.spawn_blocking_inner( + Box::new(func), + blocking::Mandatory::Mandatory, + None, + rt, + ) + } else { + self.spawn_blocking_inner( + func, + blocking::Mandatory::Mandatory, + None, + rt, + ) + }; + + if was_spawned { + Some(join_handle) + } else { + None + } + } + } + + #[track_caller] + pub(crate) fn spawn_blocking_inner( + &self, + func: F, + is_mandatory: blocking::Mandatory, + name: Option<&str>, + rt: &dyn ToHandle, + ) -> (JoinHandle, bool) + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let fut = BlockingTask::new(func); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let fut = { + use tracing::Instrument; + let location = std::panic::Location::caller(); + let span = tracing::trace_span!( + target: "tokio::task::blocking", + "runtime.spawn", + kind = %"blocking", + task.name = %name.unwrap_or_default(), + "fn" = %std::any::type_name::(), + spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()), + ); + fut.instrument(span) + }; + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let _ = name; + + let (task, handle) = task::unowned(fut, NoopSchedule); + let spawned = self + .blocking_spawner + .spawn(blocking::Task::new(task, is_mandatory), rt); + (handle, spawned.is_ok()) + } +} + /// Error returned by `try_current` when no Runtime has been started #[derive(Debug)] pub struct TryCurrentError { diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 66856df66c7..979e27f3b1c 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -218,25 +218,20 @@ cfg_rt! { pub use self::builder::Builder; pub(crate) mod context; - pub(crate) mod driver; + mod driver; + use driver::Driver; use self::enter::enter; mod handle; pub use handle::{EnterGuard, Handle, TryCurrentError}; + pub(crate) use handle::{HandleInner, ToHandle}; mod spawner; use self::spawner::Spawner; } cfg_rt_multi_thread! { - mod park; - use park::Parker; -} - -cfg_rt_multi_thread! { - mod queue; - pub(crate) mod thread_pool; use self::thread_pool::ThreadPool; } diff --git a/tokio/src/runtime/spawner.rs b/tokio/src/runtime/spawner.rs index d81a806cb59..1dba8e3cef5 100644 --- a/tokio/src/runtime/spawner.rs +++ b/tokio/src/runtime/spawner.rs @@ -1,5 +1,5 @@ use crate::future::Future; -use crate::runtime::basic_scheduler; +use crate::runtime::{basic_scheduler, HandleInner}; use crate::task::JoinHandle; cfg_rt_multi_thread! { @@ -34,6 +34,14 @@ impl Spawner { Spawner::ThreadPool(spawner) => spawner.spawn(future), } } + + pub(crate) fn as_handle_inner(&self) -> &HandleInner { + match self { + Spawner::Basic(spawner) => spawner.as_handle_inner(), + #[cfg(feature = "rt-multi-thread")] + Spawner::ThreadPool(spawner) => spawner.as_handle_inner(), + } + } } cfg_metrics! { diff --git a/tokio/src/runtime/tests/loom_queue.rs b/tokio/src/runtime/tests/loom_queue.rs index b5f78d7ebe7..d0ebf5d4350 100644 --- a/tokio/src/runtime/tests/loom_queue.rs +++ b/tokio/src/runtime/tests/loom_queue.rs @@ -1,6 +1,7 @@ use crate::runtime::blocking::NoopSchedule; use crate::runtime::task::Inject; -use crate::runtime::{queue, MetricsBatch}; +use crate::runtime::thread_pool::queue; +use crate::runtime::MetricsBatch; use loom::thread; diff --git a/tokio/src/runtime/tests/queue.rs b/tokio/src/runtime/tests/queue.rs index 0fd1e0c6d9e..2bdaecf9f7c 100644 --- a/tokio/src/runtime/tests/queue.rs +++ b/tokio/src/runtime/tests/queue.rs @@ -1,5 +1,5 @@ -use crate::runtime::queue; use crate::runtime::task::{self, Inject, Schedule, Task}; +use crate::runtime::thread_pool::queue; use crate::runtime::MetricsBatch; use std::thread; diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs index d3f46517cb0..76346c686e7 100644 --- a/tokio/src/runtime/thread_pool/mod.rs +++ b/tokio/src/runtime/thread_pool/mod.rs @@ -3,6 +3,11 @@ mod idle; use self::idle::Idle; +mod park; +pub(crate) use park::{Parker, Unparker}; + +pub(super) mod queue; + mod worker; pub(crate) use worker::Launch; @@ -10,7 +15,7 @@ pub(crate) use worker::block_in_place; use crate::loom::sync::Arc; use crate::runtime::task::JoinHandle; -use crate::runtime::{Callback, Parker}; +use crate::runtime::{Callback, Driver, HandleInner}; use std::fmt; use std::future::Future; @@ -42,11 +47,14 @@ pub(crate) struct Spawner { impl ThreadPool { pub(crate) fn new( size: usize, - parker: Parker, + driver: Driver, + handle_inner: HandleInner, before_park: Option, after_unpark: Option, ) -> (ThreadPool, Launch) { - let (shared, launch) = worker::create(size, parker, before_park, after_unpark); + let parker = Parker::new(driver); + let (shared, launch) = + worker::create(size, parker, handle_inner, before_park, after_unpark); let spawner = Spawner { shared }; let thread_pool = ThreadPool { spawner }; @@ -101,6 +109,10 @@ impl Spawner { pub(crate) fn shutdown(&mut self) { self.shared.close(); } + + pub(crate) fn as_handle_inner(&self) -> &HandleInner { + self.shared.as_handle_inner() + } } cfg_metrics! { diff --git a/tokio/src/runtime/park.rs b/tokio/src/runtime/thread_pool/park.rs similarity index 100% rename from tokio/src/runtime/park.rs rename to tokio/src/runtime/thread_pool/park.rs diff --git a/tokio/src/runtime/queue.rs b/tokio/src/runtime/thread_pool/queue.rs similarity index 97% rename from tokio/src/runtime/queue.rs rename to tokio/src/runtime/thread_pool/queue.rs index ad9085a6545..1f5841d6dda 100644 --- a/tokio/src/runtime/queue.rs +++ b/tokio/src/runtime/thread_pool/queue.rs @@ -11,14 +11,14 @@ use std::ptr; use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; /// Producer handle. May only be used from a single thread. -pub(super) struct Local { +pub(crate) struct Local { inner: Arc>, } /// Consumer handle. May be used from many threads. -pub(super) struct Steal(Arc>); +pub(crate) struct Steal(Arc>); -pub(super) struct Inner { +pub(crate) struct Inner { /// Concurrently updated by many threads. /// /// Contains two `u16` values. The LSB byte is the "real" head of the queue. @@ -65,7 +65,7 @@ fn make_fixed_size(buffer: Box<[T]>) -> Box<[T; LOCAL_QUEUE_CAPACITY]> { } /// Create a new local run-queue -pub(super) fn local() -> (Steal, Local) { +pub(crate) fn local() -> (Steal, Local) { let mut buffer = Vec::with_capacity(LOCAL_QUEUE_CAPACITY); for _ in 0..LOCAL_QUEUE_CAPACITY { @@ -89,7 +89,7 @@ pub(super) fn local() -> (Steal, Local) { impl Local { /// Returns true if the queue has entries that can be stealed. - pub(super) fn is_stealable(&self) -> bool { + pub(crate) fn is_stealable(&self) -> bool { !self.inner.is_empty() } @@ -97,12 +97,12 @@ impl Local { /// /// Separate to is_stealable so that refactors of is_stealable to "protect" /// some tasks from stealing won't affect this - pub(super) fn has_tasks(&self) -> bool { + pub(crate) fn has_tasks(&self) -> bool { !self.inner.is_empty() } /// Pushes a task to the back of the local queue, skipping the LIFO slot. - pub(super) fn push_back( + pub(crate) fn push_back( &mut self, mut task: task::Notified, inject: &Inject, @@ -259,7 +259,7 @@ impl Local { } /// Pops a task from the local queue. - pub(super) fn pop(&mut self) -> Option> { + pub(crate) fn pop(&mut self) -> Option> { let mut head = self.inner.head.load(Acquire); let idx = loop { @@ -301,12 +301,12 @@ impl Local { } impl Steal { - pub(super) fn is_empty(&self) -> bool { + pub(crate) fn is_empty(&self) -> bool { self.0.is_empty() } /// Steals half the tasks from self and place them into `dst`. - pub(super) fn steal_into( + pub(crate) fn steal_into( &self, dst: &mut Local, dst_metrics: &mut MetricsBatch, diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index a3893177c49..7668bb7cab4 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -63,10 +63,9 @@ use crate::loom::sync::{Arc, Mutex}; use crate::park::{Park, Unpark}; use crate::runtime; use crate::runtime::enter::EnterContext; -use crate::runtime::park::{Parker, Unparker}; use crate::runtime::task::{Inject, JoinHandle, OwnedTasks}; -use crate::runtime::thread_pool::Idle; -use crate::runtime::{queue, task, Callback, MetricsBatch, SchedulerMetrics, WorkerMetrics}; +use crate::runtime::thread_pool::{queue, Idle, Parker, Unparker}; +use crate::runtime::{task, Callback, HandleInner, MetricsBatch, SchedulerMetrics, WorkerMetrics}; use crate::util::atomic_cell::AtomicCell; use crate::util::FastRand; @@ -122,6 +121,9 @@ struct Core { /// State shared across all workers pub(super) struct Shared { + /// Handle to the I/O driver, timer, blocking spawner, ... + handle_inner: HandleInner, + /// Per-worker remote state. All other workers have access to this and is /// how they communicate between each other. remotes: Box<[Remote]>, @@ -193,6 +195,7 @@ scoped_thread_local!(static CURRENT: Context); pub(super) fn create( size: usize, park: Parker, + handle_inner: HandleInner, before_park: Option, after_unpark: Option, ) -> (Arc, Launch) { @@ -223,6 +226,7 @@ pub(super) fn create( } let shared = Arc::new(Shared { + handle_inner, remotes: remotes.into_boxed_slice(), inject: Inject::new(), idle: Idle::new(size), @@ -715,6 +719,10 @@ impl task::Schedule for Arc { } impl Shared { + pub(crate) fn as_handle_inner(&self) -> &HandleInner { + &self.handle_inner + } + pub(super) fn bind_new_task(me: &Arc, future: T) -> JoinHandle where T: Future + Send + 'static, @@ -853,6 +861,19 @@ impl Shared { } } +impl crate::runtime::ToHandle for Arc { + fn to_handle(&self) -> crate::runtime::Handle { + use crate::runtime::thread_pool::Spawner; + use crate::runtime::{self, Handle}; + + Handle { + spawner: runtime::Spawner::ThreadPool(Spawner { + shared: self.clone(), + }), + } + } +} + cfg_metrics! { impl Shared { pub(super) fn injection_queue_depth(&self) -> usize { From 15408274f64e563f92d5c261d44619b303aa0a21 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 20 Apr 2022 10:31:33 -0700 Subject: [PATCH 2/2] fix ci --- tokio/src/runtime/metrics/runtime.rs | 1 + tokio/src/runtime/mod.rs | 3 ++- tokio/src/task/builder.rs | 9 +++++++-- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index a2bec714c0c..1acf14ba181 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -526,6 +526,7 @@ cfg_net! { // TODO: Investigate if this should return 0, most of our metrics always increase // thus this breaks that guarantee. self.handle + .as_inner() .io_handle .as_ref() .and_then(|h| h.with_io_driver_metrics(f)) diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 979e27f3b1c..d7f54360236 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -219,7 +219,6 @@ cfg_rt! { pub(crate) mod context; mod driver; - use driver::Driver; use self::enter::enter; @@ -232,6 +231,8 @@ cfg_rt! { } cfg_rt_multi_thread! { + use driver::Driver; + pub(crate) mod thread_pool; use self::thread_pool::ThreadPool; } diff --git a/tokio/src/task/builder.rs b/tokio/src/task/builder.rs index 2086302fb92..976ecc3c4b0 100644 --- a/tokio/src/task/builder.rs +++ b/tokio/src/task/builder.rs @@ -108,8 +108,13 @@ impl<'a> Builder<'a> { Output: Send + 'static, { use crate::runtime::Mandatory; - let (join_handle, _was_spawned) = - context::current().spawn_blocking_inner(function, Mandatory::NonMandatory, self.name); + let handle = context::current(); + let (join_handle, _was_spawned) = handle.as_inner().spawn_blocking_inner( + function, + Mandatory::NonMandatory, + self.name, + &handle, + ); join_handle } }