Skip to content

Commit

Permalink
util: add writev-aware poll_write_buf (#3156)
Browse files Browse the repository at this point in the history
## Motivation

In Tokio 0.2, `AsyncRead` and `AsyncWrite` had `poll_write_buf` and
`poll_read_buf` methods for reading and writing to implementers of
`bytes` `Buf` and `BufMut` traits. In 0.3, these were removed, but
`poll_read_buf` was added as a free function in `tokio-util`. However,
there is currently no `poll_write_buf`.

Now that `AsyncWrite` has regained support for vectored writes in #3149,
there's a lot of potential benefit in having a `poll_write_buf` that
uses vectored writes when supported and non-vectored writes when not
supported, so that users don't have to reimplement this.

## Solution

This PR adds a `poll_write_buf` function to `tokio_util::io`, analogous
to the existing `poll_read_buf` function.

This function writes from a `Buf` to an `AsyncWrite`, advancing the
`Buf`'s internal cursor. In addition, when the `AsyncWrite` supports
vectored writes (i.e. its `is_write_vectored` method returns `true`),
it will use vectored IO.

I copied the documentation for this functions from the docs from Tokio
0.2's `AsyncWrite::poll_write_buf` , with some minor modifications as
appropriate.

Finally, I fixed a minor issue in the existing docs for `poll_read_buf`
and `read_buf`, and updated `tokio_util::codec` to use `poll_write_buf`.

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
hawkw committed Dec 3, 2020
1 parent a6051a6 commit 6472998
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 14 deletions.
2 changes: 1 addition & 1 deletion tokio-util/Cargo.toml
Expand Up @@ -34,7 +34,7 @@ io = []
rt = ["tokio/rt"]

[dependencies]
tokio = { version = "0.3.0", path = "../tokio" }
tokio = { version = "0.3.4", path = "../tokio" }

bytes = "0.6.0"
futures-core = "0.3.0"
Expand Down
8 changes: 3 additions & 5 deletions tokio-util/src/codec/framed_impl.rs
Expand Up @@ -6,7 +6,7 @@ use tokio::{
stream::Stream,
};

use bytes::{Buf, BytesMut};
use bytes::BytesMut;
use futures_core::ready;
use futures_sink::Sink;
use log::trace;
Expand Down Expand Up @@ -189,15 +189,15 @@ where
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
use crate::util::poll_write_buf;
trace!("flushing framed transport");
let mut pinned = self.project();

while !pinned.state.borrow_mut().buffer.is_empty() {
let WriteFrame { buffer } = pinned.state.borrow_mut();
trace!("writing; remaining={}", buffer.len());

let buf = &buffer;
let n = ready!(pinned.inner.as_mut().poll_write(cx, &buf))?;
let n = ready!(poll_write_buf(pinned.inner.as_mut(), cx, buffer))?;

if n == 0 {
return Poll::Ready(Err(io::Error::new(
Expand All @@ -207,8 +207,6 @@ where
)
.into()));
}

pinned.state.borrow_mut().buffer.advance(n);
}

// Try flushing the underlying IO
Expand Down
2 changes: 1 addition & 1 deletion tokio-util/src/io/mod.rs
Expand Up @@ -13,4 +13,4 @@ mod stream_reader;
pub use self::read_buf::read_buf;
pub use self::reader_stream::ReaderStream;
pub use self::stream_reader::StreamReader;
pub use crate::util::poll_read_buf;
pub use crate::util::{poll_read_buf, poll_write_buf};
4 changes: 2 additions & 2 deletions tokio-util/src/io/read_buf.rs
Expand Up @@ -5,9 +5,9 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncRead;

/// Read data from an `AsyncRead` into an implementer of the [`Buf`] trait.
/// Read data from an `AsyncRead` into an implementer of the [`BufMut`] trait.
///
/// [`Buf`]: bytes::Buf
/// [`BufMut`]: bytes::BufMut
///
/// # Example
///
Expand Down
75 changes: 70 additions & 5 deletions tokio-util/src/lib.rs
Expand Up @@ -55,18 +55,18 @@ pub mod time;

#[cfg(any(feature = "io", feature = "codec"))]
mod util {
use tokio::io::{AsyncRead, ReadBuf};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};

use bytes::BufMut;
use bytes::{Buf, BufMut};
use futures_core::ready;
use std::io;
use std::io::{self, IoSlice};
use std::mem::MaybeUninit;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Try to read data from an `AsyncRead` into an implementer of the [`Buf`] trait.
/// Try to read data from an `AsyncRead` into an implementer of the [`BufMut`] trait.
///
/// [`Buf`]: bytes::Buf
/// [`BufMut`]: bytes::Buf
///
/// # Example
///
Expand Down Expand Up @@ -132,4 +132,69 @@ mod util {

Poll::Ready(Ok(n))
}

/// Try to write data from an implementer of the [`Buf`] trait to an
/// [`AsyncWrite`], advancing the buffer's internal cursor.
///
/// This function will use [vectored writes] when the [`AsyncWrite`] supports
/// vectored writes.
///
/// # Examples
///
/// [`File`] implements [`AsyncWrite`] and [`Cursor<&[u8]>`] implements
/// [`Buf`]:
///
/// ```no_run
/// use tokio_util::io::poll_write_buf;
/// use tokio::io;
/// use tokio::fs::File;
///
/// use bytes::Buf;
/// use std::io::Cursor;
/// use std::pin::Pin;
/// use futures::future::poll_fn;
///
/// #[tokio::main]
/// async fn main() -> io::Result<()> {
/// let mut file = File::create("foo.txt").await?;
/// let mut buf = Cursor::new(b"data to write");
///
/// // Loop until the entire contents of the buffer are written to
/// // the file.
/// while buf.has_remaining() {
/// poll_fn(|cx| poll_write_buf(Pin::new(&mut file), cx, &mut buf)).await?;
/// }
///
/// Ok(())
/// }
/// ```
///
/// [`Buf`]: bytes::Buf
/// [`AsyncWrite`]: tokio::io::AsyncWrite
/// [`File`]: tokio::fs::File
/// [vectored writes]: tokio::io::AsyncWrite::poll_write_vectored
#[cfg_attr(not(feature = "io"), allow(unreachable_pub))]
pub fn poll_write_buf<T: AsyncWrite, B: Buf>(
io: Pin<&mut T>,
cx: &mut Context<'_>,
buf: &mut B,
) -> Poll<io::Result<usize>> {
const MAX_BUFS: usize = 64;

if !buf.has_remaining() {
return Poll::Ready(Ok(0));
}

let n = if io.is_write_vectored() {
let mut slices = [IoSlice::new(&[]); MAX_BUFS];
let cnt = buf.bytes_vectored(&mut slices);
ready!(io.poll_write_vectored(cx, &slices[..cnt]))?
} else {
ready!(io.poll_write(cx, buf.bytes()))?
};

buf.advance(n);

Poll::Ready(Ok(n))
}
}

0 comments on commit 6472998

Please sign in to comment.