Skip to content

Commit

Permalink
make tokio::io::empty cooperative
Browse files Browse the repository at this point in the history
Reads and buffered reads from a `tokio::io::empty` were always marked
as ready. That makes sense, given that there is nothing to wait for.
However, doing repeated reads on the `empty` could stall the event
loop and prevent other tasks from making progress.

This change makes reads on empty objects cooperative. One of every two
reads will return `Poll::Pending`, which will give the executor a
chance to keep making progress elsewhere.

Fixes: tokio-rs#4291
  • Loading branch information
BraulioVM committed Dec 5, 2021
1 parent ee4b2ed commit eb7a0a2
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 7 deletions.
33 changes: 26 additions & 7 deletions tokio/src/io/util/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ cfg_io_util! {
/// [`empty`]: fn@empty
/// [std]: std::io::empty
pub struct Empty {
_p: (),
will_yield: bool
}

/// Creates a new empty async reader.
Expand All @@ -42,25 +42,44 @@ cfg_io_util! {
/// }
/// ```
pub fn empty() -> Empty {
Empty { _p: () }
Empty { will_yield: true }
}
}

impl AsyncRead for Empty {
#[inline]
fn poll_read(
self: Pin<&mut Self>,
_: &mut Context<'_>,
mut self: Pin<&mut Self>,
context: &mut Context<'_>,
_: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
let result = if self.will_yield {
context.waker().wake_by_ref();
Poll::Pending
} else {
Poll::Ready(Ok(()))
};

self.will_yield = !self.will_yield;

result
}
}

impl AsyncBufRead for Empty {
#[inline]
fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
Poll::Ready(Ok(&[]))
fn poll_fill_buf(
mut self: Pin<&mut Self>,
context: &mut Context<'_>,
) -> Poll<io::Result<&[u8]>> {
let result: Poll<io::Result<&[u8]>> = if self.will_yield {
context.waker().wake_by_ref();
Poll::Pending
} else {
Poll::Ready(Ok(&[]))
};
self.will_yield = !self.will_yield;
result
}

#[inline]
Expand Down
57 changes: 57 additions & 0 deletions tokio/tests/io_util_empty.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::time::Duration;
use tokio::io::{empty, AsyncBufReadExt, AsyncReadExt};
use tokio::time::timeout;

#[tokio::test]
async fn empty_read_is_cooperative() {
// the test would likely hang if reads on `empty` weren't
// cooperative
let _ = timeout(Duration::from_millis(1000), async {
loop {
let mut buf = [0u8; 4096];
let _ = tokio::io::empty().read(&mut buf).await;
}
})
.await;
}

#[tokio::test]
async fn repeated_reads_on_same_empty_are_cooperative() {
// the test would likely hang if reads on `empty` weren't
// cooperative
let _ = timeout(Duration::from_millis(1000), async {
let mut empty = tokio::io::empty();
loop {
let mut buf = [0u8; 4096];
let _ = empty.read(&mut buf).await;
}
})
.await;
}

#[tokio::test]
async fn empty_buf_reads_are_cooperative() {
// the test would likely hang if reads on `empty` weren't
// cooperative
let _ = timeout(Duration::from_millis(1000), async {
loop {
let mut buf = String::new();
let _ = tokio::io::empty().read_line(&mut buf).await;
}
})
.await;
}

#[tokio::test]
async fn repeated_empty_buf_reads_are_cooperative() {
// the test would likely hang if reads on `empty` weren't
// cooperative
let _ = timeout(Duration::from_millis(1000), async {
let mut empty = tokio::io::empty();
loop {
let mut buf = String::new();
let _ = empty.read_line(&mut buf).await;
}
})
.await;
}

0 comments on commit eb7a0a2

Please sign in to comment.