Skip to content

Commit

Permalink
Rewrite AsyncVectoredWrite impl for BufWriter
Browse files Browse the repository at this point in the history
Fill the buffer more eagerly over consecutive calls
to poll_write_vectored. At the first non-empty slice, the
buffer is flushed if the slice does not fit completely,
after which filling can continue until capacity is exhausted, or
there is an oversized slice for direct writing,
or there are no more slices to write.
  • Loading branch information
mzabaluev committed Nov 7, 2020
1 parent 0a13f0d commit a553a7b
Showing 1 changed file with 40 additions and 37 deletions.
77 changes: 40 additions & 37 deletions tokio/src/io/util/buf_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::io::vec::AsyncVectoredWrite;
use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf};

use pin_project_lite::pin_project;
use std::cmp;
use std::fmt;
use std::io::{self, IoSlice, Write};
use std::pin::Pin;
Expand Down Expand Up @@ -82,21 +83,6 @@ impl<W: AsyncWrite> BufWriter<W> {
Poll::Ready(ret)
}

fn poll_finish_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let me = self.project();
if buf.len() >= me.buf.capacity() {
debug_assert!(me.buf.is_empty());
me.inner.poll_write(cx, buf)
} else {
debug_assert!(me.buf.len() + buf.len() <= me.buf.capacity());
Poll::Ready(me.buf.write(buf))
}
}

/// Gets a reference to the underlying writer.
pub fn get_ref(&self) -> &W {
&self.inner
Expand Down Expand Up @@ -138,7 +124,13 @@ impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
if self.buf.len() + buf.len() > self.buf.capacity() {
ready!(self.as_mut().flush_buf(cx))?;
}
self.poll_finish_write(cx, buf)

let me = self.project();
if buf.len() >= me.buf.capacity() {
me.inner.poll_write(cx, buf)
} else {
Poll::Ready(me.buf.write(buf))
}
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Expand All @@ -163,29 +155,40 @@ impl<W: AsyncWrite> AsyncVectoredWrite for BufWriter<W> {
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
let mut total_written = 0;

// Try to fill the buffer, stopping at the first non-empty slice
// that does not fit.
// Note that this may leave some buffer capacity unfilled. However,
// more aggressive packing would require extra logic to avoid
// buffering roundtrips for oversized slices that are more
// efficiently written directly. We optimize for small inputs,
// because a scenario where writes made to BufWriter are often large
// is likely optimized wrongly anyway.
let stopped_on = bufs.iter().find(|&buf| {
let mut iter = bufs.iter();
if let Some(buf) = iter.by_ref().filter(|&buf| !buf.is_empty()).next() {
// This is the first non-empty slice to write, so if it does
// not fit in the buffer, we still get to flush and proceed.
if self.buf.len() + buf.len() > self.buf.capacity() {
ready!(self.as_mut().flush_buf(cx))?;
}
let me = self.as_mut().project();
if me.buf.len() + buf.len() > me.buf.capacity() {
return true;
if buf.len() >= me.buf.capacity() {
// The slice is at least as large as the buffering capacity,
// so it's better to write it directly, bypassing the buffer.
return me.inner.poll_write(cx, buf);
} else {
me.buf.extend_from_slice(buf);
total_written += buf.len();
}
me.buf.extend_from_slice(buf);
total_written += buf.len();
false
});

if total_written == 0 {
if let Some(buf) = stopped_on {
ready!(self.as_mut().flush_buf(cx))?;
return self.poll_finish_write(cx, buf);
debug_assert!(total_written != 0);
}
for buf in iter {
let me = self.as_mut().project();
if buf.len() >= me.buf.capacity() {
// This slice should be written directly, but we have already
// buffered some of the input. Bail out, expecting it to be
// handled as the first slice in the next call to
// poll_write_vectored.
break;
} else {
let fill_len = cmp::min(buf.len(), me.buf.capacity() - me.buf.len());
me.buf.extend_from_slice(&buf[..fill_len]);
total_written += fill_len;
if me.buf.capacity() == me.buf.len() {
// The buffer is full, bail out
break;
}
}
}
Poll::Ready(Ok(total_written))
Expand Down

0 comments on commit a553a7b

Please sign in to comment.