Skip to content

Commit

Permalink
driver: fix shutdown notification unreliability
Browse files Browse the repository at this point in the history
Previously, there was a race window in which an IO driver shutting down could
fail to notify ScheduledIo instances of this state; in particular, notification
of outstanding ScheduledIo registrations was driven by `Driver::drop`, but
registrations bypass `Driver` and go directly to a `Weak<Inner>`. The `Driver`
holds the `Arc<Inner>` keeping `Inner` alive, but it's possible that a new
handle could be registered (or a new readiness future created for an existing
handle) after the `Driver::drop` handler runs and prior to `Inner` being
dropped.

This change fixes this in two parts: First, notification of outstanding
ScheduledIo handles is pushed down into the drop method of `Inner` instead,
and, second, we add state to ScheduledIo to ensure that we remember that the IO
driver we're bound to has shut down after the initial shutdown notification, so
that subsequent readiness future registrations can immediately return (instead
of potentially blocking indefinitely).

Fixes: #2924
  • Loading branch information
Bryan Donlan committed Oct 20, 2020
1 parent f4905ef commit 799e763
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 18 deletions.
13 changes: 11 additions & 2 deletions tokio/src/io/async_fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use crate::util::slab;
/// Because these functions don't create a future to hold their state, they have
/// the limitation that only one task can wait on each direction (read or write)
/// at a time.

///
/// [`readable`]: method@Self::readable
/// [`writable`]: method@Self::writable
/// [`ReadyGuard`]: struct@self::ReadyGuard
Expand Down Expand Up @@ -295,7 +295,16 @@ impl<T> AsyncFd<T> {
}

async fn readiness(&self, interest: mio::Interest) -> io::Result<ReadyGuard<'_, T>> {
let event = self.shared.readiness(interest).await;
let event = self.shared.readiness(interest);

if !self.handle.is_alive() {
return Err(io::Error::new(
io::ErrorKind::Other,
"IO driver has terminated",
));
}

