Skip to content

Commit

Permalink
io: refactor out usage of Weak in the io handle (#4656)
Browse files Browse the repository at this point in the history
  • Loading branch information
hidva committed May 19, 2022
1 parent 3fce06c commit 931a777
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 79 deletions.
137 changes: 74 additions & 63 deletions tokio/src/io/driver/mod.rs
Expand Up @@ -18,13 +18,13 @@ mod metrics;

use crate::park::{Park, Unpark};
use crate::util::slab::{self, Slab};
use crate::{loom::sync::Mutex, util::bit};
use crate::{loom::sync::RwLock, util::bit};

use metrics::IoDriverMetrics;

use std::fmt;
use std::io;
use std::sync::{Arc, Weak};
use std::sync::Arc;
use std::time::Duration;

/// I/O driver, backed by Mio.
Expand All @@ -37,10 +37,8 @@ pub(crate) struct Driver {
events: Option<mio::Events>,

/// Primary slab handle containing the state for each resource registered
/// with this driver. During Drop this is moved into the Inner structure, so
/// this is an Option to allow it to be vacated (until Drop this is always
/// Some).
resources: Option<Slab<ScheduledIo>>,
/// with this driver.
resources: Slab<ScheduledIo>,

/// The system event queue.
poll: mio::Poll,
Expand All @@ -52,7 +50,7 @@ pub(crate) struct Driver {
/// A reference to an I/O driver.
#[derive(Clone)]
pub(crate) struct Handle {
inner: Weak<Inner>,
pub(super) inner: Arc<Inner>,
}

#[derive(Debug)]
Expand All @@ -61,20 +59,17 @@ pub(crate) struct ReadyEvent {
pub(crate) ready: Ready,
}

pub(super) struct Inner {
/// Primary slab handle containing the state for each resource registered
/// with this driver.
///
/// The ownership of this slab is moved into this structure during
/// `Driver::drop`, so that `Inner::drop` can notify all outstanding handles
/// without risking new ones being registered in the meantime.
resources: Mutex<Option<Slab<ScheduledIo>>>,
struct IoDispatcher {
allocator: slab::Allocator<ScheduledIo>,
is_shutdown: bool,
}

pub(super) struct Inner {
/// Registers I/O resources.
registry: mio::Registry,

/// Allocates `ScheduledIo` handles when creating new resources.
pub(super) io_dispatch: slab::Allocator<ScheduledIo>,
io_dispatch: RwLock<IoDispatcher>,

/// Used to wake up the reactor from a call to `turn`.
waker: mio::Waker,
Expand Down Expand Up @@ -130,11 +125,10 @@ impl Driver {
tick: 0,
events: Some(mio::Events::with_capacity(1024)),
poll,
resources: Some(slab),
resources: slab,
inner: Arc::new(Inner {
resources: Mutex::new(None),
registry,
io_dispatch: allocator,
io_dispatch: RwLock::new(IoDispatcher::new(allocator)),
waker,
metrics: IoDriverMetrics::default(),
}),
Expand All @@ -149,7 +143,7 @@ impl Driver {
/// to bind them to this event loop.
pub(crate) fn handle(&self) -> Handle {
Handle {
inner: Arc::downgrade(&self.inner),
inner: Arc::clone(&self.inner),
}
}

Expand All @@ -160,7 +154,7 @@ impl Driver {
self.tick = self.tick.wrapping_add(1);

if self.tick == COMPACT_INTERVAL {
self.resources.as_mut().unwrap().compact()
self.resources.compact()
}

let mut events = self.events.take().expect("i/o driver event store missing");
Expand Down Expand Up @@ -194,7 +188,7 @@ impl Driver {
fn dispatch(&mut self, token: mio::Token, ready: Ready) {
let addr = slab::Address::from_usize(ADDRESS.unpack(token.0));

let resources = self.resources.as_mut().unwrap();
let resources = &mut self.resources;

let io = match resources.get(addr) {
Some(io) => io,
Expand All @@ -214,22 +208,7 @@ impl Driver {

impl Drop for Driver {
fn drop(&mut self) {
(*self.inner.resources.lock()) = self.resources.take();
}
}

impl Drop for Inner {
fn drop(&mut self) {
let resources = self.resources.lock().take();

if let Some(mut slab) = resources {
slab.for_each(|io| {
// If a task is waiting on the I/O resource, notify it. The task
// will then attempt to use the I/O resource and fail due to the
// driver being shutdown.
io.shutdown();
});
}
self.shutdown();
}
}

Expand All @@ -251,7 +230,16 @@ impl Park for Driver {
Ok(())
}

fn shutdown(&mut self) {}
fn shutdown(&mut self) {
if self.inner.shutdown() {
self.resources.for_each(|io| {
// If a task is waiting on the I/O resource, notify it. The task
// will then attempt to use the I/O resource and fail due to the
// driver being shutdown. And shutdown will clear all wakers.
io.shutdown();
});
}
}
}

impl fmt::Debug for Driver {
Expand Down Expand Up @@ -292,18 +280,11 @@ cfg_not_rt! {

cfg_net! {
cfg_metrics! {
impl Handle {
// TODO: Remove this when handle contains `Arc<Inner>` so that we can return
// &IoDriverMetrics instead of using a closure.
//
// Related issue: https://github.com/tokio-rs/tokio/issues/4509
pub(crate) fn with_io_driver_metrics<F, R>(&self, f: F) -> Option<R>
where
F: Fn(&IoDriverMetrics) -> R,
{
self.inner().map(|inner| f(&inner.metrics))
impl Handle {
pub(crate) fn metrics(&self) -> &IoDriverMetrics {
&self.inner.metrics
}
}
}
}
}

Expand All @@ -318,13 +299,7 @@ impl Handle {
/// blocked in `turn`, then the next call to `turn` will not block and
/// return immediately.
fn wakeup(&self) {
if let Some(inner) = self.inner() {
inner.waker.wake().expect("failed to wake I/O driver");
}
}

pub(super) fn inner(&self) -> Option<Arc<Inner>> {
self.inner.upgrade()
self.inner.waker.wake().expect("failed to wake I/O driver");
}
}

Expand All @@ -340,6 +315,17 @@ impl fmt::Debug for Handle {
}
}

// ===== impl IoDispatcher =====

impl IoDispatcher {
fn new(allocator: slab::Allocator<ScheduledIo>) -> Self {
Self {
allocator,
is_shutdown: false,
}
}
}

// ===== impl Inner =====

impl Inner {
Expand All @@ -351,12 +337,7 @@ impl Inner {
source: &mut impl mio::event::Source,
interest: Interest,
) -> io::Result<slab::Ref<ScheduledIo>> {
let (address, shared) = self.io_dispatch.allocate().ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
"reactor at max registered I/O resources",
)
})?;
let (address, shared) = self.allocate()?;

let token = GENERATION.pack(shared.generation(), ADDRESS.pack(address.as_usize(), 0));

Expand All @@ -376,6 +357,36 @@ impl Inner {

Ok(())
}

/// shutdown the dispatcher.
fn shutdown(&self) -> bool {
let mut io = self.io_dispatch.write().unwrap();
if io.is_shutdown {
return false;
}
io.is_shutdown = true;
true
}

fn is_shutdown(&self) -> bool {
return self.io_dispatch.read().unwrap().is_shutdown;
}

fn allocate(&self) -> io::Result<(slab::Address, slab::Ref<ScheduledIo>)> {
let io = self.io_dispatch.read().unwrap();
if io.is_shutdown {
return Err(io::Error::new(
io::ErrorKind::Other,
"failed to find event loop",
));
}
io.allocator.allocate().ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
"reactor at max registered I/O resources",
)
})
}
}

impl Direction {
Expand Down
19 changes: 4 additions & 15 deletions tokio/src/io/driver/registration.rs
Expand Up @@ -72,14 +72,7 @@ impl Registration {
interest: Interest,
handle: Handle,
) -> io::Result<Registration> {
let shared = if let Some(inner) = handle.inner() {
inner.add_source(io, interest)?
} else {
return Err(io::Error::new(
io::ErrorKind::Other,
"failed to find event loop",
));
};
let shared = handle.inner.add_source(io, interest)?;

Ok(Registration { handle, shared })
}
Expand All @@ -101,11 +94,7 @@ impl Registration {
///
/// `Err` is returned if an error is encountered.
pub(crate) fn deregister(&mut self, io: &mut impl Source) -> io::Result<()> {
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
};
inner.deregister_source(io)
self.handle.inner.deregister_source(io)
}

pub(crate) fn clear_readiness(&self, event: ReadyEvent) {
Expand Down Expand Up @@ -157,7 +146,7 @@ impl Registration {
let coop = ready!(crate::coop::poll_proceed(cx));
let ev = ready!(self.shared.poll_readiness(cx, direction));

if self.handle.inner().is_none() {
if self.handle.inner.is_shutdown() {
return Poll::Ready(Err(gone()));
}

Expand Down Expand Up @@ -235,7 +224,7 @@ cfg_io_readiness! {
pin!(fut);

crate::future::poll_fn(|cx| {
if self.handle.inner().is_none() {
if self.handle.inner.is_shutdown() {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::Other,
crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/runtime/metrics/runtime.rs
Expand Up @@ -529,7 +529,7 @@ cfg_net! {
.as_inner()
.io_handle
.as_ref()
.and_then(|h| h.with_io_driver_metrics(f))
.map(|h| f(h.metrics()))
.unwrap_or(0)
}
}
Expand Down

0 comments on commit 931a777

Please sign in to comment.