Skip to content

Commit

Permalink
remove frequent global is_shutdown flag check on io operations
Browse files Browse the repository at this point in the history
The global flag is still remaining, and used to prevent duplicated
shutdown and new io operations after shutdown.

On shutdown, the driver flip the shutdown flag of every pending io
operations and wake them to fail with shutdown error.

Fixes: tokio-rs#5227
  • Loading branch information
HyeonuPark committed Dec 17, 2022
1 parent 42db755 commit c30924a
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 40 deletions.
4 changes: 2 additions & 2 deletions tokio/docs/reactor-refactor.md
Expand Up @@ -188,12 +188,12 @@ readiness, the driver's tick is packed into the atomic `usize`.
The `ScheduledIo` readiness `AtomicUsize` is structured as:

```
| reserved | generation | driver tick | readiness |
| shutdown | generation | driver tick | readiness |
|----------+------------+--------------+-----------|
| 1 bit | 7 bits + 8 bits + 16 bits |
```

The `reserved` and `generation` components exist today.
The `shutdown` and `generation` components exist today.

The `readiness()` function returns a `ReadyEvent` value. This value includes the
`tick` component read with the resource's readiness value. When
Expand Down
12 changes: 4 additions & 8 deletions tokio/src/runtime/io/mod.rs
Expand Up @@ -60,6 +60,7 @@ pub(crate) struct Handle {
pub(crate) struct ReadyEvent {
tick: u8,
pub(crate) ready: Ready,
is_shutdown: bool,
}

struct IoDispatcher {
Expand Down Expand Up @@ -147,9 +148,8 @@ impl Driver {

if handle.shutdown() {
self.resources.for_each(|io| {
// If a task is waiting on the I/O resource, notify it. The task
// will then attempt to use the I/O resource and fail due to the
// driver being shutdown. And shutdown will clear all wakers.
// If a task is waiting on the I/O resource, notify it that the
// runtime is being shutdown. And shutdown will clear all wakers.
io.shutdown();
});
}
Expand Down Expand Up @@ -282,16 +282,12 @@ impl Handle {
true
}

fn is_shutdown(&self) -> bool {
return self.io_dispatch.read().unwrap().is_shutdown;
}

fn allocate(&self) -> io::Result<(slab::Address, slab::Ref<ScheduledIo>)> {
let io = self.io_dispatch.read().unwrap();
if io.is_shutdown {
return Err(io::Error::new(
io::ErrorKind::Other,
"failed to find event loop",
crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR,
));
}
io.allocator.allocate().ok_or_else(|| {
Expand Down
24 changes: 13 additions & 11 deletions tokio/src/runtime/io/registration.rs
Expand Up @@ -148,7 +148,7 @@ impl Registration {
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
let ev = ready!(self.shared.poll_readiness(cx, direction));

if self.handle().is_shutdown() {
if ev.is_shutdown {
return Poll::Ready(Err(gone()));
}

Expand Down Expand Up @@ -217,7 +217,10 @@ impl Drop for Registration {
}

fn gone() -> io::Error {
io::Error::new(io::ErrorKind::Other, "IO driver has terminated")
io::Error::new(
io::ErrorKind::Other,
crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR,
)
}

cfg_io_readiness! {
Expand All @@ -229,16 +232,15 @@ cfg_io_readiness! {
let fut = self.shared.readiness(interest);
pin!(fut);

crate::future::poll_fn(|cx| {
if self.handle().is_shutdown() {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::Other,
crate::util::error::RUNTIME_SHUTTING_DOWN_ERROR
)));
}
let ev = crate::future::poll_fn(|cx| {
Pin::new(&mut fut).poll(cx)
}).await;

if ev.is_shutdown {
return Err(gone())
}

Pin::new(&mut fut).poll(cx).map(Ok)
}).await
Ok(ev)
}

pub(crate) async fn async_io<R>(&self, interest: Interest, mut f: impl FnMut() -> io::Result<R>) -> io::Result<R> {
Expand Down
43 changes: 24 additions & 19 deletions tokio/src/runtime/io/scheduled_io.rs
Expand Up @@ -46,9 +46,6 @@ struct Waiters {

/// Waker used for AsyncWrite.
writer: Option<Waker>,

/// True if this ScheduledIo has been killed due to IO driver shutdown.
is_shutdown: bool,
}

cfg_io_readiness! {
Expand Down Expand Up @@ -95,7 +92,7 @@ cfg_io_readiness! {

// The `ScheduledIo::readiness` (`AtomicUsize`) is packed full of goodness.
//
// | reserved | generation | driver tick | readiness |
// | shutdown | generation | driver tick | readiness |
// |----------+------------+--------------+-----------|
// | 1 bit | 7 bits + 8 bits + 16 bits |

Expand All @@ -105,6 +102,8 @@ const TICK: bit::Pack = READINESS.then(8);

const GENERATION: bit::Pack = TICK.then(7);

const SHUTDOWN: bit::Pack = GENERATION.then(1);

#[test]
fn test_generations_assert_same() {
assert_eq!(super::GENERATION, GENERATION);
Expand Down Expand Up @@ -138,9 +137,11 @@ impl ScheduledIo {
}

/// Invoked when the IO driver is shut down; forces this ScheduledIo into a
/// permanently ready state.
/// permanently shutdown state.
pub(super) fn shutdown(&self) {
self.wake0(Ready::ALL, true)
let mask = SHUTDOWN.pack(1, 0);
self.readiness.fetch_or(mask, AcqRel);
self.wake(Ready::ALL);
}

/// Sets the readiness on this `ScheduledIo` by invoking the given closure on
Expand Down Expand Up @@ -219,16 +220,10 @@ impl ScheduledIo {
/// than 32 wakers to notify, if the stack array fills up, the lock is
/// released, the array is cleared, and the iteration continues.
pub(super) fn wake(&self, ready: Ready) {
self.wake0(ready, false);
}

fn wake0(&self, ready: Ready, shutdown: bool) {
let mut wakers = WakeList::new();

let mut waiters = self.waiters.lock();

waiters.is_shutdown |= shutdown;

// check for AsyncRead slot
if ready.is_readable() {
if let Some(waker) = waiters.reader.take() {
Expand Down Expand Up @@ -283,6 +278,7 @@ impl ScheduledIo {
ReadyEvent {
tick: TICK.unpack(curr) as u8,
ready: interest.mask() & Ready::from_usize(READINESS.unpack(curr)),
is_shutdown: SHUTDOWN.unpack(curr) != 0,
}
}

Expand All @@ -299,8 +295,9 @@ impl ScheduledIo {
let curr = self.readiness.load(Acquire);

let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
let is_shutdown = SHUTDOWN.unpack(curr) != 0;

if ready.is_empty() {
if ready.is_empty() && !is_shutdown {
// Update the task info
let mut waiters = self.waiters.lock();
let slot = match direction {
Expand All @@ -325,23 +322,27 @@ impl ScheduledIo {
// taking the waiters lock
let curr = self.readiness.load(Acquire);
let ready = direction.mask() & Ready::from_usize(READINESS.unpack(curr));
if waiters.is_shutdown {
let is_shutdown = SHUTDOWN.unpack(curr) != 0;
if is_shutdown {
Poll::Ready(ReadyEvent {
tick: TICK.unpack(curr) as u8,
ready: direction.mask(),
is_shutdown,
})
} else if ready.is_empty() {
Poll::Pending
} else {
Poll::Ready(ReadyEvent {
tick: TICK.unpack(curr) as u8,
ready,
is_shutdown,
})
}
} else {
Poll::Ready(ReadyEvent {
tick: TICK.unpack(curr) as u8,
ready,
is_shutdown,
})
}
}
Expand Down Expand Up @@ -433,35 +434,37 @@ cfg_io_readiness! {
// Optimistically check existing readiness
let curr = scheduled_io.readiness.load(SeqCst);
let ready = Ready::from_usize(READINESS.unpack(curr));
let is_shutdown = SHUTDOWN.unpack(curr) != 0;

// Safety: `waiter.interest` never changes
let interest = unsafe { (*waiter.get()).interest };
let ready = ready.intersection(interest);

if !ready.is_empty() {
if !ready.is_empty() || is_shutdown {
// Currently ready!
let tick = TICK.unpack(curr) as u8;
*state = State::Done;
return Poll::Ready(ReadyEvent { tick, ready });
return Poll::Ready(ReadyEvent { tick, ready, is_shutdown });
}

// Wasn't ready, take the lock (and check again while locked).
let mut waiters = scheduled_io.waiters.lock();

let curr = scheduled_io.readiness.load(SeqCst);
let mut ready = Ready::from_usize(READINESS.unpack(curr));
let is_shutdown = SHUTDOWN.unpack(curr) != 0;

if waiters.is_shutdown {
if is_shutdown {
ready = Ready::ALL;
}

let ready = ready.intersection(interest);

if !ready.is_empty() {
if !ready.is_empty() || is_shutdown {
// Currently ready!
let tick = TICK.unpack(curr) as u8;
*state = State::Done;
return Poll::Ready(ReadyEvent { tick, ready });
return Poll::Ready(ReadyEvent { tick, ready, is_shutdown });
}

// Not ready even after locked, insert into list...
Expand Down Expand Up @@ -514,6 +517,7 @@ cfg_io_readiness! {
let w = unsafe { &mut *waiter.get() };

let curr = scheduled_io.readiness.load(Acquire);
let is_shutdown = SHUTDOWN.unpack(curr) != 0;

// The returned tick might be newer than the event
// which notified our waker. This is ok because the future
Expand All @@ -528,6 +532,7 @@ cfg_io_readiness! {
return Poll::Ready(ReadyEvent {
tick,
ready,
is_shutdown,
});
}
}
Expand Down

0 comments on commit c30924a

Please sign in to comment.