From a6e6df20ad6632e11a375ad426744597ddde6c91 Mon Sep 17 00:00:00 2001 From: Tymoteusz Date: Mon, 21 Nov 2022 14:18:07 +0100 Subject: [PATCH 1/7] store ready for each readiness future --- tokio/src/io/ready.rs | 2 +- tokio/src/runtime/io/scheduled_io.rs | 11 ++++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/tokio/src/io/ready.rs b/tokio/src/io/ready.rs index ef135c43554..2b7252f1f34 100644 --- a/tokio/src/io/ready.rs +++ b/tokio/src/io/ready.rs @@ -178,7 +178,7 @@ cfg_io_readiness! { use crate::io::Interest; impl Ready { - pub(crate) fn from_interest(interest: Interest) -> Ready { + fn from_interest(interest: Interest) -> Ready { let mut ready = Ready::EMPTY; if interest.is_readable() { diff --git a/tokio/src/runtime/io/scheduled_io.rs b/tokio/src/runtime/io/scheduled_io.rs index af42ba8a31e..dd60ea0af77 100644 --- a/tokio/src/runtime/io/scheduled_io.rs +++ b/tokio/src/runtime/io/scheduled_io.rs @@ -62,7 +62,8 @@ cfg_io_readiness! { /// The interest this waiter is waiting on. interest: Interest, - is_ready: bool, + /// Awaited readiness. + ready: Ready, /// Should never be `!Unpin`. _p: PhantomPinned, @@ -253,7 +254,7 @@ impl ScheduledIo { let waiter = unsafe { &mut *waiter.as_ptr() }; if let Some(waker) = waiter.waker.take() { - waiter.is_ready = true; + waiter.ready = ready.intersection(waiter.interest); wakers.push(waker); } } @@ -389,7 +390,7 @@ cfg_io_readiness! { waiter: UnsafeCell::new(Waiter { pointers: linked_list::Pointers::new(), waker: None, - is_ready: false, + ready: Ready::EMPTY, interest, _p: PhantomPinned, }), @@ -490,7 +491,7 @@ cfg_io_readiness! { // Safety: called while locked let w = unsafe { &mut *waiter.get() }; - if w.is_ready { + if !w.ready.is_empty() { // Our waker has been notified. *state = State::Done; } else { @@ -517,7 +518,7 @@ cfg_io_readiness! { return Poll::Ready(ReadyEvent { tick, - ready: Ready::from_interest(w.interest), + ready: w.ready, }); } } From 4577ce6e9fab187683a213dfc41092ed5980c965 Mon Sep 17 00:00:00 2001 From: Tymoteusz Date: Wed, 23 Nov 2022 23:39:20 +0100 Subject: [PATCH 2/7] check new readiness at future return --- tokio/src/runtime/io/scheduled_io.rs | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/tokio/src/runtime/io/scheduled_io.rs b/tokio/src/runtime/io/scheduled_io.rs index dd60ea0af77..f9d267379c8 100644 --- a/tokio/src/runtime/io/scheduled_io.rs +++ b/tokio/src/runtime/io/scheduled_io.rs @@ -511,14 +511,22 @@ cfg_io_readiness! { drop(waiters); } State::Done => { - let tick = TICK.unpack(scheduled_io.readiness.load(Acquire)) as u8; - // Safety: State::Done means it is no longer shared let w = unsafe { &mut *waiter.get() }; + // Note: the returned tick might be newer then the event + // which notified our waker. This is ok because the future + // still didn't return `Poll::Ready`. + let curr = scheduled_io.readiness.load(Acquire); + let tick = TICK.unpack(curr) as u8; + + // Add more readiness which might have appeared in the meantime. + let curr_ready = Ready::from_usize(READINESS.unpack(curr)); + let ready = w.ready | (curr_ready.intersection(w.interest)); + return Poll::Ready(ReadyEvent { tick, - ready: w.ready, + ready, }); } } From 31185efe67d2c142b50fde52908b786261f4c662 Mon Sep 17 00:00:00 2001 From: Tymoteusz Date: Thu, 24 Nov 2022 14:39:52 +0100 Subject: [PATCH 3/7] add tests for `read_closed` and `write_closed` --- tokio/tests/tcp_stream.rs | 49 +++++++++++++++++++++++++++++++++++---- 1 file changed, 45 insertions(+), 4 deletions(-) diff --git a/tokio/tests/tcp_stream.rs b/tokio/tests/tcp_stream.rs index 453023fc51d..31fe3baa296 100644 --- a/tokio/tests/tcp_stream.rs +++ b/tokio/tests/tcp_stream.rs @@ -254,30 +254,34 @@ async fn create_pair() -> (TcpStream, TcpStream) { (client, server) } -fn read_until_pending(stream: &mut TcpStream) { +fn read_until_pending(stream: &mut TcpStream) -> usize { let mut buf = vec![0u8; 1024 * 1024]; + let mut total = 0; loop { match stream.try_read(&mut buf) { - Ok(_) => (), + Ok(n) => total += n, Err(err) => { assert_eq!(err.kind(), io::ErrorKind::WouldBlock); break; } } } + total } -fn write_until_pending(stream: &mut TcpStream) { +fn write_until_pending(stream: &mut TcpStream) -> usize { let buf = vec![0u8; 1024 * 1024]; + let mut total = 0; loop { match stream.try_write(&buf) { - Ok(_) => (), + Ok(n) => total += n, Err(err) => { assert_eq!(err.kind(), io::ErrorKind::WouldBlock); break; } } } + total } #[tokio::test] @@ -357,3 +361,40 @@ async fn try_read_buf() { } } } + +// read_closed is a best effort event, so test only for no false positives. +#[tokio::test] +async fn read_closed() { + let (client, mut server) = create_pair().await; + + let mut ready_fut = task::spawn(client.ready(Interest::READABLE)); + assert_pending!(ready_fut.poll()); + + assert_ok!(server.write_all(b"ping").await); + + let ready_event = assert_ok!(ready_fut.await); + + assert!(!ready_event.is_read_closed()); +} + +// write_closed is a best effort event, so test only for no false positives. +#[tokio::test] +async fn write_closed() { + let (mut client, mut server) = create_pair().await; + + // Fill the write buffer. + let write_size = write_until_pending(&mut client); + let mut ready_fut = task::spawn(client.ready(Interest::WRITABLE)); + assert_pending!(ready_fut.poll()); + + // Drain the socket to make client writable. + let mut read_size = 0; + while read_size < write_size { + server.readable().await.unwrap(); + read_size += read_until_pending(&mut server); + } + + let ready_event = assert_ok!(ready_fut.await); + + assert!(!ready_event.is_write_closed()); +} From 1060ad0fd5494b5c8bd393f0a8845057568aa258 Mon Sep 17 00:00:00 2001 From: Tymoteusz Date: Thu, 24 Nov 2022 14:52:31 +0100 Subject: [PATCH 4/7] add pub(crate) back --- tokio/src/io/ready.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/io/ready.rs b/tokio/src/io/ready.rs index 2b7252f1f34..ef135c43554 100644 --- a/tokio/src/io/ready.rs +++ b/tokio/src/io/ready.rs @@ -178,7 +178,7 @@ cfg_io_readiness! { use crate::io::Interest; impl Ready { - fn from_interest(interest: Interest) -> Ready { + pub(crate) fn from_interest(interest: Interest) -> Ready { let mut ready = Ready::EMPTY; if interest.is_readable() { From 3b2bd50bc5ed3702da66113fe37ab828411d7cd7 Mon Sep 17 00:00:00 2001 From: Tymoteusz Date: Thu, 24 Nov 2022 15:19:24 +0100 Subject: [PATCH 5/7] typo --- tokio/src/runtime/io/scheduled_io.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/runtime/io/scheduled_io.rs b/tokio/src/runtime/io/scheduled_io.rs index f9d267379c8..eca35cfce1c 100644 --- a/tokio/src/runtime/io/scheduled_io.rs +++ b/tokio/src/runtime/io/scheduled_io.rs @@ -514,7 +514,7 @@ cfg_io_readiness! { // Safety: State::Done means it is no longer shared let w = unsafe { &mut *waiter.get() }; - // Note: the returned tick might be newer then the event + // Note: the returned tick might be newer than the event // which notified our waker. This is ok because the future // still didn't return `Poll::Ready`. let curr = scheduled_io.readiness.load(Acquire); From 50d109ac63098864f2daa4cae9ee59230c473794 Mon Sep 17 00:00:00 2001 From: Tymoteusz Date: Tue, 29 Nov 2022 19:33:24 +0100 Subject: [PATCH 6/7] return current readiness --- tokio/src/runtime/io/scheduled_io.rs | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/tokio/src/runtime/io/scheduled_io.rs b/tokio/src/runtime/io/scheduled_io.rs index eca35cfce1c..1709091032b 100644 --- a/tokio/src/runtime/io/scheduled_io.rs +++ b/tokio/src/runtime/io/scheduled_io.rs @@ -62,8 +62,7 @@ cfg_io_readiness! { /// The interest this waiter is waiting on. interest: Interest, - /// Awaited readiness. - ready: Ready, + is_ready: bool, /// Should never be `!Unpin`. _p: PhantomPinned, @@ -254,7 +253,7 @@ impl ScheduledIo { let waiter = unsafe { &mut *waiter.as_ptr() }; if let Some(waker) = waiter.waker.take() { - waiter.ready = ready.intersection(waiter.interest); + waiter.is_ready = true; wakers.push(waker); } } @@ -390,7 +389,7 @@ cfg_io_readiness! { waiter: UnsafeCell::new(Waiter { pointers: linked_list::Pointers::new(), waker: None, - ready: Ready::EMPTY, + is_ready: false, interest, _p: PhantomPinned, }), @@ -491,7 +490,7 @@ cfg_io_readiness! { // Safety: called while locked let w = unsafe { &mut *waiter.get() }; - if !w.ready.is_empty() { + if w.is_ready { // Our waker has been notified. *state = State::Done; } else { @@ -514,15 +513,17 @@ cfg_io_readiness! { // Safety: State::Done means it is no longer shared let w = unsafe { &mut *waiter.get() }; - // Note: the returned tick might be newer than the event + let curr = scheduled_io.readiness.load(Acquire); + + // The returned tick might be newer than the event // which notified our waker. This is ok because the future // still didn't return `Poll::Ready`. - let curr = scheduled_io.readiness.load(Acquire); let tick = TICK.unpack(curr) as u8; - // Add more readiness which might have appeared in the meantime. + // The readiness state could have been cleared in the meantime, + // but we allow the returned ready set to be empty. let curr_ready = Ready::from_usize(READINESS.unpack(curr)); - let ready = w.ready | (curr_ready.intersection(w.interest)); + let ready = curr_ready.intersection(w.interest); return Poll::Ready(ReadyEvent { tick, From e0912998d75ffed48b397fae3677546bfc5f5605 Mon Sep 17 00:00:00 2001 From: Tymoteusz Date: Wed, 30 Nov 2022 12:45:59 +0100 Subject: [PATCH 7/7] add a note about empty ready sets in the docs --- tokio/src/net/tcp/split.rs | 12 ++++++++++++ tokio/src/net/tcp/split_owned.rs | 12 ++++++++++++ tokio/src/net/tcp/stream.rs | 6 ++++++ tokio/src/net/udp.rs | 4 +++- tokio/src/net/unix/datagram/socket.rs | 4 +++- tokio/src/net/unix/split.rs | 6 ++++++ tokio/src/net/unix/split_owned.rs | 12 ++++++++++++ tokio/src/net/unix/stream.rs | 6 ++++++ tokio/src/net/windows/named_pipe.rs | 12 ++++++++++++ 9 files changed, 72 insertions(+), 2 deletions(-) diff --git a/tokio/src/net/tcp/split.rs b/tokio/src/net/tcp/split.rs index 2ea08b3dc32..343d4fde7d5 100644 --- a/tokio/src/net/tcp/split.rs +++ b/tokio/src/net/tcp/split.rs @@ -145,6 +145,12 @@ impl ReadHalf<'_> { /// can be used to concurrently read / write to the same socket on a single /// task without splitting the socket. /// + /// The function may complete without the socket being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// /// This function is equivalent to [`TcpStream::ready`]. /// /// # Cancel safety @@ -273,6 +279,12 @@ impl WriteHalf<'_> { /// can be used to concurrently read / write to the same socket on a single /// task without splitting the socket. /// + /// The function may complete without the socket being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// /// This function is equivalent to [`TcpStream::ready`]. /// /// # Cancel safety diff --git a/tokio/src/net/tcp/split_owned.rs b/tokio/src/net/tcp/split_owned.rs index e2cfdefe1a3..b2730e8fb0f 100644 --- a/tokio/src/net/tcp/split_owned.rs +++ b/tokio/src/net/tcp/split_owned.rs @@ -200,6 +200,12 @@ impl OwnedReadHalf { /// can be used to concurrently read / write to the same socket on a single /// task without splitting the socket. /// + /// The function may complete without the socket being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// /// This function is equivalent to [`TcpStream::ready`]. /// /// # Cancel safety @@ -355,6 +361,12 @@ impl OwnedWriteHalf { /// can be used to concurrently read / write to the same socket on a single /// task without splitting the socket. /// + /// The function may complete without the socket being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// /// This function is equivalent to [`TcpStream::ready`]. /// /// # Cancel safety diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index 0b8529546c6..b7dd3377b75 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -377,6 +377,12 @@ impl TcpStream { /// can be used to concurrently read / write to the same socket on a single /// task without splitting the socket. /// + /// The function may complete without the socket being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// /// # Cancel safety /// /// This method is cancel safe. Once a readiness event occurs, the method diff --git a/tokio/src/net/udp.rs b/tokio/src/net/udp.rs index 922a977a929..af343f20090 100644 --- a/tokio/src/net/udp.rs +++ b/tokio/src/net/udp.rs @@ -357,7 +357,9 @@ impl UdpSocket { /// /// The function may complete without the socket being ready. This is a /// false-positive and attempting an operation will return with - /// `io::ErrorKind::WouldBlock`. + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. /// /// # Cancel safety /// diff --git a/tokio/src/net/unix/datagram/socket.rs b/tokio/src/net/unix/datagram/socket.rs index 0f5dca421cf..5e1453e380d 100644 --- a/tokio/src/net/unix/datagram/socket.rs +++ b/tokio/src/net/unix/datagram/socket.rs @@ -104,7 +104,9 @@ impl UnixDatagram { /// /// The function may complete without the socket being ready. This is a /// false-positive and attempting an operation will return with - /// `io::ErrorKind::WouldBlock`. + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. /// /// # Cancel safety /// diff --git a/tokio/src/net/unix/split.rs b/tokio/src/net/unix/split.rs index f9664d53254..816a2578b5f 100644 --- a/tokio/src/net/unix/split.rs +++ b/tokio/src/net/unix/split.rs @@ -182,6 +182,12 @@ impl WriteHalf<'_> { /// can be used to concurrently read / write to the same socket on a single /// task without splitting the socket. /// + /// The function may complete without the socket being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// /// # Cancel safety /// /// This method is cancel safe. Once a readiness event occurs, the method diff --git a/tokio/src/net/unix/split_owned.rs b/tokio/src/net/unix/split_owned.rs index 3dce2a86aa8..da41ced83c2 100644 --- a/tokio/src/net/unix/split_owned.rs +++ b/tokio/src/net/unix/split_owned.rs @@ -114,6 +114,12 @@ impl OwnedReadHalf { /// can be used to concurrently read / write to the same socket on a single /// task without splitting the socket. /// + /// The function may complete without the socket being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// /// # Cancel safety /// /// This method is cancel safe. Once a readiness event occurs, the method @@ -265,6 +271,12 @@ impl OwnedWriteHalf { /// can be used to concurrently read / write to the same socket on a single /// task without splitting the socket. /// + /// The function may complete without the socket being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// /// # Cancel safety /// /// This method is cancel safe. Once a readiness event occurs, the method diff --git a/tokio/src/net/unix/stream.rs b/tokio/src/net/unix/stream.rs index e9acbb68240..2d278986c97 100644 --- a/tokio/src/net/unix/stream.rs +++ b/tokio/src/net/unix/stream.rs @@ -66,6 +66,12 @@ impl UnixStream { /// can be used to concurrently read / write to the same socket on a single /// task without splitting the socket. /// + /// The function may complete without the socket being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// /// # Cancel safety /// /// This method is cancel safe. Once a readiness event occurs, the method diff --git a/tokio/src/net/windows/named_pipe.rs b/tokio/src/net/windows/named_pipe.rs index 3f7a5a4fe9d..692c69ded46 100644 --- a/tokio/src/net/windows/named_pipe.rs +++ b/tokio/src/net/windows/named_pipe.rs @@ -238,6 +238,12 @@ impl NamedPipeServer { /// can be used to concurrently read / write to the same pipe on a single /// task without splitting the pipe. /// + /// The function may complete without the pipe being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// /// # Examples /// /// Concurrently read and write to the pipe on the same task without @@ -989,6 +995,12 @@ impl NamedPipeClient { /// can be used to concurrently read / write to the same pipe on a single /// task without splitting the pipe. /// + /// The function may complete without the pipe being ready. This is a + /// false-positive and attempting an operation will return with + /// `io::ErrorKind::WouldBlock`. The function can also return with an empty + /// [`Ready`] set, so you should always check the returned value and possibly + /// wait again if the requested states are not set. + /// /// # Examples /// /// Concurrently read and write to the pipe on the same task without