From 32d68fe579f818d349103fa0e9fccf3d80a9f717 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 27 Oct 2022 11:13:25 -0700 Subject: [PATCH] rt: remove Arc from I/O driver (#5134) The next step in the great driver cleanup. This patch removes the Arc used in the I/O driver in favor of `runtime::scheduler::Handle`. --- tokio/src/runtime/driver.rs | 2 +- tokio/src/runtime/io/mod.rs | 98 +++++++++++----------------- tokio/src/runtime/io/registration.rs | 8 +-- tokio/src/runtime/metrics/runtime.rs | 2 +- 4 files changed, 44 insertions(+), 66 deletions(-) diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs index 7da12e54d99..184e736a90a 100644 --- a/tokio/src/runtime/driver.rs +++ b/tokio/src/runtime/driver.rs @@ -114,7 +114,7 @@ cfg_io_driver! { Disabled(ParkThread), } - #[derive(Debug, Clone)] + #[derive(Debug)] pub(crate) enum IoHandle { Enabled(crate::runtime::io::Handle), Disabled(UnparkThread), diff --git a/tokio/src/runtime/io/mod.rs b/tokio/src/runtime/io/mod.rs index 26dd6a2164f..00348edb30d 100644 --- a/tokio/src/runtime/io/mod.rs +++ b/tokio/src/runtime/io/mod.rs @@ -18,7 +18,6 @@ use metrics::IoDriverMetrics; use std::fmt; use std::io; -use std::sync::Arc; use std::time::Duration; /// I/O driver, backed by Mio. @@ -42,23 +41,7 @@ pub(crate) struct Driver { } /// A reference to an I/O driver. -#[derive(Clone)] pub(crate) struct Handle { - pub(super) inner: Arc, -} - -#[derive(Debug)] -pub(crate) struct ReadyEvent { - tick: u8, - pub(crate) ready: Ready, -} - -struct IoDispatcher { - allocator: slab::Allocator, - is_shutdown: bool, -} - -pub(super) struct Inner { /// Registers I/O resources. registry: mio::Registry, @@ -70,7 +53,18 @@ pub(super) struct Inner { #[cfg(not(tokio_wasi))] waker: mio::Waker, - metrics: IoDriverMetrics, + pub(crate) metrics: IoDriverMetrics, +} + +#[derive(Debug)] +pub(crate) struct ReadyEvent { + tick: u8, + pub(crate) ready: Ready, +} + +struct IoDispatcher { + allocator: slab::Allocator, + is_shutdown: bool, } #[derive(Debug, Eq, PartialEq, Clone, Copy)] @@ -128,13 +122,11 @@ impl Driver { }; let handle = Handle { - inner: Arc::new(Inner { - registry, - io_dispatch: RwLock::new(IoDispatcher::new(allocator)), - #[cfg(not(tokio_wasi))] - waker, - metrics: IoDriverMetrics::default(), - }), + registry, + io_dispatch: RwLock::new(IoDispatcher::new(allocator)), + #[cfg(not(tokio_wasi))] + waker, + metrics: IoDriverMetrics::default(), }; Ok((driver, handle)) @@ -153,7 +145,7 @@ impl Driver { pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) { let handle = rt_handle.io(); - if handle.inner.shutdown() { + if handle.shutdown() { self.resources.for_each(|io| { // If a task is waiting on the I/O resource, notify it. The task // will then attempt to use the I/O resource and fail due to the @@ -208,7 +200,7 @@ impl Driver { } } - handle.inner.metrics.incr_ready_count_by(ready_count); + handle.metrics.incr_ready_count_by(ready_count); } fn dispatch(resources: &mut Slab, tick: u8, token: mio::Token, ready: Ready) { @@ -236,16 +228,6 @@ impl fmt::Debug for Driver { } } -cfg_net! { - cfg_metrics! { - impl Handle { - pub(crate) fn metrics(&self) -> &IoDriverMetrics { - &self.inner.metrics - } - } - } -} - impl Handle { /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise /// makes the next call to `turn` return immediately. @@ -258,30 +240,9 @@ impl Handle { /// return immediately. pub(crate) fn unpark(&self) { #[cfg(not(tokio_wasi))] - self.inner.waker.wake().expect("failed to wake I/O driver"); + self.waker.wake().expect("failed to wake I/O driver"); } -} -impl fmt::Debug for Handle { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "Handle") - } -} - -// ===== impl IoDispatcher ===== - -impl IoDispatcher { - fn new(allocator: slab::Allocator) -> Self { - Self { - allocator, - is_shutdown: false, - } - } -} - -// ===== impl Inner ===== - -impl Inner { /// Registers an I/O resource with the reactor for a given `mio::Ready` state. /// /// The registration token is returned. @@ -342,6 +303,23 @@ impl Inner { } } +impl fmt::Debug for Handle { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Handle") + } +} + +// ===== impl IoDispatcher ===== + +impl IoDispatcher { + fn new(allocator: slab::Allocator) -> Self { + Self { + allocator, + is_shutdown: false, + } + } +} + impl Direction { pub(super) fn mask(self) -> Ready { match self { @@ -355,7 +333,7 @@ impl Direction { cfg_signal_internal_and_unix! { impl Handle { pub(crate) fn register_signal_receiver(&self, receiver: &mut mio::net::UnixStream) -> io::Result<()> { - self.inner.registry.register(receiver, TOKEN_SIGNAL, mio::Interest::READABLE)?; + self.registry.register(receiver, TOKEN_SIGNAL, mio::Interest::READABLE)?; Ok(()) } } diff --git a/tokio/src/runtime/io/registration.rs b/tokio/src/runtime/io/registration.rs index 1e9d9fc25de..157e91096f7 100644 --- a/tokio/src/runtime/io/registration.rs +++ b/tokio/src/runtime/io/registration.rs @@ -73,7 +73,7 @@ impl Registration { interest: Interest, handle: scheduler::Handle, ) -> io::Result { - let shared = handle.io().inner.add_source(io, interest)?; + let shared = handle.io().add_source(io, interest)?; Ok(Registration { handle, shared }) } @@ -95,7 +95,7 @@ impl Registration { /// /// `Err` is returned if an error is encountered. pub(crate) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> { - self.handle().inner.deregister_source(io) + self.handle().deregister_source(io) } pub(crate) fn clear_readiness(&self, event: ReadyEvent) { @@ -148,7 +148,7 @@ impl Registration { let coop = ready!(crate::coop::poll_proceed(cx)); let ev = ready!(self.shared.poll_readiness(cx, direction)); - if self.handle().inner.is_shutdown() { + if self.handle().is_shutdown() { return Poll::Ready(Err(gone())); } @@ -230,7 +230,7 @@ cfg_io_readiness! { pin!(fut); crate::future::poll_fn(|cx| { - if self.handle().inner.is_shutdown() { + if self.handle().is_shutdown() { return Poll::Ready(Err(io::Error::new( io::ErrorKind::Other, crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR diff --git a/tokio/src/runtime/metrics/runtime.rs b/tokio/src/runtime/metrics/runtime.rs index 6ad5735b0f7..49c926302f5 100644 --- a/tokio/src/runtime/metrics/runtime.rs +++ b/tokio/src/runtime/metrics/runtime.rs @@ -530,7 +530,7 @@ cfg_net! { .driver() .io .as_ref() - .map(|h| f(h.metrics())) + .map(|h| f(&h.metrics)) .unwrap_or(0) } }