Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

io: reintroduce vectored writes #3149

Merged
merged 4 commits into from Nov 18, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
131 changes: 130 additions & 1 deletion tokio/src/io/async_write.rs
@@ -1,4 +1,4 @@
use std::io;
use std::io::{self, IoSlice};
use std::ops::DerefMut;
use std::pin::Pin;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -127,6 +127,51 @@ pub trait AsyncWrite {
/// This function will panic if not called within the context of a future's
/// task.
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>>;

/// Like [`poll_write`], except that it writes from a slice of buffers.
///
Comment on lines +131 to +132
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it might be worth saying something about how this is generally expected to use platform vectored write APIs?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I took the documentation for these 2 new methods directly from the standard library. I kind of like that they don't mention platform gather IO, and just leave that up to the implementor. 🤷

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, that's fair!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is worth saying that, if any data is written, the function returns Ok(n). I.e. an error / pending means no data was written.

/// Data is copied from each buffer in order, with the final buffer
/// read from possibly being only partially consumed. This method must
/// behave as a call to [`write`] with the buffers concatenated would.
///
/// The default implementation calls [`poll_write`] with either the first nonempty
/// buffer provided, or an empty one if none exists.
Comment on lines +137 to +138
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should there be a note that the default implementation is less efficient than just using poll_write and that poll_write_vectored should only be called when is_write_vectored returns true? maybe including an example of checking for vectored write capability before using it?

///
/// Attempt to write bytes from `buf` into the object.
///
/// On success, returns `Poll::Ready(Ok(num_bytes_written))`.
///
/// If the object is not ready for writing, the method returns
/// `Poll::Pending` and arranges for the current task (via
/// `cx.waker()`) to receive a notification when the object becomes
/// writable or is closed.
///
/// [`poll_write`]: AsyncWrite::poll_write
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<Result<usize, io::Error>> {
let buf = bufs
.iter()
.find(|b| !b.is_empty())
.map_or(&[][..], |b| &**b);
self.poll_write(cx, buf)
seanmonstar marked this conversation as resolved.
Show resolved Hide resolved
}

/// Determines if this writer has an efficient [`poll_write_vectored`]
/// implementation.
///
/// If a writer does not override the default [`poll_write_vectored`]
/// implementation, code using it may want to avoid the method all together
/// and coalesce writes into a single buffer for higher performance.
///
/// The default implementation returns `false`.
///
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it may be worth documenting a contract around whether or not the return value from a given instance of a type implementing AsyncWrite is ever expected to change over its lifetime? e.g. can a library reasonably expect to be able to call is_write_vectored once for a given AsyncWrite instance and then always do vectored writes, or should it always check before writing?

/// [`poll_write_vectored`]: AsyncWrite::poll_write_vectored
fn is_write_vectored(&self) -> bool {
false
}
}

macro_rules! deref_async_write {
Expand All @@ -139,6 +184,18 @@ macro_rules! deref_async_write {
Pin::new(&mut **self).poll_write(cx, buf)
}

fn poll_write_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
Pin::new(&mut **self).poll_write_vectored(cx, bufs)
}

fn is_write_vectored(&self) -> bool {
(**self).is_write_vectored()
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut **self).poll_flush(cx)
}
Expand Down Expand Up @@ -170,6 +227,18 @@ where
self.get_mut().as_mut().poll_write(cx, buf)
}

fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
self.get_mut().as_mut().poll_write_vectored(cx, bufs)
}

fn is_write_vectored(&self) -> bool {
(**self).is_write_vectored()
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.get_mut().as_mut().poll_flush(cx)
}
Expand All @@ -189,6 +258,18 @@ impl AsyncWrite for Vec<u8> {
Poll::Ready(Ok(buf.len()))
}

fn poll_write_vectored(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
}

fn is_write_vectored(&self) -> bool {
true
}

fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
Expand All @@ -207,6 +288,18 @@ impl AsyncWrite for io::Cursor<&mut [u8]> {
Poll::Ready(io::Write::write(&mut *self, buf))
}

fn poll_write_vectored(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
}

fn is_write_vectored(&self) -> bool {
true
}

fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(io::Write::flush(&mut *self))
}
Expand All @@ -225,6 +318,18 @@ impl AsyncWrite for io::Cursor<&mut Vec<u8>> {
Poll::Ready(io::Write::write(&mut *self, buf))
}

fn poll_write_vectored(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
}

fn is_write_vectored(&self) -> bool {
true
}

fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(io::Write::flush(&mut *self))
}
Expand All @@ -243,6 +348,18 @@ impl AsyncWrite for io::Cursor<Vec<u8>> {
Poll::Ready(io::Write::write(&mut *self, buf))
}

fn poll_write_vectored(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
}

fn is_write_vectored(&self) -> bool {
true
}

fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(io::Write::flush(&mut *self))
}
Expand All @@ -261,6 +378,18 @@ impl AsyncWrite for io::Cursor<Box<[u8]>> {
Poll::Ready(io::Write::write(&mut *self, buf))
}

fn poll_write_vectored(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
Poll::Ready(io::Write::write_vectored(&mut *self, bufs))
}

fn is_write_vectored(&self) -> bool {
true
}

fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(io::Write::flush(&mut *self))
}
Expand Down
13 changes: 13 additions & 0 deletions tokio/src/io/poll_evented.rs
Expand Up @@ -163,6 +163,19 @@ feature! {
use std::io::Write;
self.registration.poll_write_io(cx, || self.io.as_ref().unwrap().write(buf))
}

#[cfg(feature = "net")]
pub(crate) fn poll_write_vectored<'a>(
&'a self,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<io::Result<usize>>
where
&'a E: io::Write + 'a,
{
use std::io::Write;
self.registration.poll_write_io(cx, || self.io.as_ref().unwrap().write_vectored(bufs))
}
seanmonstar marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
12 changes: 12 additions & 0 deletions tokio/src/net/tcp/split.rs
Expand Up @@ -147,6 +147,18 @@ impl AsyncWrite for WriteHalf<'_> {
self.0.poll_write_priv(cx, buf)
}

fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<io::Result<usize>> {
self.0.poll_write_vectored_priv(cx, bufs)
}

fn is_write_vectored(&self) -> bool {
self.0.is_write_vectored()
}

#[inline]
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
// tcp flush is a no-op
Expand Down
12 changes: 12 additions & 0 deletions tokio/src/net/tcp/split_owned.rs
Expand Up @@ -229,6 +229,18 @@ impl AsyncWrite for OwnedWriteHalf {
self.inner.poll_write_priv(cx, buf)
}

fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<io::Result<usize>> {
self.inner.poll_write_vectored_priv(cx, bufs)
}

fn is_write_vectored(&self) -> bool {
self.inner.is_write_vectored()
}

#[inline]
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
// tcp flush is a no-op
Expand Down
20 changes: 20 additions & 0 deletions tokio/src/net/tcp/stream.rs
Expand Up @@ -762,6 +762,14 @@ impl TcpStream {
) -> Poll<io::Result<usize>> {
self.io.poll_write(cx, buf)
}

pub(super) fn poll_write_vectored_priv(
&self,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<io::Result<usize>> {
self.io.poll_write_vectored(cx, bufs)
}
}

impl TryFrom<std::net::TcpStream> for TcpStream {
Expand Down Expand Up @@ -797,6 +805,18 @@ impl AsyncWrite for TcpStream {
self.poll_write_priv(cx, buf)
}

fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<io::Result<usize>> {
self.poll_write_vectored_priv(cx, bufs)
}

fn is_write_vectored(&self) -> bool {
true
}

#[inline]
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
// tcp flush is a no-op
Expand Down
12 changes: 12 additions & 0 deletions tokio/src/net/unix/split.rs
Expand Up @@ -68,6 +68,18 @@ impl AsyncWrite for WriteHalf<'_> {
self.0.poll_write_priv(cx, buf)
}

fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<io::Result<usize>> {
self.0.poll_write_vectored_priv(cx, bufs)
}

fn is_write_vectored(&self) -> bool {
self.0.is_write_vectored()
}

fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
Expand Down
12 changes: 12 additions & 0 deletions tokio/src/net/unix/split_owned.rs
Expand Up @@ -153,6 +153,18 @@ impl AsyncWrite for OwnedWriteHalf {
self.inner.poll_write_priv(cx, buf)
}

fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<io::Result<usize>> {
self.inner.poll_write_vectored_priv(cx, bufs)
}

fn is_write_vectored(&self) -> bool {
self.inner.is_write_vectored()
}

#[inline]
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
// flush is a no-op
Expand Down
22 changes: 21 additions & 1 deletion tokio/src/net/unix/stream.rs
Expand Up @@ -172,6 +172,18 @@ impl AsyncWrite for UnixStream {
self.poll_write_priv(cx, buf)
}

fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<io::Result<usize>> {
self.poll_write_vectored_priv(cx, bufs)
}

fn is_write_vectored(&self) -> bool {
true
}

fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
Expand Down Expand Up @@ -199,7 +211,7 @@ impl UnixStream {
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
// Safety: `UdpStream::read` correctly handles reads into uninitialized memory
// Safety: `UnixStream::read` correctly handles reads into uninitialized memory
unsafe { self.io.poll_read(cx, buf) }
}

Expand All @@ -210,6 +222,14 @@ impl UnixStream {
) -> Poll<io::Result<usize>> {
self.io.poll_write(cx, buf)
}

pub(super) fn poll_write_vectored_priv(
&self,
cx: &mut Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<io::Result<usize>> {
self.io.poll_write_vectored(cx, bufs)
}
}

impl fmt::Debug for UnixStream {
Expand Down