let event = event.await;
Ok(ReadyGuard {
async_fd: self,
event: Some(event),
Expand Down
51 changes: 39 additions & 12 deletions tokio/src/io/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ mod scheduled_io;
pub(crate) use scheduled_io::ScheduledIo; // pub(crate) for tests

use crate::park::{Park, Unpark};
use crate::util::bit;
use crate::util::slab::{self, Slab};
use crate::{loom::sync::Mutex, util::bit};

use std::fmt;
use std::io;
Expand All @@ -25,8 +25,10 @@ pub(crate) struct Driver {
events: Option<mio::Events>,

/// Primary slab handle containing the state for each resource registered
/// with this driver.
resources: Slab<ScheduledIo>,
/// with this driver. During Drop this is moved into the Inner structure, so
/// this is an Option to allow it to be vacated (until Drop this is always
/// Some)
resources: Option<Slab<ScheduledIo>>,

/// The system event queue
poll: mio::Poll,
Expand All @@ -47,6 +49,14 @@ pub(crate) struct ReadyEvent {
}

pub(super) struct Inner {
/// Primary slab handle containing the state for each resource registered
/// with this driver.
///
/// The ownership of this slab is moved into this structure during
/// `Driver::drop`, so that `Inner::drop` can notify all outstanding handles
/// without risking new ones being registered in the meantime.
resources: Mutex<Option<Slab<ScheduledIo>>>,

/// Registers I/O resources
registry: mio::Registry,

Expand Down Expand Up @@ -104,9 +114,10 @@ impl Driver {
Ok(Driver {
tick: 0,
events: Some(mio::Events::with_capacity(1024)),
resources: slab,
poll,
resources: Some(slab),
inner: Arc::new(Inner {
resources: Mutex::new(None),
registry,
io_dispatch: allocator,
waker,
Expand All @@ -133,7 +144,7 @@ impl Driver {
self.tick = self.tick.wrapping_add(1);

if self.tick == COMPACT_INTERVAL {
self.resources.compact();
self.resources.as_mut().unwrap().compact()
}

let mut events = self.events.take().expect("i/o driver event store missing");
Expand Down Expand Up @@ -163,7 +174,9 @@ impl Driver {
fn dispatch(&mut self, token: mio::Token, ready: Ready) {
let addr = slab::Address::from_usize(ADDRESS.unpack(token.0));

let io = match self.resources.get(addr) {
let resources = self.resources.as_mut().unwrap();

let io = match resources.get(addr) {
Some(io) => io,
None => return,
};
Expand All @@ -181,12 +194,22 @@ impl Driver {

impl Drop for Driver {
fn drop(&mut self) {
self.resources.for_each(|io| {
// If a task is waiting on the I/O resource, notify it. The task
// will then attempt to use the I/O resource and fail due to the
// driver being shutdown.
io.wake(Ready::ALL);
})
(*self.inner.resources.lock()) = self.resources.take();
}
}

impl Drop for Inner {
fn drop(&mut self) {
let resources = self.resources.lock().take();

resources.map(|mut slab| {
slab.for_each(|io| {
// If a task is waiting on the I/O resource, notify it. The task
// will then attempt to use the I/O resource and fail due to the
// driver being shutdown.
io.shutdown();
})
});
}
}

Expand Down Expand Up @@ -267,6 +290,10 @@ impl Handle {
pub(super) fn inner(&self) -> Option<Arc<Inner>> {
self.inner.upgrade()
}

pub(super) fn is_alive(&self) -> bool {
self.inner.strong_count() > 0
}
}

impl Unpark for Handle {
Expand Down
33 changes: 30 additions & 3 deletions tokio/src/io/driver/scheduled_io.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{Direction, Ready, ReadyEvent, Tick};
use super::{Ready, ReadyEvent, Tick};
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::util::bit;
Expand All @@ -7,6 +7,8 @@ use crate::util::slab::Entry;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::task::{Context, Poll, Waker};

use super::Direction;

cfg_io_readiness! {
use crate::util::linked_list::{self, LinkedList};

Expand Down Expand Up @@ -41,6 +43,9 @@ struct Waiters {

/// Waker used for AsyncWrite
writer: Option<Waker>,

/// True if this ScheduledIo has been killed due to IO driver shutdown
is_shutdown: bool,
}

cfg_io_readiness! {
Expand Down Expand Up @@ -121,6 +126,12 @@ impl ScheduledIo {
GENERATION.unpack(self.readiness.load(Acquire))
}

/// Invoked when the IO driver is shut down; forces this ScheduledIo into a
/// permanently ready state.
pub(super) fn shutdown(&self) {
self.wake0(Ready::ALL, true)
}

/// Sets the readiness on this `ScheduledIo` by invoking the given closure on
/// the current value, returning the previous readiness value.
///
Expand Down Expand Up @@ -197,13 +208,19 @@ impl ScheduledIo {
/// than 32 wakers to notify, if the stack array fills up, the lock is
/// released, the array is cleared, and the iteration continues.
pub(super) fn wake(&self, ready: Ready) {
self.wake0(ready, false);
}

fn wake0(&self, ready: Ready, shutdown: bool) {
const NUM_WAKERS: usize = 32;

let mut wakers: [Option<Waker>; NUM_WAKERS] = Default::default();
let mut curr = 0;

let mut waiters = self.waiters.lock();

waiters.is_shutdown |= shutdown;

// check for AsyncRead slot
if ready.is_readable() {
if let Some(waker) = waiters.reader.take() {
Expand Down Expand Up @@ -288,7 +305,12 @@ impl ScheduledIo {
// taking the waiters lock
let curr = self.readiness.load(Acquire);
let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
if ready.is_empty() {
if waiters.is_shutdown {
Poll::Ready(ReadyEvent {
tick: TICK.unpack(curr) as u8,
ready: direction.mask(),
})
} else if ready.is_empty() {
Poll::Pending
} else {
Poll::Ready(ReadyEvent {
Expand Down Expand Up @@ -401,7 +423,12 @@ cfg_io_readiness! {
let mut waiters = scheduled_io.waiters.lock();

let curr = scheduled_io.readiness.load(SeqCst);
let ready = Ready::from_usize(READINESS.unpack(curr));
let mut ready = Ready::from_usize(READINESS.unpack(curr));

if waiters.is_shutdown {
ready = Ready::ALL;
}

let ready = ready.intersection(interest);

if !ready.is_empty() {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/loom/std/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub(crate) mod sync {
pub(crate) use crate::loom::std::atomic_u8::AtomicU8;
pub(crate) use crate::loom::std::atomic_usize::AtomicUsize;

pub(crate) use std::sync::atomic::{spin_loop_hint, AtomicBool};
pub(crate) use std::sync::atomic::{fence, spin_loop_hint, AtomicBool, Ordering};
}
}

Expand Down

0 comments on commit 799e763

Please sign in to comment.