From 0b9acc609445d69aeac638218acb045d15305b91 Mon Sep 17 00:00:00 2001 From: Xinyi Gong Date: Sun, 6 Feb 2022 17:39:28 -0800 Subject: [PATCH 1/4] io: make duplex stream cooperative (#4470) Add coop checks on pipe poll_read and poll_write. Fixes: #4470 Refs: #4291, #4300 --- tokio/src/io/util/mem.rs | 16 ++++++++++++++++ tokio/tests/io_mem_stream.rs | 19 +++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/tokio/src/io/util/mem.rs b/tokio/src/io/util/mem.rs index 4eefe7b26f5..f95c83838c8 100644 --- a/tokio/src/io/util/mem.rs +++ b/tokio/src/io/util/mem.rs @@ -185,6 +185,7 @@ impl AsyncRead for Pipe { cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { + ready!(poll_proceed_and_make_progress(cx)); if self.buffer.has_remaining() { let max = self.buffer.remaining().min(buf.remaining()); buf.put_slice(&self.buffer[..max]); @@ -212,6 +213,7 @@ impl AsyncWrite for Pipe { cx: &mut task::Context<'_>, buf: &[u8], ) -> Poll> { + ready!(poll_proceed_and_make_progress(cx)); if self.is_closed { return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into())); } @@ -241,3 +243,17 @@ impl AsyncWrite for Pipe { Poll::Ready(Ok(())) } } + +cfg_coop! { + fn poll_proceed_and_make_progress(cx: &mut task::Context<'_>) -> Poll<()> { + let coop = ready!(crate::coop::poll_proceed(cx)); + coop.made_progress(); + Poll::Ready(()) + } +} + +cfg_not_coop! { + fn poll_proceed_and_make_progress(_: &mut Context<'_>) -> Poll<()> { + Poll::Ready(()) + } +} diff --git a/tokio/tests/io_mem_stream.rs b/tokio/tests/io_mem_stream.rs index 01baa5369c7..4b5e7b7746f 100644 --- a/tokio/tests/io_mem_stream.rs +++ b/tokio/tests/io_mem_stream.rs @@ -100,3 +100,22 @@ async fn max_write_size() { // drop b only after task t1 finishes writing drop(b); } + +#[tokio::test] +async fn duplex_is_cooperative() { + let (mut tx, mut rx) = tokio::io::duplex(1024 * 8); + + tokio::select! { + biased; + + _ = async { + loop { + let buf = [3u8; 4096]; + let _ = tx.write_all(&buf).await; + let mut buf = [0u8; 4096]; + let _ = rx.read(&mut buf).await; + } + } => {}, + _ = tokio::task::yield_now() => {} + } +} From 6a3b85d9526e031b29b3df7e856d5262c0a9a945 Mon Sep 17 00:00:00 2001 From: Xinyi Gong Date: Sun, 6 Feb 2022 17:39:28 -0800 Subject: [PATCH 2/4] io: make duplex stream cooperative (#4470) Add coop checks on pipe poll_read and poll_write. Fixes: #4470 Refs: #4291, #4300 --- tokio/src/io/util/mem.rs | 63 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 57 insertions(+), 6 deletions(-) diff --git a/tokio/src/io/util/mem.rs b/tokio/src/io/util/mem.rs index f95c83838c8..08fd037bf82 100644 --- a/tokio/src/io/util/mem.rs +++ b/tokio/src/io/util/mem.rs @@ -177,10 +177,8 @@ impl Pipe { waker.wake(); } } -} -impl AsyncRead for Pipe { - fn poll_read( + fn poll_read_internal( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>, @@ -205,10 +203,8 @@ impl AsyncRead for Pipe { Poll::Pending } } -} -impl AsyncWrite for Pipe { - fn poll_write( + fn poll_write_internal( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8], @@ -230,6 +226,61 @@ impl AsyncWrite for Pipe { } Poll::Ready(Ok(len)) } +} + +impl AsyncRead for Pipe { + cfg_coop! { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let coop = ready!(crate::coop::poll_proceed(cx)); + + let ret = self.poll_read_internal(cx, buf); + if ret.is_ready() { + coop.made_progress(); + } + ret + } + } + + cfg_not_coop! { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + self.poll_read_internal(cx, buf) + } + } +} + +impl AsyncWrite for Pipe { + cfg_coop! { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &[u8], + ) -> Poll> { + let coop = ready!(crate::coop::poll_proceed(cx)); + let ret = self.poll_write_internal(cx, buf); + if ret.is_ready() { + coop.made_progress(); + } + ret + } + } + + cfg_not_coop! { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &[u8], + ) -> Poll> { + self.poll_write_internal(cx, buf) + } + } fn poll_flush(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll> { Poll::Ready(Ok(())) From 5031a54968873ccd2fd3a11707926b9c4d8b90f3 Mon Sep 17 00:00:00 2001 From: Xinyi Gong Date: Sun, 6 Feb 2022 17:39:28 -0800 Subject: [PATCH 3/4] io: make duplex stream cooperative (#4470) Add coop checks on pipe poll_read and poll_write. Fixes: #4470 Refs: #4291, #4300 --- tokio/src/io/util/mem.rs | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/tokio/src/io/util/mem.rs b/tokio/src/io/util/mem.rs index 08fd037bf82..1510db34587 100644 --- a/tokio/src/io/util/mem.rs +++ b/tokio/src/io/util/mem.rs @@ -183,7 +183,6 @@ impl Pipe { cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { - ready!(poll_proceed_and_make_progress(cx)); if self.buffer.has_remaining() { let max = self.buffer.remaining().min(buf.remaining()); buf.put_slice(&self.buffer[..max]); @@ -209,7 +208,6 @@ impl Pipe { cx: &mut task::Context<'_>, buf: &[u8], ) -> Poll> { - ready!(poll_proceed_and_make_progress(cx)); if self.is_closed { return Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into())); } @@ -264,6 +262,7 @@ impl AsyncWrite for Pipe { buf: &[u8], ) -> Poll> { let coop = ready!(crate::coop::poll_proceed(cx)); + let ret = self.poll_write_internal(cx, buf); if ret.is_ready() { coop.made_progress(); @@ -294,17 +293,3 @@ impl AsyncWrite for Pipe { Poll::Ready(Ok(())) } } - -cfg_coop! { - fn poll_proceed_and_make_progress(cx: &mut task::Context<'_>) -> Poll<()> { - let coop = ready!(crate::coop::poll_proceed(cx)); - coop.made_progress(); - Poll::Ready(()) - } -} - -cfg_not_coop! { - fn poll_proceed_and_make_progress(_: &mut Context<'_>) -> Poll<()> { - Poll::Ready(()) - } -} From 26c26c4665309a95dc454d954740227be0b54422 Mon Sep 17 00:00:00 2001 From: Xinyi Gong Date: Tue, 8 Feb 2022 14:14:50 -0800 Subject: [PATCH 4/4] io: make duplex stream cooperative (#4470) Add coop checks on pipe poll_read and poll_write. Fixes: #4470 Refs: #4291, #4300 --- tokio/src/io/util/mem.rs | 4 ++-- tokio/tests/io_mem_stream.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tokio/src/io/util/mem.rs b/tokio/src/io/util/mem.rs index 1510db34587..4019db56ff4 100644 --- a/tokio/src/io/util/mem.rs +++ b/tokio/src/io/util/mem.rs @@ -245,7 +245,7 @@ impl AsyncRead for Pipe { cfg_not_coop! { fn poll_read( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { @@ -273,7 +273,7 @@ impl AsyncWrite for Pipe { cfg_not_coop! { fn poll_write( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut task::Context<'_>, buf: &[u8], ) -> Poll> { diff --git a/tokio/tests/io_mem_stream.rs b/tokio/tests/io_mem_stream.rs index 4b5e7b7746f..a2c2dadfc90 100644 --- a/tokio/tests/io_mem_stream.rs +++ b/tokio/tests/io_mem_stream.rs @@ -111,9 +111,9 @@ async fn duplex_is_cooperative() { _ = async { loop { let buf = [3u8; 4096]; - let _ = tx.write_all(&buf).await; + tx.write_all(&buf).await.unwrap(); let mut buf = [0u8; 4096]; - let _ = rx.read(&mut buf).await; + rx.read(&mut buf).await.unwrap(); } } => {}, _ = tokio::task::yield_now() => {}