From 6898b7009888c8df1160e894b82af000b7a6b524 Mon Sep 17 00:00:00 2001 From: masnagam Date: Fri, 7 Aug 2020 15:21:07 +0900 Subject: [PATCH 1/5] net: restore TcpStream::{poll_read_ready, poll_write_ready} --- tokio/src/net/tcp/stream.rs | 81 +++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index 2ac37a2bc85..2b921e5f896 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -356,6 +356,49 @@ impl TcpStream { Ok(()) } + /// Check the TCP stream's read readiness state. + /// + /// The mask argument allows specifying what readiness to notify on. This + /// can be any value, including platform specific readiness, **except** + /// `writable`. HUP is always implicitly included on platforms that support + /// it. + /// + /// If the stream is not ready for a read then `Poll::Pending` is returned + /// and the current task is notified once a new event is received. + /// + /// The stream will remain in a read-ready state until calls to + /// `poll_read` return `Poll::Pending`. + /// + /// # Panics + /// + /// This function panics if: + /// + /// * `ready` includes writable. + /// * called from outside of a task context. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::io; + /// use tokio::net::TcpStream; + /// + /// use futures::future::poll_fn; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let stream = TcpStream::connect("127.0.0.:8080").await?; + /// + /// poll_fn(|cx| { + /// stream.poll_read_ready(cx) + /// }).await?; + /// + /// Ok(()) + /// } + /// ``` + pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll> { + self.io.registration().poll_read_ready(cx).map_ok(|_| ()) + } + /// Try to read data from the stream into the provided buffer, returning how /// many bytes were read. /// @@ -467,6 +510,44 @@ impl TcpStream { Ok(()) } + /// Check the TCP stream's write readiness state. + /// + /// This always checks for writable readiness and also checks for HUP + /// readiness on platforms that support it. + /// + /// If the stream is not ready for a write then `Poll::Pending` is returned + /// and the current task is notified once a new event is received. + /// + /// The stream will remain in a write-ready state until calls to + /// `poll_write` return `Poll::Pending`. + /// + /// # Panics + /// + /// This function panics if called from outside of a task context. + /// + /// # Examples + /// + /// ```no_run + /// use tokio::io; + /// use tokio::net::TcpStream; + /// + /// use futures::future::poll_fn; + /// + /// #[tokio::main] + /// async fn main() -> io::Result<()> { + /// let stream = TcpStream::connect("127.0.0.:8080").await?; + /// + /// poll_fn(|cx| { + /// stream.poll_write_ready(cx) + /// }).await?; + /// + /// Ok(()) + /// } + /// ``` + pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll> { + self.io.registration().poll_write_ready(cx).map_ok(|_| ()) + } + /// Try to write a buffer to the stream, returning how many bytes were /// written. /// From 9d5227790a97549d7e303b2289230afa919b95f8 Mon Sep 17 00:00:00 2001 From: masnagam Date: Fri, 7 Aug 2020 15:24:38 +0900 Subject: [PATCH 2/5] test: add tcp_ready.rs --- tokio/tests/tcp_ready.rs | 133 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 133 insertions(+) create mode 100644 tokio/tests/tcp_ready.rs diff --git a/tokio/tests/tcp_ready.rs b/tokio/tests/tcp_ready.rs new file mode 100644 index 00000000000..9959bad09b6 --- /dev/null +++ b/tokio/tests/tcp_ready.rs @@ -0,0 +1,133 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use std::pin::Pin; +use std::task::Poll; + +use futures::future::poll_fn; +use mio::Ready; +use tokio::net::{TcpListener, TcpStream}; +use tokio::prelude::*; +use tokio::try_join; + +use tokio_test::assert_ok; + +macro_rules! assert_readable { + ($stream:expr) => { + let ready = assert_ok!(poll_fn(|cx| $stream.poll_read_ready(cx, Ready::readable())).await); + assert!(ready.is_readable()); + }; +} + +macro_rules! assert_not_readable { + ($stream:expr) => { + poll_fn(|cx| { + assert!($stream.poll_read_ready(cx, Ready::readable()).is_pending()); + Poll::Ready(()) + }) + .await; + }; +} + +macro_rules! assert_writable { + ($stream:expr) => { + let ready = assert_ok!(poll_fn(|cx| $stream.poll_write_ready(cx)).await); + assert!(ready.is_writable()); + }; +} + +macro_rules! assert_not_writable { + ($stream:expr) => { + poll_fn(|cx| { + assert!($stream.poll_write_ready(cx).is_pending()); + Poll::Ready(()) + }) + .await; + }; +} + +async fn create_pair() -> (TcpStream, TcpStream) { + let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(listener.local_addr()); + let (client, (server, _)) = + assert_ok!(try_join!(TcpStream::connect(&addr), listener.accept(),)); + (client, server) +} + +async fn read_until_pending(mut stream: &mut TcpStream) { + poll_fn(|cx| { + let mut buf = vec![0; 1024 * 1024]; + loop { + match Pin::new(&mut stream).poll_read(cx, &mut buf) { + Poll::Pending => break, + Poll::Ready(res) => assert!(res.is_ok()), + } + } + Poll::Ready(()) + }) + .await; +} + +async fn write_until_pending(mut stream: &mut TcpStream) { + poll_fn(|cx| { + let buf = vec![0; 1024 * 1024]; + loop { + match Pin::new(&mut stream).poll_write(cx, &buf) { + Poll::Pending => break, + Poll::Ready(res) => assert!(res.is_ok()), + } + } + Poll::Ready(()) + }) + .await; +} + +#[tokio::test] +async fn tcp_stream_poll_read_ready() { + let (mut client, mut server) = create_pair().await; + + // Initial state - not readable. + assert_not_readable!(server); + + // There is data in the buffer - readable. + assert_ok!(client.write_all(b"ping").await); + assert_readable!(server); + + // Readable until calls to `poll_read` return `Poll::Pending`. + let mut buf = [0; 4]; + assert_ok!(server.read_exact(&mut buf).await); + assert_readable!(server); + read_until_pending(&mut server).await; + assert_not_readable!(server); + + // Detect the client disconnect. + drop(client); + assert_readable!(server); +} + +#[tokio::test] +async fn tcp_stream_poll_write_ready() { + let (mut client, mut server) = create_pair().await; + + // Initial state - writable. + assert_writable!(client); + + // No space to write - not writable. + write_until_pending(&mut client).await; + assert_not_writable!(client); + + assert_ok!(client.flush().await); // just to be sure + + // Not writable until calls to `poll_write` return `Poll::Pending`. + read_until_pending(&mut server).await; + assert_not_writable!(client); + assert_ok!(client.write_all(b"ping").await); + assert_writable!(client); + + write_until_pending(&mut client).await; + assert_not_writable!(client); + + // Detect the server disconnect. + drop(server); + assert_writable!(client); +} From 4d2fa149f078639c30e19e5057a5cbbca0ccb623 Mon Sep 17 00:00:00 2001 From: masnagam Date: Sat, 8 Aug 2020 13:16:05 +0900 Subject: [PATCH 3/5] test: fix timeout issues on `ubuntu-latest` --- tokio/tests/tcp_ready.rs | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/tokio/tests/tcp_ready.rs b/tokio/tests/tcp_ready.rs index 9959bad09b6..f47c13c2ff4 100644 --- a/tokio/tests/tcp_ready.rs +++ b/tokio/tests/tcp_ready.rs @@ -49,8 +49,7 @@ macro_rules! assert_not_writable { async fn create_pair() -> (TcpStream, TcpStream) { let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await); let addr = assert_ok!(listener.local_addr()); - let (client, (server, _)) = - assert_ok!(try_join!(TcpStream::connect(&addr), listener.accept(),)); + let (client, (server, _)) = assert_ok!(try_join!(TcpStream::connect(&addr), listener.accept())); (client, server) } @@ -107,7 +106,7 @@ async fn tcp_stream_poll_read_ready() { #[tokio::test] async fn tcp_stream_poll_write_ready() { - let (mut client, mut server) = create_pair().await; + let (mut client, server) = create_pair().await; // Initial state - writable. assert_writable!(client); @@ -116,17 +115,6 @@ async fn tcp_stream_poll_write_ready() { write_until_pending(&mut client).await; assert_not_writable!(client); - assert_ok!(client.flush().await); // just to be sure - - // Not writable until calls to `poll_write` return `Poll::Pending`. - read_until_pending(&mut server).await; - assert_not_writable!(client); - assert_ok!(client.write_all(b"ping").await); - assert_writable!(client); - - write_until_pending(&mut client).await; - assert_not_writable!(client); - // Detect the server disconnect. drop(server); assert_writable!(client); From aec175e32f30a02d68b4cfd434f58cbfbb447466 Mon Sep 17 00:00:00 2001 From: masnagam Date: Mon, 16 Nov 2020 22:35:16 +0900 Subject: [PATCH 4/5] test: update tests --- tokio/tests/tcp_ready.rs | 121 -------------------------------------- tokio/tests/tcp_stream.rs | 112 ++++++++++++++++++++++++++++++++++- 2 files changed, 110 insertions(+), 123 deletions(-) delete mode 100644 tokio/tests/tcp_ready.rs diff --git a/tokio/tests/tcp_ready.rs b/tokio/tests/tcp_ready.rs deleted file mode 100644 index f47c13c2ff4..00000000000 --- a/tokio/tests/tcp_ready.rs +++ /dev/null @@ -1,121 +0,0 @@ -#![warn(rust_2018_idioms)] -#![cfg(feature = "full")] - -use std::pin::Pin; -use std::task::Poll; - -use futures::future::poll_fn; -use mio::Ready; -use tokio::net::{TcpListener, TcpStream}; -use tokio::prelude::*; -use tokio::try_join; - -use tokio_test::assert_ok; - -macro_rules! assert_readable { - ($stream:expr) => { - let ready = assert_ok!(poll_fn(|cx| $stream.poll_read_ready(cx, Ready::readable())).await); - assert!(ready.is_readable()); - }; -} - -macro_rules! assert_not_readable { - ($stream:expr) => { - poll_fn(|cx| { - assert!($stream.poll_read_ready(cx, Ready::readable()).is_pending()); - Poll::Ready(()) - }) - .await; - }; -} - -macro_rules! assert_writable { - ($stream:expr) => { - let ready = assert_ok!(poll_fn(|cx| $stream.poll_write_ready(cx)).await); - assert!(ready.is_writable()); - }; -} - -macro_rules! assert_not_writable { - ($stream:expr) => { - poll_fn(|cx| { - assert!($stream.poll_write_ready(cx).is_pending()); - Poll::Ready(()) - }) - .await; - }; -} - -async fn create_pair() -> (TcpStream, TcpStream) { - let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await); - let addr = assert_ok!(listener.local_addr()); - let (client, (server, _)) = assert_ok!(try_join!(TcpStream::connect(&addr), listener.accept())); - (client, server) -} - -async fn read_until_pending(mut stream: &mut TcpStream) { - poll_fn(|cx| { - let mut buf = vec![0; 1024 * 1024]; - loop { - match Pin::new(&mut stream).poll_read(cx, &mut buf) { - Poll::Pending => break, - Poll::Ready(res) => assert!(res.is_ok()), - } - } - Poll::Ready(()) - }) - .await; -} - -async fn write_until_pending(mut stream: &mut TcpStream) { - poll_fn(|cx| { - let buf = vec![0; 1024 * 1024]; - loop { - match Pin::new(&mut stream).poll_write(cx, &buf) { - Poll::Pending => break, - Poll::Ready(res) => assert!(res.is_ok()), - } - } - Poll::Ready(()) - }) - .await; -} - -#[tokio::test] -async fn tcp_stream_poll_read_ready() { - let (mut client, mut server) = create_pair().await; - - // Initial state - not readable. - assert_not_readable!(server); - - // There is data in the buffer - readable. - assert_ok!(client.write_all(b"ping").await); - assert_readable!(server); - - // Readable until calls to `poll_read` return `Poll::Pending`. - let mut buf = [0; 4]; - assert_ok!(server.read_exact(&mut buf).await); - assert_readable!(server); - read_until_pending(&mut server).await; - assert_not_readable!(server); - - // Detect the client disconnect. - drop(client); - assert_readable!(server); -} - -#[tokio::test] -async fn tcp_stream_poll_write_ready() { - let (mut client, server) = create_pair().await; - - // Initial state - writable. - assert_writable!(client); - - // No space to write - not writable. - write_until_pending(&mut client).await; - assert_not_writable!(client); - - // Detect the server disconnect. - drop(server); - assert_writable!(client); -} diff --git a/tokio/tests/tcp_stream.rs b/tokio/tests/tcp_stream.rs index 784ade8a411..84d58dc511b 100644 --- a/tokio/tests/tcp_stream.rs +++ b/tokio/tests/tcp_stream.rs @@ -1,12 +1,16 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] -use tokio::io::Interest; +use tokio::io::{AsyncReadExt, AsyncWriteExt, Interest}; use tokio::net::{TcpListener, TcpStream}; +use tokio::try_join; use tokio_test::task; -use tokio_test::{assert_pending, assert_ready_ok}; +use tokio_test::{assert_ok, assert_pending, assert_ready_ok}; use std::io; +use std::task::Poll; + +use futures::future::poll_fn; #[tokio::test] async fn try_read_write() { @@ -110,3 +114,107 @@ fn buffer_not_included_in_future() { let n = mem::size_of_val(&fut); assert!(n < 1000); } + +macro_rules! assert_readable_by_polling { + ($stream:expr) => { + assert_ok!(poll_fn(|cx| $stream.poll_read_ready(cx)).await); + }; +} + +macro_rules! assert_not_readable_by_polling { + ($stream:expr) => { + poll_fn(|cx| { + assert_pending!($stream.poll_read_ready(cx)); + Poll::Ready(()) + }) + .await; + }; +} + +macro_rules! assert_writable_by_polling { + ($stream:expr) => { + assert_ok!(poll_fn(|cx| $stream.poll_write_ready(cx)).await); + }; +} + +macro_rules! assert_not_writable_by_polling { + ($stream:expr) => { + poll_fn(|cx| { + assert_pending!($stream.poll_write_ready(cx)); + Poll::Ready(()) + }) + .await; + }; +} + +#[tokio::test] +async fn poll_read_ready() { + let (mut client, mut server) = create_pair().await; + + // Initial state - not readable. + assert_not_readable_by_polling!(server); + + // There is data in the buffer - readable. + assert_ok!(client.write_all(b"ping").await); + assert_readable_by_polling!(server); + + // Readable until calls to `poll_read` return `Poll::Pending`. + let mut buf = [0u8; 4]; + assert_ok!(server.read_exact(&mut buf).await); + assert_readable_by_polling!(server); + read_until_pending(&mut server); + assert_not_readable_by_polling!(server); + + // Detect the client disconnect. + drop(client); + assert_readable_by_polling!(server); +} + +#[tokio::test] +async fn poll_write_ready() { + let (mut client, server) = create_pair().await; + + // Initial state - writable. + assert_writable_by_polling!(client); + + // No space to write - not writable. + write_until_pending(&mut client); + assert_not_writable_by_polling!(client); + + // Detect the server disconnect. + drop(server); + assert_writable_by_polling!(client); +} + +async fn create_pair() -> (TcpStream, TcpStream) { + let listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await); + let addr = assert_ok!(listener.local_addr()); + let (client, (server, _)) = assert_ok!(try_join!(TcpStream::connect(&addr), listener.accept())); + (client, server) +} + +fn read_until_pending(stream: &mut TcpStream) { + let mut buf = vec![0u8; 1024 * 1024]; + loop { + match stream.try_read(&mut buf) { + Ok(_) => (), + Err(err) => { + assert_eq!(err.kind(), io::ErrorKind::WouldBlock); + break; + } + } + } +} + +fn write_until_pending(stream: &mut TcpStream) { + let buf = vec![0u8; 1024 * 1024]; + loop { + match stream.try_write(&buf) { + Ok(_) => (), + Err(err) => { + assert_eq!(err.kind(), io::ErrorKind::WouldBlock); + break; + } + } + } +} From 17955a3afab66075dcb37c8f0df2426b2c56e76e Mon Sep 17 00:00:00 2001 From: masnagam Date: Mon, 16 Nov 2020 22:36:14 +0900 Subject: [PATCH 5/5] docs: update documents --- tokio/src/net/tcp/stream.rs | 79 +++++-------------------------------- 1 file changed, 10 insertions(+), 69 deletions(-) diff --git a/tokio/src/net/tcp/stream.rs b/tokio/src/net/tcp/stream.rs index 2b921e5f896..8a157e1c29d 100644 --- a/tokio/src/net/tcp/stream.rs +++ b/tokio/src/net/tcp/stream.rs @@ -356,45 +356,13 @@ impl TcpStream { Ok(()) } - /// Check the TCP stream's read readiness state. + /// Polls for read readiness. /// - /// The mask argument allows specifying what readiness to notify on. This - /// can be any value, including platform specific readiness, **except** - /// `writable`. HUP is always implicitly included on platforms that support - /// it. + /// This function is intended for cases where creating and pinning a future + /// via [`readable`] is not feasible. Where possible, using [`readable`] is + /// preferred, as this supports polling from multiple tasks at once. /// - /// If the stream is not ready for a read then `Poll::Pending` is returned - /// and the current task is notified once a new event is received. - /// - /// The stream will remain in a read-ready state until calls to - /// `poll_read` return `Poll::Pending`. - /// - /// # Panics - /// - /// This function panics if: - /// - /// * `ready` includes writable. - /// * called from outside of a task context. - /// - /// # Examples - /// - /// ```no_run - /// use tokio::io; - /// use tokio::net::TcpStream; - /// - /// use futures::future::poll_fn; - /// - /// #[tokio::main] - /// async fn main() -> io::Result<()> { - /// let stream = TcpStream::connect("127.0.0.:8080").await?; - /// - /// poll_fn(|cx| { - /// stream.poll_read_ready(cx) - /// }).await?; - /// - /// Ok(()) - /// } - /// ``` + /// [`readable`]: method@Self::readable pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll> { self.io.registration().poll_read_ready(cx).map_ok(|_| ()) } @@ -510,40 +478,13 @@ impl TcpStream { Ok(()) } - /// Check the TCP stream's write readiness state. + /// Polls for write readiness. /// - /// This always checks for writable readiness and also checks for HUP - /// readiness on platforms that support it. + /// This function is intended for cases where creating and pinning a future + /// via [`writable`] is not feasible. Where possible, using [`writable`] is + /// preferred, as this supports polling from multiple tasks at once. /// - /// If the stream is not ready for a write then `Poll::Pending` is returned - /// and the current task is notified once a new event is received. - /// - /// The stream will remain in a write-ready state until calls to - /// `poll_write` return `Poll::Pending`. - /// - /// # Panics - /// - /// This function panics if called from outside of a task context. - /// - /// # Examples - /// - /// ```no_run - /// use tokio::io; - /// use tokio::net::TcpStream; - /// - /// use futures::future::poll_fn; - /// - /// #[tokio::main] - /// async fn main() -> io::Result<()> { - /// let stream = TcpStream::connect("127.0.0.:8080").await?; - /// - /// poll_fn(|cx| { - /// stream.poll_write_ready(cx) - /// }).await?; - /// - /// Ok(()) - /// } - /// ``` + /// [`writable`]: method@Self::writable pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll> { self.io.registration().poll_write_ready(cx).map_ok(|_| ()) }