Skip to content

Commit

Permalink
net: Avoid race furing io driver shutdown
Browse files Browse the repository at this point in the history
To avoid IO resources getting registered while the driver
is shutting down,`state: AtomicUsize` is introduced.

Fixes: tokio-rs#2924
  • Loading branch information
zaharidichev committed Oct 19, 2020
1 parent 3cc6ce7 commit 54dd8b0
Showing 1 changed file with 48 additions and 11 deletions.
59 changes: 48 additions & 11 deletions tokio/src/io/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ use crate::park::{Park, Unpark};
use crate::util::bit;
use crate::util::slab::{self, Slab};

use crate::loom::sync::atomic::AtomicUsize;
use std::fmt;
use std::io;
use std::sync::atomic::Ordering::*;
use std::sync::{Arc, Weak};
use std::time::Duration;

Expand All @@ -32,7 +34,7 @@ pub(crate) struct Driver {
poll: mio::Poll,

/// State shared between the reactor and the handles.
inner: Arc<Inner>,
inner: Option<Arc<Inner>>,
}

/// A reference to an I/O driver
Expand All @@ -55,8 +57,19 @@ pub(super) struct Inner {

/// Used to wake up the reactor from a call to `turn`
waker: mio::Waker,

state: AtomicUsize,
}

/// Idle state
const WAITING: usize = 0;

/// An IO resource is being added
const REGISTERING: usize = 1;

/// Closed state
const CLOSED: usize = 2;

#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub(super) enum Direction {
Read,
Expand Down Expand Up @@ -106,11 +119,12 @@ impl Driver {
events: Some(mio::Events::with_capacity(1024)),
resources: slab,
poll,
inner: Arc::new(Inner {
inner: Some(Arc::new(Inner {
registry,
io_dispatch: allocator,
waker,
}),
state: AtomicUsize::new(WAITING),
})),
})
}

Expand All @@ -122,7 +136,7 @@ impl Driver {
/// to bind them to this event loop.
pub(crate) fn handle(&self) -> Handle {
Handle {
inner: Arc::downgrade(&self.inner),
inner: Arc::downgrade(self.inner.as_ref().expect("reactor closed")),
}
}

Expand Down Expand Up @@ -181,12 +195,24 @@ impl Driver {

impl Drop for Driver {
fn drop(&mut self) {
self.resources.for_each(|io| {
// If a task is waiting on the I/O resource, notify it. The task
// will then attempt to use the I/O resource and fail due to the
// driver being shutdown.
io.wake(Ready::ALL);
})
let inner = self.inner.take().expect("reactor closed");
loop {
if inner
.state
.compare_exchange(WAITING, CLOSED, AcqRel, Acquire)
.is_ok()
{
drop(inner);
/// we drop the arc first
self.resources.for_each(|io| {
// If a task is waiting on the I/O resource, notify it. The task
// will then attempt to use the I/O resource and fail due to the
// driver being shutdown.
io.wake(Ready::ALL);
});
return;
}
}
}
}

Expand Down Expand Up @@ -292,6 +318,17 @@ impl Inner {
source: &mut impl mio::event::Source,
interest: mio::Interest,
) -> io::Result<slab::Ref<ScheduledIo>> {
loop {
match self
.state
.compare_exchange(WAITING, REGISTERING, AcqRel, Acquire)
{
Ok(_) | Err(REGISTERING) => break,
Err(CLOSED) => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
_ => unreachable!(),
}
}

let (address, shared) = self.io_dispatch.allocate().ok_or_else(|| {
io::Error::new(
io::ErrorKind::Other,
Expand All @@ -303,7 +340,7 @@ impl Inner {

self.registry
.register(source, mio::Token(token), interest)?;

self.state.store(WAITING, Release);
Ok(shared)
}

Expand Down

0 comments on commit 54dd8b0

Please sign in to comment.