Skip to content

Commit

Permalink
rt: fix *_closed false positives (#5231)
Browse files Browse the repository at this point in the history
Readiness futures inconsistently return the current readiness of an I/O resource if it is immediately available, or all readiness relevant for the given `Interest`, if a future needs to wait. In particular, it always returns `read_closed` for `Interest::READABLE` and `write_closed` for `Interest::WRITABLE`, which often is not true. Tokio should not tolerate false positives for `*_closed` events because they are considered final states and are not cleared internally.

In the case of an `io_resource.ready(Interest::READABLE | Interest::WRITABLE)` call, this behavior may also lead to false positives of other events.

## Solution

Follow the same strategy as `poll_ready` and return the current resource's readiness.

Closes: #5098
  • Loading branch information
satakuma committed Dec 5, 2022
1 parent a1316cd commit 644cb82
Show file tree
Hide file tree
Showing 11 changed files with 130 additions and 9 deletions.
12 changes: 12 additions & 0 deletions tokio/src/net/tcp/split.rs
Expand Up @@ -145,6 +145,12 @@ impl ReadHalf<'_> {
/// can be used to concurrently read / write to the same socket on a single
/// task without splitting the socket.
///
/// The function may complete without the socket being ready. This is a
/// false-positive and attempting an operation will return with
/// `io::ErrorKind::WouldBlock`. The function can also return with an empty
/// [`Ready`] set, so you should always check the returned value and possibly
/// wait again if the requested states are not set.
///
/// This function is equivalent to [`TcpStream::ready`].
///
/// # Cancel safety
Expand Down Expand Up @@ -273,6 +279,12 @@ impl WriteHalf<'_> {
/// can be used to concurrently read / write to the same socket on a single
/// task without splitting the socket.
///
/// The function may complete without the socket being ready. This is a
/// false-positive and attempting an operation will return with
/// `io::ErrorKind::WouldBlock`. The function can also return with an empty
/// [`Ready`] set, so you should always check the returned value and possibly
/// wait again if the requested states are not set.
///
/// This function is equivalent to [`TcpStream::ready`].
///
/// # Cancel safety
Expand Down
12 changes: 12 additions & 0 deletions tokio/src/net/tcp/split_owned.rs
Expand Up @@ -200,6 +200,12 @@ impl OwnedReadHalf {
/// can be used to concurrently read / write to the same socket on a single
/// task without splitting the socket.
///
/// The function may complete without the socket being ready. This is a
/// false-positive and attempting an operation will return with
/// `io::ErrorKind::WouldBlock`. The function can also return with an empty
/// [`Ready`] set, so you should always check the returned value and possibly
/// wait again if the requested states are not set.
///
/// This function is equivalent to [`TcpStream::ready`].
///
/// # Cancel safety
Expand Down Expand Up @@ -355,6 +361,12 @@ impl OwnedWriteHalf {
/// can be used to concurrently read / write to the same socket on a single
/// task without splitting the socket.
///
/// The function may complete without the socket being ready. This is a
/// false-positive and attempting an operation will return with
/// `io::ErrorKind::WouldBlock`. The function can also return with an empty
/// [`Ready`] set, so you should always check the returned value and possibly
/// wait again if the requested states are not set.
///
/// This function is equivalent to [`TcpStream::ready`].
///
/// # Cancel safety
Expand Down
6 changes: 6 additions & 0 deletions tokio/src/net/tcp/stream.rs
Expand Up @@ -377,6 +377,12 @@ impl TcpStream {
/// can be used to concurrently read / write to the same socket on a single
/// task without splitting the socket.
///
/// The function may complete without the socket being ready. This is a
/// false-positive and attempting an operation will return with
/// `io::ErrorKind::WouldBlock`. The function can also return with an empty
/// [`Ready`] set, so you should always check the returned value and possibly
/// wait again if the requested states are not set.
///
/// # Cancel safety
///
/// This method is cancel safe. Once a readiness event occurs, the method
Expand Down
4 changes: 3 additions & 1 deletion tokio/src/net/udp.rs
Expand Up @@ -357,7 +357,9 @@ impl UdpSocket {
///
/// The function may complete without the socket being ready. This is a
/// false-positive and attempting an operation will return with
/// `io::ErrorKind::WouldBlock`.
/// `io::ErrorKind::WouldBlock`. The function can also return with an empty
/// [`Ready`] set, so you should always check the returned value and possibly
/// wait again if the requested states are not set.
///
/// # Cancel safety
///
Expand Down
4 changes: 3 additions & 1 deletion tokio/src/net/unix/datagram/socket.rs
Expand Up @@ -104,7 +104,9 @@ impl UnixDatagram {
///
/// The function may complete without the socket being ready. This is a
/// false-positive and attempting an operation will return with
/// `io::ErrorKind::WouldBlock`.
/// `io::ErrorKind::WouldBlock`. The function can also return with an empty
/// [`Ready`] set, so you should always check the returned value and possibly
/// wait again if the requested states are not set.
///
/// # Cancel safety
///
Expand Down
6 changes: 6 additions & 0 deletions tokio/src/net/unix/split.rs
Expand Up @@ -182,6 +182,12 @@ impl WriteHalf<'_> {
/// can be used to concurrently read / write to the same socket on a single
/// task without splitting the socket.
///
/// The function may complete without the socket being ready. This is a
/// false-positive and attempting an operation will return with
/// `io::ErrorKind::WouldBlock`. The function can also return with an empty
/// [`Ready`] set, so you should always check the returned value and possibly
/// wait again if the requested states are not set.
///
/// # Cancel safety
///
/// This method is cancel safe. Once a readiness event occurs, the method
Expand Down
12 changes: 12 additions & 0 deletions tokio/src/net/unix/split_owned.rs
Expand Up @@ -114,6 +114,12 @@ impl OwnedReadHalf {
/// can be used to concurrently read / write to the same socket on a single
/// task without splitting the socket.
///
/// The function may complete without the socket being ready. This is a
/// false-positive and attempting an operation will return with
/// `io::ErrorKind::WouldBlock`. The function can also return with an empty
/// [`Ready`] set, so you should always check the returned value and possibly
/// wait again if the requested states are not set.
///
/// # Cancel safety
///
/// This method is cancel safe. Once a readiness event occurs, the method
Expand Down Expand Up @@ -265,6 +271,12 @@ impl OwnedWriteHalf {
/// can be used to concurrently read / write to the same socket on a single
/// task without splitting the socket.
///
/// The function may complete without the socket being ready. This is a
/// false-positive and attempting an operation will return with
/// `io::ErrorKind::WouldBlock`. The function can also return with an empty
/// [`Ready`] set, so you should always check the returned value and possibly
/// wait again if the requested states are not set.
///
/// # Cancel safety
///
/// This method is cancel safe. Once a readiness event occurs, the method
Expand Down
6 changes: 6 additions & 0 deletions tokio/src/net/unix/stream.rs
Expand Up @@ -66,6 +66,12 @@ impl UnixStream {
/// can be used to concurrently read / write to the same socket on a single
/// task without splitting the socket.
///
/// The function may complete without the socket being ready. This is a
/// false-positive and attempting an operation will return with
/// `io::ErrorKind::WouldBlock`. The function can also return with an empty
/// [`Ready`] set, so you should always check the returned value and possibly
/// wait again if the requested states are not set.
///
/// # Cancel safety
///
/// This method is cancel safe. Once a readiness event occurs, the method
Expand Down
12 changes: 12 additions & 0 deletions tokio/src/net/windows/named_pipe.rs
Expand Up @@ -238,6 +238,12 @@ impl NamedPipeServer {
/// can be used to concurrently read / write to the same pipe on a single
/// task without splitting the pipe.
///
/// The function may complete without the pipe being ready. This is a
/// false-positive and attempting an operation will return with
/// `io::ErrorKind::WouldBlock`. The function can also return with an empty
/// [`Ready`] set, so you should always check the returned value and possibly
/// wait again if the requested states are not set.
///
/// # Examples
///
/// Concurrently read and write to the pipe on the same task without
Expand Down Expand Up @@ -989,6 +995,12 @@ impl NamedPipeClient {
/// can be used to concurrently read / write to the same pipe on a single
/// task without splitting the pipe.
///
/// The function may complete without the pipe being ready. This is a
/// false-positive and attempting an operation will return with
/// `io::ErrorKind::WouldBlock`. The function can also return with an empty
/// [`Ready`] set, so you should always check the returned value and possibly
/// wait again if the requested states are not set.
///
/// # Examples
///
/// Concurrently read and write to the pipe on the same task without
Expand Down
16 changes: 13 additions & 3 deletions tokio/src/runtime/io/scheduled_io.rs
Expand Up @@ -510,14 +510,24 @@ cfg_io_readiness! {
drop(waiters);
}
State::Done => {
let tick = TICK.unpack(scheduled_io.readiness.load(Acquire)) as u8;

// Safety: State::Done means it is no longer shared
let w = unsafe { &mut *waiter.get() };

let curr = scheduled_io.readiness.load(Acquire);

// The returned tick might be newer than the event
// which notified our waker. This is ok because the future
// still didn't return `Poll::Ready`.
let tick = TICK.unpack(curr) as u8;

// The readiness state could have been cleared in the meantime,
// but we allow the returned ready set to be empty.
let curr_ready = Ready::from_usize(READINESS.unpack(curr));
let ready = curr_ready.intersection(w.interest);

return Poll::Ready(ReadyEvent {
tick,
ready: Ready::from_interest(w.interest),
ready,
});
}
}
Expand Down
49 changes: 45 additions & 4 deletions tokio/tests/tcp_stream.rs
Expand Up @@ -254,30 +254,34 @@ async fn create_pair() -> (TcpStream, TcpStream) {
(client, server)
}

fn read_until_pending(stream: &mut TcpStream) {
fn read_until_pending(stream: &mut TcpStream) -> usize {
let mut buf = vec![0u8; 1024 * 1024];
let mut total = 0;
loop {
match stream.try_read(&mut buf) {
Ok(_) => (),
Ok(n) => total += n,
Err(err) => {
assert_eq!(err.kind(), io::ErrorKind::WouldBlock);
break;
}
}
}
total
}

fn write_until_pending(stream: &mut TcpStream) {
fn write_until_pending(stream: &mut TcpStream) -> usize {
let buf = vec![0u8; 1024 * 1024];
let mut total = 0;
loop {
match stream.try_write(&buf) {
Ok(_) => (),
Ok(n) => total += n,
Err(err) => {
assert_eq!(err.kind(), io::ErrorKind::WouldBlock);
break;
}
}
}
total
}

#[tokio::test]
Expand Down Expand Up @@ -357,3 +361,40 @@ async fn try_read_buf() {
}
}
}

// read_closed is a best effort event, so test only for no false positives.
#[tokio::test]
async fn read_closed() {
let (client, mut server) = create_pair().await;

let mut ready_fut = task::spawn(client.ready(Interest::READABLE));
assert_pending!(ready_fut.poll());

assert_ok!(server.write_all(b"ping").await);

let ready_event = assert_ok!(ready_fut.await);

assert!(!ready_event.is_read_closed());
}

// write_closed is a best effort event, so test only for no false positives.
#[tokio::test]
async fn write_closed() {
let (mut client, mut server) = create_pair().await;

// Fill the write buffer.
let write_size = write_until_pending(&mut client);
let mut ready_fut = task::spawn(client.ready(Interest::WRITABLE));
assert_pending!(ready_fut.poll());

// Drain the socket to make client writable.
let mut read_size = 0;
while read_size < write_size {
server.readable().await.unwrap();
read_size += read_until_pending(&mut server);
}

let ready_event = assert_ok!(ready_fut.await);

assert!(!ready_event.is_write_closed());
}

0 comments on commit 644cb82

Please sign in to comment.