Skip to content

Commit

Permalink
io_async_fd: add tests for wakeup on rt shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
Bryan Donlan committed Oct 21, 2020
1 parent 799e763 commit 62deeaf
Show file tree
Hide file tree
Showing 2 changed files with 162 additions and 2 deletions.
16 changes: 16 additions & 0 deletions tokio/src/io/async_fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,14 @@ impl<T> AsyncFd<T> {
) -> Poll<io::Result<ReadyGuard<'a, T>>> {
let event = ready!(self.shared.poll_readiness(cx, Direction::Read));

if !self.handle.is_alive() {
return Err(io::Error::new(
io::ErrorKind::Other,
"IO driver has terminated",
))
.into();
}

Ok(ReadyGuard {
async_fd: self,
event: Some(event),
Expand All @@ -287,6 +295,14 @@ impl<T> AsyncFd<T> {
) -> Poll<io::Result<ReadyGuard<'a, T>>> {
let event = ready!(self.shared.poll_readiness(cx, Direction::Write));

if !self.handle.is_alive() {
return Err(io::Error::new(
io::ErrorKind::Other,
"IO driver has terminated",
))
.into();
}

Ok(ReadyGuard {
async_fd: self,
event: Some(event),
Expand Down
148 changes: 146 additions & 2 deletions tokio/tests/io_async_fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::sync::{
};
use std::time::Duration;
use std::{
future::Future,
io::{self, ErrorKind, Read, Write},
task::{Context, Waker},
};
Expand All @@ -17,8 +18,8 @@ use nix::unistd::{close, read, write};

use futures::{poll, FutureExt};

use tokio::io::AsyncFd;
use tokio_test::assert_pending;
use tokio::io::{AsyncFd, ReadyGuard};
use tokio_test::{assert_err, assert_pending};

struct TestWaker {
inner: Arc<TestWakerInner>,
Expand Down Expand Up @@ -447,3 +448,146 @@ async fn poll_fns() {
// now we should be writable (ie - the waker for poll_write should still be registered after we wake the read side)
let _ = write_fut.await;
}

fn assert_pending<T: std::fmt::Debug, F: Future<Output = T>>(f: F) -> std::pin::Pin<Box<F>> {
let mut pinned = Box::pin(f);

assert_pending!(pinned
.as_mut()
.poll(&mut Context::from_waker(futures::task::noop_waker_ref())));

pinned
}

fn rt() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
}

#[test]
fn driver_shutdown_wakes_currently_pending() {
let rt = rt();

let (a, _b) = socketpair();
let afd_a = {
let _enter = rt.enter();
AsyncFd::new(a).unwrap()
};

let readable = assert_pending(afd_a.readable());

std::mem::drop(rt);

// Being awoken by a rt drop does not return an error, currently...
let _ = futures::executor::block_on(readable).unwrap();

// However, attempting to initiate a readiness wait when the rt is dropped is an error
assert_err!(futures::executor::block_on(afd_a.readable()));
}

#[test]
fn driver_shutdown_wakes_future_pending() {
let rt = rt();

let (a, _b) = socketpair();
let afd_a = {
let _enter = rt.enter();
AsyncFd::new(a).unwrap()
};

std::mem::drop(rt);

assert_err!(futures::executor::block_on(afd_a.readable()));
}

#[test]
fn driver_shutdown_wakes_pending_race() {
// TODO: make this a loom test
for _ in 0..100 {
let rt = rt();

let (a, _b) = socketpair();
let afd_a = {
let _enter = rt.enter();
AsyncFd::new(a).unwrap()
};

let _ = std::thread::spawn(move || std::mem::drop(rt));

// This may or may not return an error (but will be awoken)
let _ = futures::executor::block_on(afd_a.readable());

// However retrying will always return an error
assert_err!(futures::executor::block_on(afd_a.readable()));
}
}

async fn poll_readable<T>(fd: &AsyncFd<T>) -> std::io::Result<ReadyGuard<'_, T>> {
futures::future::poll_fn(|cx| fd.poll_read_ready(cx)).await
}

async fn poll_writable<T>(fd: &AsyncFd<T>) -> std::io::Result<ReadyGuard<'_, T>> {
futures::future::poll_fn(|cx| fd.poll_write_ready(cx)).await
}

#[test]
fn driver_shutdown_wakes_currently_pending_polls() {
let rt = rt();

let (a, _b) = socketpair();
let afd_a = {
let _enter = rt.enter();
AsyncFd::new(a).unwrap()
};

while afd_a.inner().write(&[0; 512]).is_ok() {} // make not writable

let readable = assert_pending(poll_readable(&afd_a));
let writable = assert_pending(poll_writable(&afd_a));

std::mem::drop(rt);

// Attempting to poll readiness when the rt is dropped is an error
assert_err!(futures::executor::block_on(readable));
assert_err!(futures::executor::block_on(writable));
}

#[test]
fn driver_shutdown_wakes_poll() {
let rt = rt();

let (a, _b) = socketpair();
let afd_a = {
let _enter = rt.enter();
AsyncFd::new(a).unwrap()
};

std::mem::drop(rt);

assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
}

#[test]
fn driver_shutdown_wakes_poll_race() {
// TODO: make this a loom test
for _ in 0..100 {
let rt = rt();

let (a, _b) = socketpair();
let afd_a = {
let _enter = rt.enter();
AsyncFd::new(a).unwrap()
};

while afd_a.inner().write(&[0; 512]).is_ok() {} // make not writable

let _ = std::thread::spawn(move || std::mem::drop(rt));

// The poll variants will always return an error in this case
assert_err!(futures::executor::block_on(poll_readable(&afd_a)));
assert_err!(futures::executor::block_on(poll_writable(&afd_a)));
}
}

0 comments on commit 62deeaf

Please sign in to comment.