Skip to content

Commit

Permalink
io: optimize chances of large write in CopyBuffer
Browse files Browse the repository at this point in the history
  • Loading branch information
Armillus committed May 5, 2024
1 parent fa05e0c commit 676fa45
Showing 1 changed file with 34 additions and 48 deletions.
82 changes: 34 additions & 48 deletions tokio/src/io/util/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::task::{Context, Poll};

#[derive(Debug)]
pub(super) struct CopyBuffer {
flush_in_progress: bool,
read_done: bool,
need_flush: bool,
pos: usize,
Expand All @@ -19,7 +18,6 @@ pub(super) struct CopyBuffer {
impl CopyBuffer {
pub(super) fn new(buf_size: usize) -> Self {
Self {
flush_in_progress: false,
read_done: false,
need_flush: false,
pos: 0,
Expand Down Expand Up @@ -98,20 +96,9 @@ impl CopyBuffer {
// Keep track of task budget
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
loop {
// Keep track of the read future status
let mut is_read_pending = false;

let is_buffer_empty = self.pos == self.cap;
let is_buffer_not_full = self.cap < self.buf.len();

// If our buffer is empty or not full yet, then we try to read some
// data to continue.
if (is_buffer_empty || is_buffer_not_full) && !self.read_done {
if is_buffer_empty {
self.pos = 0;
self.cap = 0;
}

// If there is some space left in our buffer, then we try to read some
// data to continue, thus maximizing the chances of a large write.
if self.cap < self.buf.len() && !self.read_done {
match self.poll_fill_buf(cx, reader.as_mut()) {
Poll::Ready(Ok(())) => {
#[cfg(any(
Expand Down Expand Up @@ -140,39 +127,30 @@ impl CopyBuffer {
coop.made_progress();
return Poll::Ready(Err(err));
}
Poll::Pending => {
// Rather than returning now, try to flush when the reader has no
// progress to avoid deadlock when the reader depends on buffered writer.
is_read_pending = true;
}
}
}
// Ignore pending reads when our buffer is not empty, because
// we can try to write data immediately.
Poll::Pending => if self.pos == self.cap {
// Try flushing when the reader has no progress to avoid deadlock
// when the reader depends on buffered writer.
if self.need_flush {
ready!(writer.as_mut().poll_flush(cx))?;
#[cfg(any(
feature = "fs",
feature = "io-std",
feature = "net",
feature = "process",
feature = "rt",
feature = "signal",
feature = "sync",
feature = "time",
))]
coop.made_progress();
self.need_flush = false;
}

// Try flushing before writing, either because the reader has no progress or to
// be sure that this same flush operation is terminated properly.
if is_read_pending || self.flush_in_progress {
if is_read_pending {
self.flush_in_progress = true;
return Poll::Pending;
}
}

ready!(writer.as_mut().poll_flush(cx))?;
#[cfg(any(
feature = "fs",
feature = "io-std",
feature = "net",
feature = "process",
feature = "rt",
feature = "signal",
feature = "sync",
feature = "time",
))]
coop.made_progress();
self.need_flush = false;
self.flush_in_progress = false;
}

if is_read_pending {
return Poll::Pending;
}

// If our buffer has some data, let's write it out!
Expand Down Expand Up @@ -209,9 +187,17 @@ impl CopyBuffer {
"writer returned length larger than input slice"
);

if self.pos != self.cap {
continue;
}

// All data has been written, the buffer can be considered empty again
self.pos = 0;
self.cap = 0;

// If we've written all the data and we've seen EOF, flush out the
// data and finish the transfer.
if self.pos == self.cap && self.read_done {
if self.read_done {
ready!(writer.as_mut().poll_flush(cx))?;
#[cfg(any(
feature = "fs",
Expand Down

0 comments on commit 676fa45

Please sign in to comment.