diff --git a/tokio/src/net/tcp/split.rs b/tokio/src/net/tcp/split.rs index 2ea08b3dc32..343d4fde7d5 100644 --- a/tokio/src/net/tcp/split.rs +++ b/tokio/src/net/tcp/split.rs @@ -145,6 +145,12 @@ impl ReadHalf<'_> { /// can be used to concurrently read / write to the same socket on a single /// task without splitting the socket. /// + /// The function may complete without the socket being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// /// This function is equivalent to [`TcpStream::ready`]. /// /// # Cancel safety @@ -273,6 +279,12 @@ impl WriteHalf<'_> { /// can be used to concurrently read / write to the same socket on a single /// task without splitting the socket. /// + /// The function may complete without the socket being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// /// This function is equivalent to [`TcpStream::ready`]. /// /// # Cancel safety diff --git a/tokio/src/net/tcp/split_owned.rs b/tokio/src/net/tcp/split_owned.rs index e2cfdefe1a3..b2730e8fb0f 100644 --- a/tokio/src/net/tcp/split_owned.rs +++ b/tokio/src/net/tcp/split_owned.rs @@ -200,6 +200,12 @@ impl OwnedReadHalf { /// can be used to concurrently read / write to the same socket on a single /// task without splitting the socket. /// + /// The function may complete without the socket being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// /// This function is equivalent to [`TcpStream::ready`]. /// /// # Cancel safety @@ -355,6 +361,12 @@ impl OwnedWriteHalf { /// can be used to concurrently read / write to the same socket on a single /// task without splitting the socket. /// + /// The function may complete without the socket being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// /// This function is equivalent to [`TcpStream::ready`]. /// /// # Cancel safety diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index 0b8529546c6..b7dd3377b75 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -377,6 +377,12 @@ impl TcpStream { /// can be used to concurrently read / write to the same socket on a single /// task without splitting the socket. /// + /// The function may complete without the socket being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// /// # Cancel safety /// /// This method is cancel safe. Once a readiness event occurs, the method diff --git a/tokio/src/net/udp.rs b/tokio/src/net/udp.rs index 922a977a929..af343f20090 100644 --- a/tokio/src/net/udp.rs +++ b/tokio/src/net/udp.rs @@ -357,7 +357,9 @@ impl UdpSocket { /// /// The function may complete without the socket being ready. This is a /// false-positive and attempting an operation will return with - /// `io::ErrorKind::WouldBlock`. + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. /// /// # Cancel safety /// diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs index 0f5dca421cf..5e1453e380d 100644 --- a/tokio/src/net/unix/datagram/socket.rs +++ b/tokio/src/net/unix/datagram/socket.rs @@ -104,7 +104,9 @@ impl UnixDatagram { /// /// The function may complete without the socket being ready. This is a /// false-positive and attempting an operation will return with - /// `io::ErrorKind::WouldBlock`. + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. /// /// # Cancel safety /// diff --git a/tokio/src/net/unix/split.rs b/tokio/src/net/unix/split.rs index f9664d53254..816a2578b5f 100644 --- a/tokio/src/net/unix/split.rs +++ b/tokio/src/net/unix/split.rs @@ -182,6 +182,12 @@ impl WriteHalf<'_> { /// can be used to concurrently read / write to the same socket on a single /// task without splitting the socket. /// + /// The function may complete without the socket being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// /// # Cancel safety /// /// This method is cancel safe. Once a readiness event occurs, the method diff --git a/tokio/src/net/unix/split_owned.rs b/tokio/src/net/unix/split_owned.rs index 3dce2a86aa8..da41ced83c2 100644 --- a/tokio/src/net/unix/split_owned.rs +++ b/tokio/src/net/unix/split_owned.rs @@ -114,6 +114,12 @@ impl OwnedReadHalf { /// can be used to concurrently read / write to the same socket on a single /// task without splitting the socket. /// + /// The function may complete without the socket being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// /// # Cancel safety /// /// This method is cancel safe. Once a readiness event occurs, the method @@ -265,6 +271,12 @@ impl OwnedWriteHalf { /// can be used to concurrently read / write to the same socket on a single /// task without splitting the socket. /// + /// The function may complete without the socket being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// /// # Cancel safety /// /// This method is cancel safe. Once a readiness event occurs, the method diff --git a/tokio/src/net/unix/stream.rs b/tokio/src/net/unix/stream.rs index e9acbb68240..2d278986c97 100644 --- a/tokio/src/net/unix/stream.rs +++ b/tokio/src/net/unix/stream.rs @@ -66,6 +66,12 @@ impl UnixStream { /// can be used to concurrently read / write to the same socket on a single /// task without splitting the socket. /// + /// The function may complete without the socket being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// /// # Cancel safety /// /// This method is cancel safe. Once a readiness event occurs, the method diff --git a/tokio/src/net/windows/named_pipe.rs b/tokio/src/net/windows/named_pipe.rs index 3f7a5a4fe9d..692c69ded46 100644 --- a/tokio/src/net/windows/named_pipe.rs +++ b/tokio/src/net/windows/named_pipe.rs @@ -238,6 +238,12 @@ impl NamedPipeServer { /// can be used to concurrently read / write to the same pipe on a single /// task without splitting the pipe. /// + /// The function may complete without the pipe being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// /// # Examples /// /// Concurrently read and write to the pipe on the same task without @@ -989,6 +995,12 @@ impl NamedPipeClient { /// can be used to concurrently read / write to the same pipe on a single /// task without splitting the pipe. /// + /// The function may complete without the pipe being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// /// # Examples /// /// Concurrently read and write to the pipe on the same task without diff --git a/tokio/src/runtime/io/scheduled_io.rs b/tokio/src/runtime/io/scheduled_io.rs index af42ba8a31e..1709091032b 100644 --- a/tokio/src/runtime/io/scheduled_io.rs +++ b/tokio/src/runtime/io/scheduled_io.rs @@ -510,14 +510,24 @@ cfg_io_readiness! { drop(waiters); } State::Done => { - let tick = TICK.unpack(scheduled_io.readiness.load(Acquire)) as u8; - // Safety: State::Done means it is no longer shared let w = unsafe { &mut *waiter.get() }; + let curr = scheduled_io.readiness.load(Acquire); + + // The returned tick might be newer than the event + // which notified our waker. This is ok because the future + // still didn't return `Poll::Ready`. + let tick = TICK.unpack(curr) as u8; + + // The readiness state could have been cleared in the meantime, + // but we allow the returned ready set to be empty. + let curr_ready = Ready::from_usize(READINESS.unpack(curr)); + let ready = curr_ready.intersection(w.interest); + return Poll::Ready(ReadyEvent { tick, - ready: Ready::from_interest(w.interest), + ready, }); } } diff --git a/tokio/tests/tcp_stream.rs b/tokio/tests/tcp_stream.rs index 453023fc51d..31fe3baa296 100644 --- a/tokio/tests/tcp_stream.rs +++ b/tokio/tests/tcp_stream.rs @@ -254,30 +254,34 @@ async fn create_pair() -> (TcpStream, TcpStream) { (client, server) } -fn read_until_pending(stream: &mut TcpStream) { +fn read_until_pending(stream: &mut TcpStream) -> usize { let mut buf = vec![0u8; 1024 * 1024]; + let mut total = 0; loop { match stream.try_read(&mut buf) { - Ok(_) => (), + Ok(n) => total += n, Err(err) => { assert_eq!(err.kind(), io::ErrorKind::WouldBlock); break; } } } + total } -fn write_until_pending(stream: &mut TcpStream) { +fn write_until_pending(stream: &mut TcpStream) -> usize { let buf = vec![0u8; 1024 * 1024]; + let mut total = 0; loop { match stream.try_write(&buf) { - Ok(_) => (), + Ok(n) => total += n, Err(err) => { assert_eq!(err.kind(), io::ErrorKind::WouldBlock); break; } } } + total } #[tokio::test] @@ -357,3 +361,40 @@ async fn try_read_buf() { } } } + +// read_closed is a best effort event, so test only for no false positives. +#[tokio::test] +async fn read_closed() { + let (client, mut server) = create_pair().await; + + let mut ready_fut = task::spawn(client.ready(Interest::READABLE)); + assert_pending!(ready_fut.poll()); + + assert_ok!(server.write_all(b"ping").await); + + let ready_event = assert_ok!(ready_fut.await); + + assert!(!ready_event.is_read_closed()); +} + +// write_closed is a best effort event, so test only for no false positives. +#[tokio::test] +async fn write_closed() { + let (mut client, mut server) = create_pair().await; + + // Fill the write buffer. + let write_size = write_until_pending(&mut client); + let mut ready_fut = task::spawn(client.ready(Interest::WRITABLE)); + assert_pending!(ready_fut.poll()); + + // Drain the socket to make client writable. + let mut read_size = 0; + while read_size < write_size { + server.readable().await.unwrap(); + read_size += read_until_pending(&mut server); + } + + let ready_event = assert_ok!(ready_fut.await); + + assert!(!ready_event.is_write_closed()); +}