Skip to content

Commit

Permalink
make tokio::io::empty cooperative
Browse files Browse the repository at this point in the history
Reads and buffered reads from a `tokio::io::empty` were always marked
as ready. That makes sense, given that there is nothing to wait for.
However, doing repeated reads on the `empty` could stall the event
loop and prevent other tasks from making progress.

This change uses tokio's coop system to yield control back to the
executor when appropriate.

Note that the issue that originally triggered this PR is not fixed
yet, because the `timeout` function will not poll the timer after
empty::read runs out of budget. A different change will be needed to
address that.

Refs: tokio-rs#4291
  • Loading branch information
BraulioVM committed Dec 6, 2021
1 parent ee4b2ed commit e92466a
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 2 deletions.
8 changes: 6 additions & 2 deletions tokio/src/io/util/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,20 @@ impl AsyncRead for Empty {
#[inline]
fn poll_read(
self: Pin<&mut Self>,
_: &mut Context<'_>,
cx: &mut Context<'_>,
_: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
let coop = ready!(crate::coop::poll_proceed(cx));
coop.made_progress();
Poll::Ready(Ok(()))
}
}

impl AsyncBufRead for Empty {
#[inline]
fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
let coop = ready!(crate::coop::poll_proceed(cx));
coop.made_progress();
Poll::Ready(Ok(&[]))
}

Expand Down
31 changes: 31 additions & 0 deletions tokio/tests/io_util_empty.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use tokio::io::{AsyncBufReadExt, AsyncReadExt};

#[tokio::test]
async fn empty_read_is_cooperative() {
tokio::select! {
biased;

_ = async {
loop {
let mut buf = [0u8; 4096];
let _ = tokio::io::empty().read(&mut buf).await;
}
} => {},
_ = tokio::task::yield_now() => {}
}
}

#[tokio::test]
async fn empty_buf_reads_are_cooperative() {
tokio::select! {
biased;

_ = async {
loop {
let mut buf = String::new();
let _ = tokio::io::empty().read_line(&mut buf).await;
}
} => {},
_ = tokio::task::yield_now() => {}
}
}

0 comments on commit e92466a

Please sign in to comment.