Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rt: fix *_closed false positives #5231

Merged
merged 11 commits into from Dec 5, 2022
23 changes: 16 additions & 7 deletions tokio/src/runtime/io/scheduled_io.rs
Expand Up @@ -62,7 +62,8 @@ cfg_io_readiness! {
/// The interest this waiter is waiting on.
interest: Interest,

is_ready: bool,
/// Awaited readiness.
ready: Ready,

/// Should never be `!Unpin`.
_p: PhantomPinned,
Expand Down Expand Up @@ -253,7 +254,7 @@ impl ScheduledIo {
let waiter = unsafe { &mut *waiter.as_ptr() };

if let Some(waker) = waiter.waker.take() {
waiter.is_ready = true;
waiter.ready = ready.intersection(waiter.interest);
wakers.push(waker);
}
}
Expand Down Expand Up @@ -389,7 +390,7 @@ cfg_io_readiness! {
waiter: UnsafeCell::new(Waiter {
pointers: linked_list::Pointers::new(),
waker: None,
is_ready: false,
ready: Ready::EMPTY,
interest,
_p: PhantomPinned,
}),
Expand Down Expand Up @@ -490,7 +491,7 @@ cfg_io_readiness! {
// Safety: called while locked
let w = unsafe { &mut *waiter.get() };

if w.is_ready {
if !w.ready.is_empty() {
// Our waker has been notified.
*state = State::Done;
} else {
Expand All @@ -510,14 +511,22 @@ cfg_io_readiness! {
drop(waiters);
}
State::Done => {
let tick = TICK.unpack(scheduled_io.readiness.load(Acquire)) as u8;

// Safety: State::Done means it is no longer shared
let w = unsafe { &mut *waiter.get() };

// Note: the returned tick might be newer than the event
// which notified our waker. This is ok because the future
// still didn't return `Poll::Ready`.
let curr = scheduled_io.readiness.load(Acquire);
let tick = TICK.unpack(curr) as u8;

// Add more readiness which might have appeared in the meantime.
let curr_ready = Ready::from_usize(READINESS.unpack(curr));
let ready = w.ready | (curr_ready.intersection(w.interest));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a readiness bit is not included in curr_ready then something else already cleared it. Including w.ready here would result in additional false positives & unnecessary I/O ops.

Based on discord, it sounds like you want to avoid empty readiness events, but that seems better to me than false positives. The caller would loop and wait again.


return Poll::Ready(ReadyEvent {
tick,
ready: Ready::from_interest(w.interest),
ready,
});
}
}
Expand Down
49 changes: 45 additions & 4 deletions tokio/tests/tcp_stream.rs
Expand Up @@ -254,30 +254,34 @@ async fn create_pair() -> (TcpStream, TcpStream) {
(client, server)
}

fn read_until_pending(stream: &mut TcpStream) {
fn read_until_pending(stream: &mut TcpStream) -> usize {
let mut buf = vec![0u8; 1024 * 1024];
let mut total = 0;
loop {
match stream.try_read(&mut buf) {
Ok(_) => (),
Ok(n) => total += n,
Err(err) => {
assert_eq!(err.kind(), io::ErrorKind::WouldBlock);
break;
}
}
}
total
}

fn write_until_pending(stream: &mut TcpStream) {
fn write_until_pending(stream: &mut TcpStream) -> usize {
let buf = vec![0u8; 1024 * 1024];
let mut total = 0;
loop {
match stream.try_write(&buf) {
Ok(_) => (),
Ok(n) => total += n,
Err(err) => {
assert_eq!(err.kind(), io::ErrorKind::WouldBlock);
break;
}
}
}
total
}

#[tokio::test]
Expand Down Expand Up @@ -357,3 +361,40 @@ async fn try_read_buf() {
}
}
}

// read_closed is a best effort event, so test only for no false positives.
#[tokio::test]
async fn read_closed() {
let (client, mut server) = create_pair().await;

let mut ready_fut = task::spawn(client.ready(Interest::READABLE));
assert_pending!(ready_fut.poll());

assert_ok!(server.write_all(b"ping").await);

let ready_event = assert_ok!(ready_fut.await);

assert!(!ready_event.is_read_closed());
}

// write_closed is a best effort event, so test only for no false positives.
#[tokio::test]
async fn write_closed() {
let (mut client, mut server) = create_pair().await;

// Fill the write buffer.
let write_size = write_until_pending(&mut client);
let mut ready_fut = task::spawn(client.ready(Interest::WRITABLE));
assert_pending!(ready_fut.poll());

// Drain the socket to make client writable.
let mut read_size = 0;
while read_size < write_size {
server.readable().await.unwrap();
read_size += read_until_pending(&mut server);
}

let ready_event = assert_ok!(ready_fut.await);

assert!(!ready_event.is_write_closed());
}