From c30924a274e6d4f21cb7b651ac4b5db40fa0b0e6 Mon Sep 17 00:00:00 2001 From: HyeonuPark Date: Fri, 16 Dec 2022 01:12:22 +0900 Subject: [PATCH 1/2] remove frequent global is_shutdown flag check on io operations The global flag is still remaining, and used to prevent duplicated shutdown and new io operations after shutdown. On shutdown, the driver flip the shutdown flag of every pending io operations and wake them to fail with shutdown error. Fixes: #5227 --- tokio/docs/reactor-refactor.md | 4 +-- tokio/src/runtime/io/mod.rs | 12 +++----- tokio/src/runtime/io/registration.rs | 24 +++++++++------- tokio/src/runtime/io/scheduled_io.rs | 43 ++++++++++++++++------------ 4 files changed, 43 insertions(+), 40 deletions(-) diff --git a/tokio/docs/reactor-refactor.md b/tokio/docs/reactor-refactor.md index 3005afc0168..77e64f4dfd7 100644 --- a/tokio/docs/reactor-refactor.md +++ b/tokio/docs/reactor-refactor.md @@ -188,12 +188,12 @@ readiness, the driver's tick is packed into the atomic `usize`. The `ScheduledIo` readiness `AtomicUsize` is structured as: ``` -| reserved | generation | driver tick | readiness | +| shutdown | generation | driver tick | readiness | |----------+------------+--------------+-----------| | 1 bit | 7 bits + 8 bits + 16 bits | ``` -The `reserved` and `generation` components exist today. +The `shutdown` and `generation` components exist today. The `readiness()` function returns a `ReadyEvent` value. This value includes the `tick` component read with the resource's readiness value. When diff --git a/tokio/src/runtime/io/mod.rs b/tokio/src/runtime/io/mod.rs index 1ddf920f3bb..2e578b6ee6b 100644 --- a/tokio/src/runtime/io/mod.rs +++ b/tokio/src/runtime/io/mod.rs @@ -60,6 +60,7 @@ pub(crate) struct Handle { pub(crate) struct ReadyEvent { tick: u8, pub(crate) ready: Ready, + is_shutdown: bool, } struct IoDispatcher { @@ -147,9 +148,8 @@ impl Driver { if handle.shutdown() { self.resources.for_each(|io| { - // If a task is waiting on the I/O resource, notify it. The task - // will then attempt to use the I/O resource and fail due to the - // driver being shutdown. And shutdown will clear all wakers. + // If a task is waiting on the I/O resource, notify it that the + // runtime is being shutdown. And shutdown will clear all wakers. io.shutdown(); }); } @@ -282,16 +282,12 @@ impl Handle { true } - fn is_shutdown(&self) -> bool { - return self.io_dispatch.read().unwrap().is_shutdown; - } - fn allocate(&self) -> io::Result<(slab::Address, slab::Ref)> { let io = self.io_dispatch.read().unwrap(); if io.is_shutdown { return Err(io::Error::new( io::ErrorKind::Other, - "failed to find event loop", + crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR, )); } io.allocator.allocate().ok_or_else(|| { diff --git a/tokio/src/runtime/io/registration.rs b/tokio/src/runtime/io/registration.rs index 7b95f7f0409..40632c73f0c 100644 --- a/tokio/src/runtime/io/registration.rs +++ b/tokio/src/runtime/io/registration.rs @@ -148,7 +148,7 @@ impl Registration { let coop = ready!(crate::runtime::coop::poll_proceed(cx)); let ev = ready!(self.shared.poll_readiness(cx, direction)); - if self.handle().is_shutdown() { + if ev.is_shutdown { return Poll::Ready(Err(gone())); } @@ -217,7 +217,10 @@ impl Drop for Registration { } fn gone() -> io::Error { - io::Error::new(io::ErrorKind::Other, "IO driver has terminated") + io::Error::new( + io::ErrorKind::Other, + crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR, + ) } cfg_io_readiness! { @@ -229,16 +232,15 @@ cfg_io_readiness! { let fut = self.shared.readiness(interest); pin!(fut); - crate::future::poll_fn(|cx| { - if self.handle().is_shutdown() { - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::Other, - crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR - ))); - } + let ev = crate::future::poll_fn(|cx| { + Pin::new(&mut fut).poll(cx) + }).await; + + if ev.is_shutdown { + return Err(gone()) + } - Pin::new(&mut fut).poll(cx).map(Ok) - }).await + Ok(ev) } pub(crate) async fn async_io(&self, interest: Interest, mut f: impl FnMut() -> io::Result) -> io::Result { diff --git a/tokio/src/runtime/io/scheduled_io.rs b/tokio/src/runtime/io/scheduled_io.rs index 1709091032b..197a4e0e211 100644 --- a/tokio/src/runtime/io/scheduled_io.rs +++ b/tokio/src/runtime/io/scheduled_io.rs @@ -46,9 +46,6 @@ struct Waiters { /// Waker used for AsyncWrite. writer: Option, - - /// True if this ScheduledIo has been killed due to IO driver shutdown. - is_shutdown: bool, } cfg_io_readiness! { @@ -95,7 +92,7 @@ cfg_io_readiness! { // The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness. // -// | reserved | generation | driver tick | readiness | +// | shutdown | generation | driver tick | readiness | // |----------+------------+--------------+-----------| // | 1 bit | 7 bits + 8 bits + 16 bits | @@ -105,6 +102,8 @@ const TICK: bit::Pack = READINESS.then(8); const GENERATION: bit::Pack = TICK.then(7); +const SHUTDOWN: bit::Pack = GENERATION.then(1); + #[test] fn test_generations_assert_same() { assert_eq!(super::GENERATION, GENERATION); @@ -138,9 +137,11 @@ impl ScheduledIo { } /// Invoked when the IO driver is shut down; forces this ScheduledIo into a - /// permanently ready state. + /// permanently shutdown state. pub(super) fn shutdown(&self) { - self.wake0(Ready::ALL, true) + let mask = SHUTDOWN.pack(1, 0); + self.readiness.fetch_or(mask, AcqRel); + self.wake(Ready::ALL); } /// Sets the readiness on this `ScheduledIo` by invoking the given closure on @@ -219,16 +220,10 @@ impl ScheduledIo { /// than 32 wakers to notify, if the stack array fills up, the lock is /// released, the array is cleared, and the iteration continues. pub(super) fn wake(&self, ready: Ready) { - self.wake0(ready, false); - } - - fn wake0(&self, ready: Ready, shutdown: bool) { let mut wakers = WakeList::new(); let mut waiters = self.waiters.lock(); - waiters.is_shutdown |= shutdown; - // check for AsyncRead slot if ready.is_readable() { if let Some(waker) = waiters.reader.take() { @@ -283,6 +278,7 @@ impl ScheduledIo { ReadyEvent { tick: TICK.unpack(curr) as u8, ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)), + is_shutdown: SHUTDOWN.unpack(curr) != 0, } } @@ -299,8 +295,9 @@ impl ScheduledIo { let curr = self.readiness.load(Acquire); let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); + let is_shutdown = SHUTDOWN.unpack(curr) != 0; - if ready.is_empty() { + if ready.is_empty() && !is_shutdown { // Update the task info let mut waiters = self.waiters.lock(); let slot = match direction { @@ -325,10 +322,12 @@ impl ScheduledIo { // taking the waiters lock let curr = self.readiness.load(Acquire); let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr)); - if waiters.is_shutdown { + let is_shutdown = SHUTDOWN.unpack(curr) != 0; + if is_shutdown { Poll::Ready(ReadyEvent { tick: TICK.unpack(curr) as u8, ready: direction.mask(), + is_shutdown, }) } else if ready.is_empty() { Poll::Pending @@ -336,12 +335,14 @@ impl ScheduledIo { Poll::Ready(ReadyEvent { tick: TICK.unpack(curr) as u8, ready, + is_shutdown, }) } } else { Poll::Ready(ReadyEvent { tick: TICK.unpack(curr) as u8, ready, + is_shutdown, }) } } @@ -433,16 +434,17 @@ cfg_io_readiness! { // Optimistically check existing readiness let curr = scheduled_io.readiness.load(SeqCst); let ready = Ready::from_usize(READINESS.unpack(curr)); + let is_shutdown = SHUTDOWN.unpack(curr) != 0; // Safety: `waiter.interest` never changes let interest = unsafe { (*waiter.get()).interest }; let ready = ready.intersection(interest); - if !ready.is_empty() { + if !ready.is_empty() || is_shutdown { // Currently ready! let tick = TICK.unpack(curr) as u8; *state = State::Done; - return Poll::Ready(ReadyEvent { tick, ready }); + return Poll::Ready(ReadyEvent { tick, ready, is_shutdown }); } // Wasn't ready, take the lock (and check again while locked). @@ -450,18 +452,19 @@ cfg_io_readiness! { let curr = scheduled_io.readiness.load(SeqCst); let mut ready = Ready::from_usize(READINESS.unpack(curr)); + let is_shutdown = SHUTDOWN.unpack(curr) != 0; - if waiters.is_shutdown { + if is_shutdown { ready = Ready::ALL; } let ready = ready.intersection(interest); - if !ready.is_empty() { + if !ready.is_empty() || is_shutdown { // Currently ready! let tick = TICK.unpack(curr) as u8; *state = State::Done; - return Poll::Ready(ReadyEvent { tick, ready }); + return Poll::Ready(ReadyEvent { tick, ready, is_shutdown }); } // Not ready even after locked, insert into list... @@ -514,6 +517,7 @@ cfg_io_readiness! { let w = unsafe { &mut *waiter.get() }; let curr = scheduled_io.readiness.load(Acquire); + let is_shutdown = SHUTDOWN.unpack(curr) != 0; // The returned tick might be newer than the event // which notified our waker. This is ok because the future @@ -528,6 +532,7 @@ cfg_io_readiness! { return Poll::Ready(ReadyEvent { tick, ready, + is_shutdown, }); } } From 1e7566093942c3a7cc75d1dfb20d1a3ae4d40e56 Mon Sep 17 00:00:00 2001 From: HyeonuPark Date: Sat, 17 Dec 2022 22:00:16 +0900 Subject: [PATCH 2/2] apply feedback --- tokio/src/runtime/io/registration.rs | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/tokio/src/runtime/io/registration.rs b/tokio/src/runtime/io/registration.rs index 40632c73f0c..140b9240ae4 100644 --- a/tokio/src/runtime/io/registration.rs +++ b/tokio/src/runtime/io/registration.rs @@ -226,15 +226,7 @@ fn gone() -> io::Error { cfg_io_readiness! { impl Registration { pub(crate) async fn readiness(&self, interest: Interest) -> io::Result { - use std::future::Future; - use std::pin::Pin; - - let fut = self.shared.readiness(interest); - pin!(fut); - - let ev = crate::future::poll_fn(|cx| { - Pin::new(&mut fut).poll(cx) - }).await; + let ev = self.shared.readiness(interest).await; if ev.is_shutdown { return Err(gone())