diff --git a/tokio/src/process/unix/driver.rs b/tokio/src/process/unix/driver.rs index 5f9fdddfd0b..bb6ae369538 100644 --- a/tokio/src/process/unix/driver.rs +++ b/tokio/src/process/unix/driver.rs @@ -3,7 +3,6 @@ //! Process driver. use crate::process::unix::GlobalOrphanQueue; -use crate::runtime::io::Handle; use crate::signal::unix::driver::{Driver as SignalDriver, Handle as SignalHandle}; use std::time::Duration; @@ -28,10 +27,6 @@ impl Driver { } } - pub(crate) fn unpark(&self) -> Handle { - self.park.unpark() - } - pub(crate) fn park(&mut self) { self.park.park(); GlobalOrphanQueue::reap_orphans(&self.signal_handle); diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs index 5e739aebe6b..74d9009240a 100644 --- a/tokio/src/runtime/driver.rs +++ b/tokio/src/runtime/driver.rs @@ -38,8 +38,6 @@ pub(crate) struct Cfg { pub(crate) start_paused: bool, } -pub(crate) type Unpark = TimerUnpark; - impl Driver { pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> { let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io)?; @@ -60,10 +58,6 @@ impl Driver { )) } - pub(crate) fn unpark(&self) -> TimerUnpark { - self.inner.unpark() - } - pub(crate) fn park(&mut self) { self.inner.park() } @@ -77,11 +71,21 @@ impl Driver { } } +impl Handle { + pub(crate) fn unpark(&self) { + #[cfg(feature = "time")] + if let Some(handle) = &self.time { + handle.unpark(); + } + + self.io.unpark(); + } +} + // ===== io driver ===== cfg_io_driver! { pub(crate) type IoDriver = crate::runtime::io::Driver; - pub(crate) type IoHandle = IoUnpark; #[derive(Debug)] pub(crate) enum IoStack { @@ -90,7 +94,7 @@ cfg_io_driver! { } #[derive(Debug, Clone)] - pub(crate) enum IoUnpark { + pub(crate) enum IoHandle { Enabled(crate::runtime::io::Handle), Disabled(UnparkThread), } @@ -106,23 +110,25 @@ cfg_io_driver! { let (signal_driver, signal_handle) = create_signal_driver(io_driver)?; let process_driver = create_process_driver(signal_driver); - (IoStack::Enabled(process_driver), IoUnpark::Enabled(io_handle), signal_handle) + (IoStack::Enabled(process_driver), IoHandle::Enabled(io_handle), signal_handle) } else { let park_thread = ParkThread::new(); let unpark_thread = park_thread.unpark(); - (IoStack::Disabled(park_thread), IoUnpark::Disabled(unpark_thread), Default::default()) + (IoStack::Disabled(park_thread), IoHandle::Disabled(unpark_thread), Default::default()) }; Ok(ret) } impl IoStack { - pub(crate) fn unpark(&self) -> IoUnpark { + /* + pub(crate) fn handle(&self) -> IoHandle { match self { - IoStack::Enabled(v) => IoUnpark::Enabled(v.unpark()), - IoStack::Disabled(v) => IoUnpark::Disabled(v.unpark()), + IoStack::Enabled(v) => IoHandle::Enabled(v.handle()), + IoStack::Disabled(v) => IoHandle::Disabled(v.unpark()), } - } + }] + */ pub(crate) fn park(&mut self) { match self { @@ -146,27 +152,27 @@ cfg_io_driver! { } } - impl IoUnpark { + impl IoHandle { pub(crate) fn unpark(&self) { match self { - IoUnpark::Enabled(v) => v.unpark(), - IoUnpark::Disabled(v) => v.unpark(), + IoHandle::Enabled(handle) => handle.unpark(), + IoHandle::Disabled(handle) => handle.unpark(), } } #[track_caller] pub(crate) fn expect(self, msg: &'static str) -> crate::runtime::io::Handle { match self { - IoUnpark::Enabled(v) => v, - IoUnpark::Disabled(..) => panic!("{}", msg), + IoHandle::Enabled(v) => v, + IoHandle::Disabled(..) => panic!("{}", msg), } } cfg_unstable! { pub(crate) fn as_ref(&self) -> Option<&crate::runtime::io::Handle> { match self { - IoUnpark::Enabled(v) => Some(v), - IoUnpark::Disabled(..) => None, + IoHandle::Enabled(v) => Some(v), + IoHandle::Disabled(..) => None, } } } @@ -174,9 +180,8 @@ cfg_io_driver! { } cfg_not_io_driver! { - pub(crate) type IoHandle = IoUnpark; + pub(crate) type IoHandle = UnparkThread; pub(crate) type IoStack = ParkThread; - pub(crate) type IoUnpark = UnparkThread; fn create_io_stack(_enabled: bool) -> io::Result<(IoStack, IoHandle, SignalHandle)> { let park_thread = ParkThread::new(); @@ -249,11 +254,6 @@ cfg_time! { Disabled(IoStack), } - pub(crate) enum TimerUnpark { - Enabled(crate::runtime::time::TimerUnpark), - Disabled(IoUnpark), - } - pub(crate) type Clock = crate::time::Clock; pub(crate) type TimeHandle = Option; @@ -276,13 +276,6 @@ cfg_time! { } impl TimeDriver { - pub(crate) fn unpark(&self) -> TimerUnpark { - match self { - TimeDriver::Enabled { driver, .. } => TimerUnpark::Enabled(driver.unpark()), - TimeDriver::Disabled(v) => TimerUnpark::Disabled(v.unpark()), - } - } - pub(crate) fn park(&mut self) { match self { TimeDriver::Enabled { driver, handle } => driver.park(handle), @@ -304,20 +297,10 @@ cfg_time! { } } } - - impl TimerUnpark { - pub(crate) fn unpark(&self) { - match self { - TimerUnpark::Enabled(v) => v.unpark(), - TimerUnpark::Disabled(v) => v.unpark(), - } - } - } } cfg_not_time! { type TimeDriver = IoStack; - type TimerUnpark = IoUnpark; pub(crate) type Clock = (); pub(crate) type TimeHandle = (); diff --git a/tokio/src/runtime/io/mod.rs b/tokio/src/runtime/io/mod.rs index 8a83674770e..aa9447322ef 100644 --- a/tokio/src/runtime/io/mod.rs +++ b/tokio/src/runtime/io/mod.rs @@ -144,11 +144,6 @@ impl Driver { } } - // TODO: remove this in a later refactor - pub(crate) fn unpark(&self) -> Handle { - self.handle() - } - pub(crate) fn park(&mut self) { self.turn(None); } diff --git a/tokio/src/runtime/scheduler/current_thread.rs b/tokio/src/runtime/scheduler/current_thread.rs index 320fcb54ad5..666be6b13f2 100644 --- a/tokio/src/runtime/scheduler/current_thread.rs +++ b/tokio/src/runtime/scheduler/current_thread.rs @@ -2,7 +2,7 @@ use crate::future::poll_fn; use crate::loom::sync::atomic::AtomicBool; use crate::loom::sync::{Arc, Mutex}; use crate::runtime::context::EnterGuard; -use crate::runtime::driver::{self, Driver, Unpark}; +use crate::runtime::driver::{self, Driver}; use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task}; use crate::runtime::{blocking, Config}; use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics}; @@ -82,9 +82,6 @@ struct Shared { /// Collection of all active tasks spawned onto this executor. owned: OwnedTasks>, - /// Unpark the blocked thread. - unpark: Unpark, - /// Indicates whether the blocked on thread was woken. woken: AtomicBool, @@ -122,13 +119,10 @@ impl CurrentThread { seed_generator: RngSeedGenerator, config: Config, ) -> CurrentThread { - let unpark = driver.unpark(); - let handle = Arc::new(Handle { shared: Shared { queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))), owned: OwnedTasks::new(), - unpark, woken: AtomicBool::new(false), config, scheduler_metrics: SchedulerMetrics::new(), @@ -467,7 +461,7 @@ impl Schedule for Arc { if let Some(queue) = guard.as_mut() { queue.push_back(task); drop(guard); - self.shared.unpark.unpark(); + self.driver.unpark(); } } }); @@ -511,7 +505,7 @@ impl Wake for Handle { /// Wake by reference fn wake_by_ref(arc_self: &Arc) { arc_self.shared.woken.store(true, Release); - arc_self.shared.unpark.unpark(); + arc_self.driver.unpark(); } } diff --git a/tokio/src/runtime/scheduler/multi_thread/handle.rs b/tokio/src/runtime/scheduler/multi_thread/handle.rs index a34042257be..884f400bf00 100644 --- a/tokio/src/runtime/scheduler/multi_thread/handle.rs +++ b/tokio/src/runtime/scheduler/multi_thread/handle.rs @@ -35,7 +35,7 @@ impl Handle { } pub(crate) fn shutdown(&self) { - self.shared.close(); + self.close(); } pub(super) fn bind_new_task(me: &Arc, future: T, id: task::Id) -> JoinHandle @@ -46,7 +46,7 @@ impl Handle { let (handle, notified) = me.shared.owned.bind(future, me.clone(), id); if let Some(notified) = notified { - me.shared.schedule(notified, false); + me.schedule_task(notified, false); } handle diff --git a/tokio/src/runtime/scheduler/multi_thread/mod.rs b/tokio/src/runtime/scheduler/multi_thread/mod.rs index d0a234efc59..403eabacfbc 100644 --- a/tokio/src/runtime/scheduler/multi_thread/mod.rs +++ b/tokio/src/runtime/scheduler/multi_thread/mod.rs @@ -39,13 +39,11 @@ impl MultiThread { seed_generator: RngSeedGenerator, config: Config, ) -> (MultiThread, Launch) { - let driver_unpark = driver.unpark(); let parker = Parker::new(driver); let (handle, launch) = worker::create( size, parker, driver_handle, - driver_unpark, blocking_spawner, seed_generator, config, diff --git a/tokio/src/runtime/scheduler/multi_thread/park.rs b/tokio/src/runtime/scheduler/multi_thread/park.rs index c50cc5d68d8..46432f4f036 100644 --- a/tokio/src/runtime/scheduler/multi_thread/park.rs +++ b/tokio/src/runtime/scheduler/multi_thread/park.rs @@ -96,7 +96,7 @@ impl Clone for Parker { } impl Unparker { - pub(crate) fn unpark(&self, driver: &driver::Unpark) { + pub(crate) fn unpark(&self, driver: &driver::Handle) { self.inner.unpark(driver); } } @@ -195,7 +195,7 @@ impl Inner { } } - fn unpark(&self, driver: &driver::Unpark) { + fn unpark(&self, driver: &driver::Handle) { // To ensure the unparked thread will observe any writes we made before // this call, we must perform a release operation that `park` can // synchronize with. To do that we must write `NOTIFIED` even if `state` diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 2c799f2ff36..7da4a3f0f56 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -124,9 +124,6 @@ pub(super) struct Shared { /// how they communicate between each other. remotes: Box<[Remote]>, - /// Used to unpark threads blocked on the I/O driver - driver: driver::Unpark, - /// Global task queue used for: /// 1. Submit work to the scheduler while **not** currently on a worker thread. /// 2. Submit work to the scheduler when a worker run queue is saturated @@ -193,7 +190,6 @@ pub(super) fn create( size: usize, park: Parker, driver_handle: driver::Handle, - driver_unpark: driver::Unpark, blocking_spawner: blocking::Spawner, seed_generator: RngSeedGenerator, config: Config, @@ -227,7 +223,6 @@ pub(super) fn create( let handle = Arc::new(Handle { shared: Shared { remotes: remotes.into_boxed_slice(), - driver: driver_unpark, inject: Inject::new(), idle: Idle::new(size), owned: OwnedTasks::new(), @@ -408,7 +403,7 @@ impl Context { core.pre_shutdown(&self.worker); // Signal shutdown - self.worker.handle.shared.shutdown(core); + self.worker.handle.shutdown_core(core); Err(()) } @@ -533,7 +528,7 @@ impl Context { // If there are tasks available to steal, but this worker is not // looking for tasks to steal, notify another worker. if !core.is_searching && core.run_queue.is_stealable() { - self.worker.handle.shared.notify_parked(); + self.worker.handle.notify_parked(); } core @@ -608,7 +603,7 @@ impl Core { } self.is_searching = false; - worker.handle.shared.transition_worker_from_searching(); + worker.handle.transition_worker_from_searching(); } /// Prepares the worker state for parking. @@ -634,7 +629,7 @@ impl Core { self.is_searching = false; if is_last_searcher { - worker.handle.shared.notify_if_work_pending(); + worker.handle.notify_if_work_pending(); } true @@ -702,26 +697,27 @@ impl Worker { } } +// TODO: Move `Handle` impls into handle.rs impl task::Schedule for Arc { fn release(&self, task: &Task) -> Option { self.shared.owned.remove(task) } fn schedule(&self, task: Notified) { - self.shared.schedule(task, false); + self.schedule_task(task, false); } fn yield_now(&self, task: Notified) { - self.shared.schedule(task, true); + self.schedule_task(task, true); } } -impl Shared { - pub(super) fn schedule(&self, task: Notified, is_yield: bool) { +impl Handle { + pub(super) fn schedule_task(&self, task: Notified, is_yield: bool) { CURRENT.with(|maybe_cx| { if let Some(cx) = maybe_cx { // Make sure the task is part of the **current** scheduler. - if self.ptr_eq(&cx.worker.handle.shared) { + if self.ptr_eq(&cx.worker.handle) { // And the current thread still holds a core if let Some(core) = cx.core.borrow_mut().as_mut() { self.schedule_local(core, task, is_yield); @@ -731,8 +727,8 @@ impl Shared { } // Otherwise, use the inject queue. - self.inject.push(task); - self.scheduler_metrics.inc_remote_schedule_count(); + self.shared.inject.push(task); + self.shared.scheduler_metrics.inc_remote_schedule_count(); self.notify_parked(); }) } @@ -744,9 +740,9 @@ impl Shared { // task must always be pushed to the back of the queue, enabling other // tasks to be executed. If **not** a yield, then there is more // flexibility and the task may go to the front of the queue. - let should_notify = if is_yield || self.config.disable_lifo_slot { + let should_notify = if is_yield || self.shared.config.disable_lifo_slot { core.run_queue - .push_back(task, &self.inject, &mut core.metrics); + .push_back(task, &self.shared.inject, &mut core.metrics); true } else { // Push to the LIFO slot @@ -755,7 +751,7 @@ impl Shared { if let Some(prev) = prev { core.run_queue - .push_back(prev, &self.inject, &mut core.metrics); + .push_back(prev, &self.shared.inject, &mut core.metrics); } core.lifo_slot = Some(task); @@ -772,38 +768,38 @@ impl Shared { } pub(super) fn close(&self) { - if self.inject.close() { + if self.shared.inject.close() { self.notify_all(); } } fn notify_parked(&self) { - if let Some(index) = self.idle.worker_to_notify() { - self.remotes[index].unpark.unpark(&self.driver); + if let Some(index) = self.shared.idle.worker_to_notify() { + self.shared.remotes[index].unpark.unpark(&self.driver); } } fn notify_all(&self) { - for remote in &self.remotes[..] { + for remote in &self.shared.remotes[..] { remote.unpark.unpark(&self.driver); } } fn notify_if_work_pending(&self) { - for remote in &self.remotes[..] { + for remote in &self.shared.remotes[..] { if !remote.steal.is_empty() { self.notify_parked(); return; } } - if !self.inject.is_empty() { + if !self.shared.inject.is_empty() { self.notify_parked(); } } fn transition_worker_from_searching(&self) { - if self.idle.transition_worker_from_searching() { + if self.shared.idle.transition_worker_from_searching() { // We are the final searching worker. Because work was found, we // need to notify another worker. self.notify_parked(); @@ -814,15 +810,15 @@ impl Shared { /// its core back into its handle. /// /// If all workers have reached this point, the final cleanup is performed. - fn shutdown(&self, core: Box) { - let mut cores = self.shutdown_cores.lock(); + fn shutdown_core(&self, core: Box) { + let mut cores = self.shared.shutdown_cores.lock(); cores.push(core); - if cores.len() != self.remotes.len() { + if cores.len() != self.shared.remotes.len() { return; } - debug_assert!(self.owned.is_empty()); + debug_assert!(self.shared.owned.is_empty()); for mut core in cores.drain(..) { core.shutdown(); @@ -831,12 +827,12 @@ impl Shared { // Drain the injection queue // // We already shut down every task, so we can simply drop the tasks. - while let Some(task) = self.inject.pop() { + while let Some(task) = self.shared.inject.pop() { drop(task); } } - fn ptr_eq(&self, other: &Shared) -> bool { + fn ptr_eq(&self, other: &Handle) -> bool { std::ptr::eq(self, other) } } diff --git a/tokio/src/runtime/time/handle.rs b/tokio/src/runtime/time/handle.rs index 6cbf550ef5d..8338f2b5b12 100644 --- a/tokio/src/runtime/time/handle.rs +++ b/tokio/src/runtime/time/handle.rs @@ -30,6 +30,14 @@ impl Handle { pub(super) fn is_shutdown(&self) -> bool { self.inner.is_shutdown() } + + /// Track that the driver is being unparked + pub(crate) fn unpark(&self) { + #[cfg(feature = "test-util")] + self.inner + .did_wake + .store(true, std::sync::atomic::Ordering::SeqCst); + } } cfg_not_rt! { diff --git a/tokio/src/runtime/time/mod.rs b/tokio/src/runtime/time/mod.rs index 79885884164..ec4333bf8df 100644 --- a/tokio/src/runtime/time/mod.rs +++ b/tokio/src/runtime/time/mod.rs @@ -20,7 +20,7 @@ mod wheel; use crate::loom::sync::atomic::{AtomicBool, Ordering}; use crate::loom::sync::{Arc, Mutex}; -use crate::runtime::driver::{IoStack, IoUnpark}; +use crate::runtime::driver::{IoHandle, IoStack}; use crate::time::error::Error; use crate::time::{Clock, Duration}; @@ -107,6 +107,9 @@ struct Inner { /// True if the driver is being shutdown. pub(super) is_shutdown: AtomicBool, + + #[cfg(feature = "test-util")] + did_wake: Arc, } /// Time state shared which must be protected by a `Mutex` @@ -134,23 +137,34 @@ impl Driver { pub(crate) fn new(park: IoStack, clock: Clock) -> (Driver, Handle) { let time_source = TimeSource::new(clock); - let inner = Inner::new(time_source.clone()); - let handle = Handle::new(Arc::new(inner)); + #[cfg(feature = "test-util")] + let did_wake = Arc::new(AtomicBool::new(false)); + + let inner = Arc::new(Inner { + state: Mutex::new(InnerState { + time_source: time_source.clone(), + elapsed: 0, + next_wake: None, + wheel: wheel::Wheel::new(), + }), + is_shutdown: AtomicBool::new(false), + + #[cfg(feature = "test-util")] + did_wake: did_wake.clone(), + }); + + let handle = Handle::new(inner); let driver = Driver { time_source, park, #[cfg(feature = "test-util")] - did_wake: Arc::new(AtomicBool::new(false)), + did_wake, }; (driver, handle) } - pub(crate) fn unpark(&self) -> TimerUnpark { - TimerUnpark::new(self) - } - pub(crate) fn park(&mut self, handle: &Handle) { self.park_internal(handle, None) } @@ -339,7 +353,7 @@ impl Handle { /// the `TimerEntry`) pub(self) unsafe fn reregister( &self, - unpark: &IoUnpark, + unpark: &IoHandle, new_tick: u64, entry: NonNull, ) { @@ -393,46 +407,9 @@ impl Handle { } } -pub(crate) struct TimerUnpark { - inner: IoUnpark, - - #[cfg(feature = "test-util")] - did_wake: Arc, -} - -impl TimerUnpark { - fn new(driver: &Driver) -> TimerUnpark { - TimerUnpark { - inner: driver.park.unpark(), - - #[cfg(feature = "test-util")] - did_wake: driver.did_wake.clone(), - } - } - - pub(crate) fn unpark(&self) { - #[cfg(feature = "test-util")] - self.did_wake.store(true, Ordering::SeqCst); - - self.inner.unpark(); - } -} - // ===== impl Inner ===== impl Inner { - pub(self) fn new(time_source: TimeSource) -> Self { - Inner { - state: Mutex::new(InnerState { - time_source, - elapsed: 0, - next_wake: None, - wheel: wheel::Wheel::new(), - }), - is_shutdown: AtomicBool::new(false), - } - } - /// Locks the driver's inner structure pub(super) fn lock(&self) -> crate::loom::sync::MutexGuard<'_, InnerState> { self.state.lock() diff --git a/tokio/src/signal/unix/driver.rs b/tokio/src/signal/unix/driver.rs index 6192c807303..327d6425b02 100644 --- a/tokio/src/signal/unix/driver.rs +++ b/tokio/src/signal/unix/driver.rs @@ -92,10 +92,6 @@ impl Driver { } } - pub(crate) fn unpark(&self) -> io::Handle { - self.park.unpark() - } - pub(crate) fn park(&mut self) { self.park.park(); self.process();