diff --git a/tokio/src/runtime/scheduler/multi_thread/mod.rs b/tokio/src/runtime/scheduler/multi_thread/mod.rs index 403eabacfbc..d0a234efc59 100644 --- a/tokio/src/runtime/scheduler/multi_thread/mod.rs +++ b/tokio/src/runtime/scheduler/multi_thread/mod.rs @@ -39,11 +39,13 @@ impl MultiThread { seed_generator: RngSeedGenerator, config: Config, ) -> (MultiThread, Launch) { + let driver_unpark = driver.unpark(); let parker = Parker::new(driver); let (handle, launch) = worker::create( size, parker, driver_handle, + driver_unpark, blocking_spawner, seed_generator, config, diff --git a/tokio/src/runtime/scheduler/multi_thread/park.rs b/tokio/src/runtime/scheduler/multi_thread/park.rs index b3ef15c1f80..c50cc5d68d8 100644 --- a/tokio/src/runtime/scheduler/multi_thread/park.rs +++ b/tokio/src/runtime/scheduler/multi_thread/park.rs @@ -5,7 +5,7 @@ use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::{Arc, Condvar, Mutex}; use crate::loom::thread; -use crate::runtime::driver::{Driver, Unpark}; +use crate::runtime::driver::{self, Driver}; use crate::util::TryLock; use std::sync::atomic::Ordering::SeqCst; @@ -42,15 +42,10 @@ const NOTIFIED: usize = 3; struct Shared { /// Shared driver. Only one thread at a time can use this driver: TryLock, - - /// Unpark handle - handle: Unpark, } impl Parker { pub(crate) fn new(driver: Driver) -> Parker { - let handle = driver.unpark(); - Parker { inner: Arc::new(Inner { state: AtomicUsize::new(EMPTY), @@ -58,7 +53,6 @@ impl Parker { condvar: Condvar::new(), shared: Arc::new(Shared { driver: TryLock::new(driver), - handle, }), }), } @@ -102,8 +96,8 @@ impl Clone for Parker { } impl Unparker { - pub(crate) fn unpark(&self) { - self.inner.unpark(); + pub(crate) fn unpark(&self, driver: &driver::Unpark) { + self.inner.unpark(driver); } } @@ -201,7 +195,7 @@ impl Inner { } } - fn unpark(&self) { + fn unpark(&self, driver: &driver::Unpark) { // To ensure the unparked thread will observe any writes we made before // this call, we must perform a release operation that `park` can // synchronize with. To do that we must write `NOTIFIED` even if `state` @@ -211,7 +205,7 @@ impl Inner { EMPTY => {} // no one was waiting NOTIFIED => {} // already unparked PARKED_CONDVAR => self.unpark_condvar(), - PARKED_DRIVER => self.unpark_driver(), + PARKED_DRIVER => driver.unpark(), actual => panic!("inconsistent state in unpark; actual = {}", actual), } } @@ -233,10 +227,6 @@ impl Inner { self.condvar.notify_one() } - fn unpark_driver(&self) { - self.shared.handle.unpark(); - } - fn shutdown(&self) { if let Some(mut driver) = self.shared.driver.try_lock() { driver.shutdown(); diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 61e51fa5d01..2c799f2ff36 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -124,6 +124,9 @@ pub(super) struct Shared { /// how they communicate between each other. remotes: Box<[Remote]>, + /// Used to unpark threads blocked on the I/O driver + driver: driver::Unpark, + /// Global task queue used for: /// 1. Submit work to the scheduler while **not** currently on a worker thread. /// 2. Submit work to the scheduler when a worker run queue is saturated @@ -190,6 +193,7 @@ pub(super) fn create( size: usize, park: Parker, driver_handle: driver::Handle, + driver_unpark: driver::Unpark, blocking_spawner: blocking::Spawner, seed_generator: RngSeedGenerator, config: Config, @@ -223,6 +227,7 @@ pub(super) fn create( let handle = Arc::new(Handle { shared: Shared { remotes: remotes.into_boxed_slice(), + driver: driver_unpark, inject: Inject::new(), idle: Idle::new(size), owned: OwnedTasks::new(), @@ -774,13 +779,13 @@ impl Shared { fn notify_parked(&self) { if let Some(index) = self.idle.worker_to_notify() { - self.remotes[index].unpark.unpark(); + self.remotes[index].unpark.unpark(&self.driver); } } fn notify_all(&self) { for remote in &self.remotes[..] { - remote.unpark.unpark(); + remote.unpark.unpark(&self.driver); } }