From 36633f0eddfac6df3102381df1b1638921cb3648 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9B=8F=E4=B8=80?= Date: Sat, 7 May 2022 13:18:40 +0800 Subject: [PATCH 1/3] io: Refactor out usage of Weak in the io handle --- tokio/src/io/driver/mod.rs | 139 +++++++++++++++------------ tokio/src/io/driver/registration.rs | 19 +--- tokio/src/runtime/metrics/runtime.rs | 2 +- 3 files changed, 80 insertions(+), 80 deletions(-) diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index 931f22364f1..02b9c375212 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -18,13 +18,13 @@ mod metrics; use crate::park::{Park, Unpark}; use crate::util::slab::{self, Slab}; -use crate::{loom::sync::Mutex, util::bit}; +use crate::{loom::sync::RwLock, util::bit}; use metrics::IoDriverMetrics; use std::fmt; use std::io; -use std::sync::{Arc, Weak}; +use std::sync::Arc; use std::time::Duration; /// I/O driver, backed by Mio. @@ -37,10 +37,8 @@ pub(crate) struct Driver { events: Option, /// Primary slab handle containing the state for each resource registered - /// with this driver. During Drop this is moved into the Inner structure, so - /// this is an Option to allow it to be vacated (until Drop this is always - /// Some). - resources: Option>, + /// with this driver. + resources: Slab, /// The system event queue. poll: mio::Poll, @@ -52,7 +50,7 @@ pub(crate) struct Driver { /// A reference to an I/O driver. #[derive(Clone)] pub(crate) struct Handle { - inner: Weak, + pub(super) inner: Arc, } #[derive(Debug)] @@ -61,20 +59,17 @@ pub(crate) struct ReadyEvent { pub(crate) ready: Ready, } -pub(super) struct Inner { - /// Primary slab handle containing the state for each resource registered - /// with this driver. - /// - /// The ownership of this slab is moved into this structure during - /// `Driver::drop`, so that `Inner::drop` can notify all outstanding handles - /// without risking new ones being registered in the meantime. - resources: Mutex>>, +struct IoDispatcher { + allocator: slab::Allocator, + is_shutdown: bool, +} +pub(super) struct Inner { /// Registers I/O resources. registry: mio::Registry, /// Allocates `ScheduledIo` handles when creating new resources. - pub(super) io_dispatch: slab::Allocator, + io_dispatch: RwLock, /// Used to wake up the reactor from a call to `turn`. waker: mio::Waker, @@ -130,11 +125,10 @@ impl Driver { tick: 0, events: Some(mio::Events::with_capacity(1024)), poll, - resources: Some(slab), + resources: slab, inner: Arc::new(Inner { - resources: Mutex::new(None), registry, - io_dispatch: allocator, + io_dispatch: RwLock::new(IoDispatcher::new(allocator)), waker, metrics: IoDriverMetrics::default(), }), @@ -149,7 +143,7 @@ impl Driver { /// to bind them to this event loop. pub(crate) fn handle(&self) -> Handle { Handle { - inner: Arc::downgrade(&self.inner), + inner: Arc::clone(&self.inner), } } @@ -160,7 +154,7 @@ impl Driver { self.tick = self.tick.wrapping_add(1); if self.tick == COMPACT_INTERVAL { - self.resources.as_mut().unwrap().compact() + self.resources.compact() } let mut events = self.events.take().expect("i/o driver event store missing"); @@ -194,7 +188,7 @@ impl Driver { fn dispatch(&mut self, token: mio::Token, ready: Ready) { let addr = slab::Address::from_usize(ADDRESS.unpack(token.0)); - let resources = self.resources.as_mut().unwrap(); + let resources = &mut self.resources; let io = match resources.get(addr) { Some(io) => io, @@ -214,22 +208,7 @@ impl Driver { impl Drop for Driver { fn drop(&mut self) { - (*self.inner.resources.lock()) = self.resources.take(); - } -} - -impl Drop for Inner { - fn drop(&mut self) { - let resources = self.resources.lock().take(); - - if let Some(mut slab) = resources { - slab.for_each(|io| { - // If a task is waiting on the I/O resource, notify it. The task - // will then attempt to use the I/O resource and fail due to the - // driver being shutdown. - io.shutdown(); - }); - } + self.shutdown(); } } @@ -251,7 +230,16 @@ impl Park for Driver { Ok(()) } - fn shutdown(&mut self) {} + fn shutdown(&mut self) { + if self.inner.shutdown() { + self.resources.for_each(|io| { + // If a task is waiting on the I/O resource, notify it. The task + // will then attempt to use the I/O resource and fail due to the + // driver being shutdown. And shutdown will clear all wakers. + io.shutdown(); + }); + } + } } impl fmt::Debug for Driver { @@ -292,18 +280,11 @@ cfg_not_rt! { cfg_net! { cfg_metrics! { - impl Handle { - // TODO: Remove this when handle contains `Arc` so that we can return - // &IoDriverMetrics instead of using a closure. - // - // Related issue: https://github.com/tokio-rs/tokio/issues/4509 - pub(crate) fn with_io_driver_metrics(&self, f: F) -> Option - where - F: Fn(&IoDriverMetrics) -> R, - { - self.inner().map(|inner| f(&inner.metrics)) - } - } + impl Handle { + pub(crate) fn metrics(&self) -> &IoDriverMetrics { + &self.inner.metrics + } + } } } @@ -318,13 +299,7 @@ impl Handle { /// blocked in `turn`, then the next call to `turn` will not block and /// return immediately. fn wakeup(&self) { - if let Some(inner) = self.inner() { - inner.waker.wake().expect("failed to wake I/O driver"); - } - } - - pub(super) fn inner(&self) -> Option> { - self.inner.upgrade() + self.inner.waker.wake().expect("failed to wake I/O driver"); } } @@ -340,6 +315,17 @@ impl fmt::Debug for Handle { } } +// ===== impl IoDispatcher ===== + +impl IoDispatcher { + fn new(allocator: slab::Allocator) -> Self { + Self { + allocator, + is_shutdown: false, + } + } +} + // ===== impl Inner ===== impl Inner { @@ -351,12 +337,7 @@ impl Inner { source: &mut impl mio::event::Source, interest: Interest, ) -> io::Result> { - let (address, shared) = self.io_dispatch.allocate().ok_or_else(|| { - io::Error::new( - io::ErrorKind::Other, - "reactor at max registered I/O resources", - ) - })?; + let (address, shared) = self.allocate()?; let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0)); @@ -376,6 +357,36 @@ impl Inner { Ok(()) } + + /// shutdown the dispatcher. + fn shutdown(&self) -> bool { + let mut io = self.io_dispatch.write().unwrap(); + if io.is_shutdown { + return false; + } + io.is_shutdown = true; + true + } + + fn is_shutdown(&self) -> bool { + return self.io_dispatch.read().unwrap().is_shutdown; + } + + fn allocate(&self) -> io::Result<(slab::Address, slab::Ref)> { + let io = self.io_dispatch.read().unwrap(); + if io.is_shutdown { + return Err(io::Error::new( + io::ErrorKind::Other, + "failed to find event loop", + )); + } + io.allocator.allocate().ok_or_else(|| { + io::Error::new( + io::ErrorKind::Other, + "reactor at max registered I/O resources", + ) + }) + } } impl Direction { diff --git a/tokio/src/io/driver/registration.rs b/tokio/src/io/driver/registration.rs index 7350be6345d..c9393650c20 100644 --- a/tokio/src/io/driver/registration.rs +++ b/tokio/src/io/driver/registration.rs @@ -72,14 +72,7 @@ impl Registration { interest: Interest, handle: Handle, ) -> io::Result { - let shared = if let Some(inner) = handle.inner() { - inner.add_source(io, interest)? - } else { - return Err(io::Error::new( - io::ErrorKind::Other, - "failed to find event loop", - )); - }; + let shared = handle.inner.add_source(io, interest)?; Ok(Registration { handle, shared }) } @@ -101,11 +94,7 @@ impl Registration { /// /// `Err` is returned if an error is encountered. pub(crate) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> { - let inner = match self.handle.inner() { - Some(inner) => inner, - None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")), - }; - inner.deregister_source(io) + self.handle.inner.deregister_source(io) } pub(crate) fn clear_readiness(&self, event: ReadyEvent) { @@ -157,7 +146,7 @@ impl Registration { let coop = ready!(crate::coop::poll_proceed(cx)); let ev = ready!(self.shared.poll_readiness(cx, direction)); - if self.handle.inner().is_none() { + if self.handle.inner.is_shutdown() { return Poll::Ready(Err(gone())); } @@ -235,7 +224,7 @@ cfg_io_readiness! { pin!(fut); crate::future::poll_fn(|cx| { - if self.handle.inner().is_none() { + if self.handle.inner.is_shutdown() { return Poll::Ready(Err(io::Error::new( io::ErrorKind::Other, crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index 26a0118a475..59a87525517 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -529,7 +529,7 @@ cfg_net! { .as_inner() .io_handle .as_ref() - .and_then(|h| h.with_io_driver_metrics(f)) + .map(|h| f(h.metrics())) .unwrap_or(0) } } From c535837ae3123c225ebc5adca3959ea55fc341d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9B=8F=E4=B8=80?= Date: Mon, 16 May 2022 23:36:40 +0800 Subject: [PATCH 2/3] io: Format the code manually --- tokio/src/io/driver/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index 02b9c375212..88428bfe3af 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -280,11 +280,11 @@ cfg_not_rt! { cfg_net! { cfg_metrics! { - impl Handle { - pub(crate) fn metrics(&self) -> &IoDriverMetrics { - &self.inner.metrics - } - } + impl Handle { + pub(crate) fn metrics(&self) -> &IoDriverMetrics { + &self.inner.metrics + } + } } } From 43fce8970e483138ebdca57bd9af32e115d007bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9B=8F=E4=B8=80?= Date: Tue, 17 May 2022 04:41:51 +0800 Subject: [PATCH 3/3] io: Format the code manually --- tokio/src/io/driver/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/src/io/driver/mod.rs b/tokio/src/io/driver/mod.rs index 88428bfe3af..66bc3182206 100644 --- a/tokio/src/io/driver/mod.rs +++ b/tokio/src/io/driver/mod.rs @@ -280,11 +280,11 @@ cfg_not_rt! { cfg_net! { cfg_metrics! { - impl Handle { + impl Handle { pub(crate) fn metrics(&self) -> &IoDriverMetrics { &self.inner.metrics } - } + } } }