diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 401f55b3f2f..acebd0ab480 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -5,7 +5,7 @@ use crate::park::{Park, Unpark}; use crate::runtime::context::EnterGuard; use crate::runtime::driver::Driver; use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task}; -use crate::runtime::Callback; +use crate::runtime::{Callback, HandleInner}; use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics}; use crate::sync::notify::Notify; use crate::util::atomic_cell::AtomicCell; @@ -78,6 +78,9 @@ struct Shared { /// Indicates whether the blocked on thread was woken. woken: AtomicBool, + /// Handle to I/O driver, timer, blocking pool, ... + handle_inner: HandleInner, + /// Callback for a worker parking itself before_park: Option, @@ -119,6 +122,7 @@ scoped_thread_local!(static CURRENT: Context); impl BasicScheduler { pub(crate) fn new( driver: Driver, + handle_inner: HandleInner, before_park: Option, after_unpark: Option, ) -> BasicScheduler { @@ -130,6 +134,7 @@ impl BasicScheduler { owned: OwnedTasks::new(), unpark, woken: AtomicBool::new(false), + handle_inner, before_park, after_unpark, scheduler_metrics: SchedulerMetrics::new(), @@ -397,6 +402,10 @@ impl Spawner { pub(crate) fn reset_woken(&self) -> bool { self.shared.woken.swap(false, AcqRel) } + + pub(crate) fn as_handle_inner(&self) -> &HandleInner { + &self.shared.handle_inner + } } cfg_metrics! { diff --git a/tokio/src/runtime/blocking/mod.rs b/tokio/src/runtime/blocking/mod.rs index 15fe05c9ade..88d5e6b6a99 100644 --- a/tokio/src/runtime/blocking/mod.rs +++ b/tokio/src/runtime/blocking/mod.rs @@ -21,28 +21,3 @@ use crate::runtime::Builder; pub(crate) fn create_blocking_pool(builder: &Builder, thread_cap: usize) -> BlockingPool { BlockingPool::new(builder, thread_cap) } - -/* -cfg_not_blocking_impl! { - use crate::runtime::Builder; - use std::time::Duration; - - #[derive(Debug, Clone)] - pub(crate) struct BlockingPool {} - - pub(crate) use BlockingPool as Spawner; - - pub(crate) fn create_blocking_pool(_builder: &Builder, _thread_cap: usize) -> BlockingPool { - BlockingPool {} - } - - impl BlockingPool { - pub(crate) fn spawner(&self) -> &BlockingPool { - self - } - - pub(crate) fn shutdown(&mut self, _duration: Option) { - } - } -} -*/ diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index e47073ce2d9..f73868ee9e7 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -7,7 +7,7 @@ use crate::runtime::blocking::shutdown; use crate::runtime::builder::ThreadNameFn; use crate::runtime::context; use crate::runtime::task::{self, JoinHandle}; -use crate::runtime::{Builder, Callback, Handle}; +use crate::runtime::{Builder, Callback, ToHandle}; use std::collections::{HashMap, VecDeque}; use std::fmt; @@ -129,7 +129,7 @@ cfg_fs! { R: Send + 'static, { let rt = context::current(); - rt.spawn_mandatory_blocking(func) + rt.as_inner().spawn_mandatory_blocking(&rt, func) } } @@ -220,7 +220,7 @@ impl fmt::Debug for BlockingPool { // ===== impl Spawner ===== impl Spawner { - pub(crate) fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> { + pub(crate) fn spawn(&self, task: Task, rt: &dyn ToHandle) -> Result<(), ()> { let mut shared = self.inner.shared.lock(); if shared.shutdown { @@ -283,7 +283,7 @@ impl Spawner { fn spawn_thread( &self, shutdown_tx: shutdown::Sender, - rt: &Handle, + rt: &dyn ToHandle, id: usize, ) -> std::io::Result> { let mut builder = thread::Builder::new().name((self.inner.thread_name)()); @@ -292,12 +292,12 @@ impl Spawner { builder = builder.stack_size(stack_size); } - let rt = rt.clone(); + let rt = rt.to_handle(); builder.spawn(move || { // Only the reference should be moved into the closure let _enter = crate::runtime::context::enter(rt.clone()); - rt.blocking_spawner.inner.run(id); + rt.as_inner().blocking_spawner.inner.run(id); drop(shutdown_tx); }) } diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 91c365fd516..618474c05ce 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -555,32 +555,37 @@ impl Builder { } fn build_basic_runtime(&mut self) -> io::Result { - use crate::runtime::{BasicScheduler, Kind}; + use crate::runtime::{BasicScheduler, HandleInner, Kind}; let (driver, resources) = driver::Driver::new(self.get_cfg())?; + // Blocking pool + let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads); + let blocking_spawner = blocking_pool.spawner().clone(); + + let handle_inner = HandleInner { + io_handle: resources.io_handle, + time_handle: resources.time_handle, + signal_handle: resources.signal_handle, + clock: resources.clock, + blocking_spawner, + }; + // And now put a single-threaded scheduler on top of the timer. When // there are no futures ready to do something, it'll let the timer or // the reactor to generate some new stimuli for the futures to continue // in their life. - let scheduler = - BasicScheduler::new(driver, self.before_park.clone(), self.after_unpark.clone()); + let scheduler = BasicScheduler::new( + driver, + handle_inner, + self.before_park.clone(), + self.after_unpark.clone(), + ); let spawner = Spawner::Basic(scheduler.spawner().clone()); - // Blocking pool - let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads); - let blocking_spawner = blocking_pool.spawner().clone(); - Ok(Runtime { kind: Kind::CurrentThread(scheduler), - handle: Handle { - spawner, - io_handle: resources.io_handle, - time_handle: resources.time_handle, - signal_handle: resources.signal_handle, - clock: resources.clock, - blocking_spawner, - }, + handle: Handle { spawner }, blocking_pool, }) } @@ -662,23 +667,17 @@ cfg_rt_multi_thread! { impl Builder { fn build_threaded_runtime(&mut self) -> io::Result { use crate::loom::sys::num_cpus; - use crate::runtime::{Kind, ThreadPool}; - use crate::runtime::park::Parker; + use crate::runtime::{Kind, HandleInner, ThreadPool}; let core_threads = self.worker_threads.unwrap_or_else(num_cpus); let (driver, resources) = driver::Driver::new(self.get_cfg())?; - let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver), self.before_park.clone(), self.after_unpark.clone()); - let spawner = Spawner::ThreadPool(scheduler.spawner().clone()); - // Create the blocking pool let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads); let blocking_spawner = blocking_pool.spawner().clone(); - // Create the runtime handle - let handle = Handle { - spawner, + let handle_inner = HandleInner { io_handle: resources.io_handle, time_handle: resources.time_handle, signal_handle: resources.signal_handle, @@ -686,6 +685,14 @@ cfg_rt_multi_thread! { blocking_spawner, }; + let (scheduler, launch) = ThreadPool::new(core_threads, driver, handle_inner, self.before_park.clone(), self.after_unpark.clone()); + let spawner = Spawner::ThreadPool(scheduler.spawner().clone()); + + // Create the runtime handle + let handle = Handle { + spawner, + }; + // Spawn the thread pool workers let _enter = crate::runtime::context::enter(handle.clone()); launch.launch(); diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index 1f44a534026..aebbe18755a 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -26,7 +26,7 @@ cfg_io_driver! { pub(crate) fn io_handle() -> crate::runtime::driver::IoHandle { match CONTEXT.try_with(|ctx| { let ctx = ctx.borrow(); - ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).io_handle.clone() + ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).as_inner().io_handle.clone() }) { Ok(io_handle) => io_handle, Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR), @@ -39,7 +39,7 @@ cfg_signal_internal! { pub(crate) fn signal_handle() -> crate::runtime::driver::SignalHandle { match CONTEXT.try_with(|ctx| { let ctx = ctx.borrow(); - ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).signal_handle.clone() + ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).as_inner().signal_handle.clone() }) { Ok(signal_handle) => signal_handle, Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR), @@ -51,7 +51,7 @@ cfg_time! { pub(crate) fn time_handle() -> crate::runtime::driver::TimeHandle { match CONTEXT.try_with(|ctx| { let ctx = ctx.borrow(); - ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).time_handle.clone() + ctx.as_ref().expect(crate::util::error::CONTEXT_MISSING_ERROR).as_inner().time_handle.clone() }) { Ok(time_handle) => time_handle, Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR), @@ -60,7 +60,7 @@ cfg_time! { cfg_test_util! { pub(crate) fn clock() -> Option { - match CONTEXT.try_with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.clock.clone())) { + match CONTEXT.try_with(|ctx| (*ctx.borrow()).as_ref().map(|ctx| ctx.as_inner().clock.clone())) { Ok(clock) => clock, Err(_) => panic!("{}", crate::util::error::THREAD_LOCAL_DESTROYED_ERROR), } diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 3365ab73b2a..180c3ab859e 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -16,7 +16,11 @@ use std::{error, fmt}; #[derive(Debug, Clone)] pub struct Handle { pub(super) spawner: Spawner, +} +/// All internal handles that are *not* the scheduler's spawner. +#[derive(Debug)] +pub(crate) struct HandleInner { /// Handles to the I/O drivers #[cfg_attr( not(any(feature = "net", feature = "process", all(unix, feature = "signal"))), @@ -47,6 +51,11 @@ pub struct Handle { pub(super) blocking_spawner: blocking::Spawner, } +/// Create a new runtime handle. +pub(crate) trait ToHandle { + fn to_handle(&self) -> Handle; +} + /// Runtime context guard. /// /// Returned by [`Runtime::enter`] and [`Handle::enter`], the context guard exits @@ -196,85 +205,11 @@ impl Handle { F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - let (join_handle, _was_spawned) = - if cfg!(debug_assertions) && std::mem::size_of::() > 2048 { - self.spawn_blocking_inner(Box::new(func), blocking::Mandatory::NonMandatory, None) - } else { - self.spawn_blocking_inner(func, blocking::Mandatory::NonMandatory, None) - }; - - join_handle + self.as_inner().spawn_blocking(self, func) } - cfg_fs! { - #[track_caller] - #[cfg_attr(any( - all(loom, not(test)), // the function is covered by loom tests - test - ), allow(dead_code))] - pub(crate) fn spawn_mandatory_blocking(&self, func: F) -> Option> - where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, - { - let (join_handle, was_spawned) = if cfg!(debug_assertions) && std::mem::size_of::() > 2048 { - self.spawn_blocking_inner( - Box::new(func), - blocking::Mandatory::Mandatory, - None - ) - } else { - self.spawn_blocking_inner( - func, - blocking::Mandatory::Mandatory, - None - ) - }; - - if was_spawned { - Some(join_handle) - } else { - None - } - } - } - - #[track_caller] - pub(crate) fn spawn_blocking_inner( - &self, - func: F, - is_mandatory: blocking::Mandatory, - name: Option<&str>, - ) -> (JoinHandle, bool) - where - F: FnOnce() -> R + Send + 'static, - R: Send + 'static, - { - let fut = BlockingTask::new(func); - - #[cfg(all(tokio_unstable, feature = "tracing"))] - let fut = { - use tracing::Instrument; - let location = std::panic::Location::caller(); - let span = tracing::trace_span!( - target: "tokio::task::blocking", - "runtime.spawn", - kind = %"blocking", - task.name = %name.unwrap_or_default(), - "fn" = %std::any::type_name::(), - spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()), - ); - fut.instrument(span) - }; - - #[cfg(not(all(tokio_unstable, feature = "tracing")))] - let _ = name; - - let (task, handle) = task::unowned(fut, NoopSchedule); - let spawned = self - .blocking_spawner - .spawn(blocking::Task::new(task, is_mandatory), self); - (handle, spawned.is_ok()) + pub(crate) fn as_inner(&self) -> &HandleInner { + self.spawner.as_handle_inner() } /// Runs a future to completion on this `Handle`'s associated `Runtime`. @@ -369,6 +304,12 @@ impl Handle { } } +impl ToHandle for Handle { + fn to_handle(&self) -> Handle { + self.clone() + } +} + cfg_metrics! { use crate::runtime::RuntimeMetrics; @@ -381,6 +322,99 @@ cfg_metrics! { } } +impl HandleInner { + #[track_caller] + pub(crate) fn spawn_blocking(&self, rt: &dyn ToHandle, func: F) -> JoinHandle + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let (join_handle, _was_spawned) = if cfg!(debug_assertions) + && std::mem::size_of::() > 2048 + { + self.spawn_blocking_inner(Box::new(func), blocking::Mandatory::NonMandatory, None, rt) + } else { + self.spawn_blocking_inner(func, blocking::Mandatory::NonMandatory, None, rt) + }; + + join_handle + } + + cfg_fs! { + #[track_caller] + #[cfg_attr(any( + all(loom, not(test)), // the function is covered by loom tests + test + ), allow(dead_code))] + pub(crate) fn spawn_mandatory_blocking(&self, rt: &dyn ToHandle, func: F) -> Option> + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let (join_handle, was_spawned) = if cfg!(debug_assertions) && std::mem::size_of::() > 2048 { + self.spawn_blocking_inner( + Box::new(func), + blocking::Mandatory::Mandatory, + None, + rt, + ) + } else { + self.spawn_blocking_inner( + func, + blocking::Mandatory::Mandatory, + None, + rt, + ) + }; + + if was_spawned { + Some(join_handle) + } else { + None + } + } + } + + #[track_caller] + pub(crate) fn spawn_blocking_inner( + &self, + func: F, + is_mandatory: blocking::Mandatory, + name: Option<&str>, + rt: &dyn ToHandle, + ) -> (JoinHandle, bool) + where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, + { + let fut = BlockingTask::new(func); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let fut = { + use tracing::Instrument; + let location = std::panic::Location::caller(); + let span = tracing::trace_span!( + target: "tokio::task::blocking", + "runtime.spawn", + kind = %"blocking", + task.name = %name.unwrap_or_default(), + "fn" = %std::any::type_name::(), + spawn.location = %format_args!("{}:{}:{}", location.file(), location.line(), location.column()), + ); + fut.instrument(span) + }; + + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let _ = name; + + let (task, handle) = task::unowned(fut, NoopSchedule); + let spawned = self + .blocking_spawner + .spawn(blocking::Task::new(task, is_mandatory), rt); + (handle, spawned.is_ok()) + } +} + /// Error returned by `try_current` when no Runtime has been started #[derive(Debug)] pub struct TryCurrentError { diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index a2bec714c0c..1acf14ba181 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -526,6 +526,7 @@ cfg_net! { // TODO: Investigate if this should return 0, most of our metrics always increase // thus this breaks that guarantee. self.handle + .as_inner() .io_handle .as_ref() .and_then(|h| h.with_io_driver_metrics(f)) diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 66856df66c7..d7f54360236 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -218,24 +218,20 @@ cfg_rt! { pub use self::builder::Builder; pub(crate) mod context; - pub(crate) mod driver; + mod driver; use self::enter::enter; mod handle; pub use handle::{EnterGuard, Handle, TryCurrentError}; + pub(crate) use handle::{HandleInner, ToHandle}; mod spawner; use self::spawner::Spawner; } cfg_rt_multi_thread! { - mod park; - use park::Parker; -} - -cfg_rt_multi_thread! { - mod queue; + use driver::Driver; pub(crate) mod thread_pool; use self::thread_pool::ThreadPool; diff --git a/tokio/src/runtime/spawner.rs b/tokio/src/runtime/spawner.rs index d81a806cb59..1dba8e3cef5 100644 --- a/tokio/src/runtime/spawner.rs +++ b/tokio/src/runtime/spawner.rs @@ -1,5 +1,5 @@ use crate::future::Future; -use crate::runtime::basic_scheduler; +use crate::runtime::{basic_scheduler, HandleInner}; use crate::task::JoinHandle; cfg_rt_multi_thread! { @@ -34,6 +34,14 @@ impl Spawner { Spawner::ThreadPool(spawner) => spawner.spawn(future), } } + + pub(crate) fn as_handle_inner(&self) -> &HandleInner { + match self { + Spawner::Basic(spawner) => spawner.as_handle_inner(), + #[cfg(feature = "rt-multi-thread")] + Spawner::ThreadPool(spawner) => spawner.as_handle_inner(), + } + } } cfg_metrics! { diff --git a/tokio/src/runtime/tests/loom_queue.rs b/tokio/src/runtime/tests/loom_queue.rs index b5f78d7ebe7..d0ebf5d4350 100644 --- a/tokio/src/runtime/tests/loom_queue.rs +++ b/tokio/src/runtime/tests/loom_queue.rs @@ -1,6 +1,7 @@ use crate::runtime::blocking::NoopSchedule; use crate::runtime::task::Inject; -use crate::runtime::{queue, MetricsBatch}; +use crate::runtime::thread_pool::queue; +use crate::runtime::MetricsBatch; use loom::thread; diff --git a/tokio/src/runtime/tests/queue.rs b/tokio/src/runtime/tests/queue.rs index 0fd1e0c6d9e..2bdaecf9f7c 100644 --- a/tokio/src/runtime/tests/queue.rs +++ b/tokio/src/runtime/tests/queue.rs @@ -1,5 +1,5 @@ -use crate::runtime::queue; use crate::runtime::task::{self, Inject, Schedule, Task}; +use crate::runtime::thread_pool::queue; use crate::runtime::MetricsBatch; use std::thread; diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs index d3f46517cb0..76346c686e7 100644 --- a/tokio/src/runtime/thread_pool/mod.rs +++ b/tokio/src/runtime/thread_pool/mod.rs @@ -3,6 +3,11 @@ mod idle; use self::idle::Idle; +mod park; +pub(crate) use park::{Parker, Unparker}; + +pub(super) mod queue; + mod worker; pub(crate) use worker::Launch; @@ -10,7 +15,7 @@ pub(crate) use worker::block_in_place; use crate::loom::sync::Arc; use crate::runtime::task::JoinHandle; -use crate::runtime::{Callback, Parker}; +use crate::runtime::{Callback, Driver, HandleInner}; use std::fmt; use std::future::Future; @@ -42,11 +47,14 @@ pub(crate) struct Spawner { impl ThreadPool { pub(crate) fn new( size: usize, - parker: Parker, + driver: Driver, + handle_inner: HandleInner, before_park: Option, after_unpark: Option, ) -> (ThreadPool, Launch) { - let (shared, launch) = worker::create(size, parker, before_park, after_unpark); + let parker = Parker::new(driver); + let (shared, launch) = + worker::create(size, parker, handle_inner, before_park, after_unpark); let spawner = Spawner { shared }; let thread_pool = ThreadPool { spawner }; @@ -101,6 +109,10 @@ impl Spawner { pub(crate) fn shutdown(&mut self) { self.shared.close(); } + + pub(crate) fn as_handle_inner(&self) -> &HandleInner { + self.shared.as_handle_inner() + } } cfg_metrics! { diff --git a/tokio/src/runtime/park.rs b/tokio/src/runtime/thread_pool/park.rs similarity index 100% rename from tokio/src/runtime/park.rs rename to tokio/src/runtime/thread_pool/park.rs diff --git a/tokio/src/runtime/queue.rs b/tokio/src/runtime/thread_pool/queue.rs similarity index 97% rename from tokio/src/runtime/queue.rs rename to tokio/src/runtime/thread_pool/queue.rs index ad9085a6545..1f5841d6dda 100644 --- a/tokio/src/runtime/queue.rs +++ b/tokio/src/runtime/thread_pool/queue.rs @@ -11,14 +11,14 @@ use std::ptr; use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; /// Producer handle. May only be used from a single thread. -pub(super) struct Local { +pub(crate) struct Local { inner: Arc>, } /// Consumer handle. May be used from many threads. -pub(super) struct Steal(Arc>); +pub(crate) struct Steal(Arc>); -pub(super) struct Inner { +pub(crate) struct Inner { /// Concurrently updated by many threads. /// /// Contains two `u16` values. The LSB byte is the "real" head of the queue. @@ -65,7 +65,7 @@ fn make_fixed_size(buffer: Box<[T]>) -> Box<[T; LOCAL_QUEUE_CAPACITY]> { } /// Create a new local run-queue -pub(super) fn local() -> (Steal, Local) { +pub(crate) fn local() -> (Steal, Local) { let mut buffer = Vec::with_capacity(LOCAL_QUEUE_CAPACITY); for _ in 0..LOCAL_QUEUE_CAPACITY { @@ -89,7 +89,7 @@ pub(super) fn local() -> (Steal, Local) { impl Local { /// Returns true if the queue has entries that can be stealed. - pub(super) fn is_stealable(&self) -> bool { + pub(crate) fn is_stealable(&self) -> bool { !self.inner.is_empty() } @@ -97,12 +97,12 @@ impl Local { /// /// Separate to is_stealable so that refactors of is_stealable to "protect" /// some tasks from stealing won't affect this - pub(super) fn has_tasks(&self) -> bool { + pub(crate) fn has_tasks(&self) -> bool { !self.inner.is_empty() } /// Pushes a task to the back of the local queue, skipping the LIFO slot. - pub(super) fn push_back( + pub(crate) fn push_back( &mut self, mut task: task::Notified, inject: &Inject, @@ -259,7 +259,7 @@ impl Local { } /// Pops a task from the local queue. - pub(super) fn pop(&mut self) -> Option> { + pub(crate) fn pop(&mut self) -> Option> { let mut head = self.inner.head.load(Acquire); let idx = loop { @@ -301,12 +301,12 @@ impl Local { } impl Steal { - pub(super) fn is_empty(&self) -> bool { + pub(crate) fn is_empty(&self) -> bool { self.0.is_empty() } /// Steals half the tasks from self and place them into `dst`. - pub(super) fn steal_into( + pub(crate) fn steal_into( &self, dst: &mut Local, dst_metrics: &mut MetricsBatch, diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index a3893177c49..7668bb7cab4 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -63,10 +63,9 @@ use crate::loom::sync::{Arc, Mutex}; use crate::park::{Park, Unpark}; use crate::runtime; use crate::runtime::enter::EnterContext; -use crate::runtime::park::{Parker, Unparker}; use crate::runtime::task::{Inject, JoinHandle, OwnedTasks}; -use crate::runtime::thread_pool::Idle; -use crate::runtime::{queue, task, Callback, MetricsBatch, SchedulerMetrics, WorkerMetrics}; +use crate::runtime::thread_pool::{queue, Idle, Parker, Unparker}; +use crate::runtime::{task, Callback, HandleInner, MetricsBatch, SchedulerMetrics, WorkerMetrics}; use crate::util::atomic_cell::AtomicCell; use crate::util::FastRand; @@ -122,6 +121,9 @@ struct Core { /// State shared across all workers pub(super) struct Shared { + /// Handle to the I/O driver, timer, blocking spawner, ... + handle_inner: HandleInner, + /// Per-worker remote state. All other workers have access to this and is /// how they communicate between each other. remotes: Box<[Remote]>, @@ -193,6 +195,7 @@ scoped_thread_local!(static CURRENT: Context); pub(super) fn create( size: usize, park: Parker, + handle_inner: HandleInner, before_park: Option, after_unpark: Option, ) -> (Arc, Launch) { @@ -223,6 +226,7 @@ pub(super) fn create( } let shared = Arc::new(Shared { + handle_inner, remotes: remotes.into_boxed_slice(), inject: Inject::new(), idle: Idle::new(size), @@ -715,6 +719,10 @@ impl task::Schedule for Arc { } impl Shared { + pub(crate) fn as_handle_inner(&self) -> &HandleInner { + &self.handle_inner + } + pub(super) fn bind_new_task(me: &Arc, future: T) -> JoinHandle where T: Future + Send + 'static, @@ -853,6 +861,19 @@ impl Shared { } } +impl crate::runtime::ToHandle for Arc { + fn to_handle(&self) -> crate::runtime::Handle { + use crate::runtime::thread_pool::Spawner; + use crate::runtime::{self, Handle}; + + Handle { + spawner: runtime::Spawner::ThreadPool(Spawner { + shared: self.clone(), + }), + } + } +} + cfg_metrics! { impl Shared { pub(super) fn injection_queue_depth(&self) -> usize { diff --git a/tokio/src/task/builder.rs b/tokio/src/task/builder.rs index 2086302fb92..976ecc3c4b0 100644 --- a/tokio/src/task/builder.rs +++ b/tokio/src/task/builder.rs @@ -108,8 +108,13 @@ impl<'a> Builder<'a> { Output: Send + 'static, { use crate::runtime::Mandatory; - let (join_handle, _was_spawned) = - context::current().spawn_blocking_inner(function, Mandatory::NonMandatory, self.name); + let handle = context::current(); + let (join_handle, _was_spawned) = handle.as_inner().spawn_blocking_inner( + function, + Mandatory::NonMandatory, + self.name, + &handle, + ); join_handle } }