Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trait AsyncVectoredWrite and related utilities #3092

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions tokio/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ pub use self::async_write::AsyncWrite;
mod read_buf;
pub use self::read_buf::ReadBuf;

pub mod vec;

// Re-export some types from `std::io` so that users don't have to deal
// with conflicts when `use`ing `tokio::io` and `std::io`.
#[doc(no_inline)]
Expand Down
14 changes: 13 additions & 1 deletion tokio/src/io/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
//! To restore this read/write object from its `split::ReadHalf` and
//! `split::WriteHalf` use `unsplit`.

use crate::io::vec::AsyncVectoredWrite;
use crate::io::{AsyncRead, AsyncWrite, ReadBuf};

use std::cell::UnsafeCell;
use std::fmt;
use std::io;
use std::io::{self, IoSlice};
use std::pin::Pin;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::{Acquire, Release};
Expand Down Expand Up @@ -129,6 +130,17 @@ impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> {
}
}

impl<T: AsyncVectoredWrite> AsyncVectoredWrite for WriteHalf<T> {
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
let mut inner = ready!(self.inner.poll_lock(cx));
inner.stream_pin().poll_write_vectored(cx, bufs)
}
}

impl<T> Inner<T> {
fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<Guard<'_, T>> {
if !self.locked.compare_and_swap(false, true, Acquire) {
Expand Down
13 changes: 12 additions & 1 deletion tokio/src/io/util/buf_reader.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::io::util::DEFAULT_BUF_SIZE;
use crate::io::vec::AsyncVectoredWrite;
use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf};

use pin_project_lite::pin_project;
use std::io;
use std::io::{self, IoSlice};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{cmp, fmt};
Expand Down Expand Up @@ -159,6 +160,16 @@ impl<R: AsyncRead + AsyncWrite> AsyncWrite for BufReader<R> {
}
}

impl<R: AsyncRead + AsyncVectoredWrite> AsyncVectoredWrite for BufReader<R> {
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
self.get_pin_mut().poll_write_vectored(cx, bufs)
}
}

impl<R: fmt::Debug> fmt::Debug for BufReader<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("BufReader")
Expand Down
17 changes: 16 additions & 1 deletion tokio/src/io/util/buf_stream.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::io::util::{BufReader, BufWriter};
use crate::io::vec::AsyncVectoredWrite;
use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf};

use pin_project_lite::pin_project;
use std::io;
use std::io::{self, IoSlice};
use std::pin::Pin;
use std::task::{Context, Poll};

Expand Down Expand Up @@ -132,6 +133,20 @@ impl<RW: AsyncRead + AsyncWrite> AsyncWrite for BufStream<RW> {
}
}

/// This implementation takes advantage of the buffering to emulate
/// efficient vectored output on I/O objects that don't natively support it.
/// With this, `BufStream` can be used as an adapter for generic code
/// that requires `AsyncVectoredWrite`.
impl<RW: AsyncRead + AsyncWrite> AsyncVectoredWrite for BufStream<RW> {
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
self.project().inner.poll_write_vectored(cx, bufs)
}
}

impl<RW: AsyncRead + AsyncWrite> AsyncRead for BufStream<RW> {
fn poll_read(
self: Pin<&mut Self>,
Expand Down
55 changes: 54 additions & 1 deletion tokio/src/io/util/buf_writer.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use crate::io::util::DEFAULT_BUF_SIZE;
use crate::io::vec::AsyncVectoredWrite;
use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf};

use pin_project_lite::pin_project;
use std::cmp;
use std::fmt;
use std::io::{self, Write};
use std::io::{self, IoSlice, Write};
use std::pin::Pin;
use std::task::{Context, Poll};

Expand Down Expand Up @@ -142,6 +144,57 @@ impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
}
}

/// This implementation takes advantage of the buffering to emulate
/// efficient vectored output on writers that don't natively support it.
/// With this, `BufWriter` can be used as an adapter for generic code
/// that requires `AsyncVectoredWrite`.
impl<W: AsyncWrite> AsyncVectoredWrite for BufWriter<W> {
fn poll_write_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
let mut total_written = 0;
let mut iter = bufs.iter();
if let Some(buf) = iter.by_ref().find(|&buf| !buf.is_empty()) {
// This is the first non-empty slice to write, so if it does
// not fit in the buffer, we still get to flush and proceed.
if self.buf.len() + buf.len() > self.buf.capacity() {
ready!(self.as_mut().flush_buf(cx))?;
}
let me = self.as_mut().project();
if buf.len() >= me.buf.capacity() {
// The slice is at least as large as the buffering capacity,
// so it's better to write it directly, bypassing the buffer.
return me.inner.poll_write(cx, buf);
} else {
me.buf.extend_from_slice(buf);
total_written += buf.len();
}
debug_assert!(total_written != 0);
}
for buf in iter {
let me = self.as_mut().project();
if buf.len() >= me.buf.capacity() {
// This slice should be written directly, but we have already
// buffered some of the input. Bail out, expecting it to be
// handled as the first slice in the next call to
// poll_write_vectored.
break;
} else {
let fill_len = cmp::min(buf.len(), me.buf.capacity() - me.buf.len());
me.buf.extend_from_slice(&buf[..fill_len]);
total_written += fill_len;
if me.buf.capacity() == me.buf.len() {
// The buffer is full, bail out
break;
}
}
}
Poll::Ready(Ok(total_written))
}
}

impl<W: AsyncWrite + AsyncRead> AsyncRead for BufWriter<W> {
fn poll_read(
self: Pin<&mut Self>,
Expand Down
14 changes: 13 additions & 1 deletion tokio/src/io/util/sink.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::io::vec::AsyncVectoredWrite;
use crate::io::AsyncWrite;

use std::fmt;
use std::io;
use std::io::{self, IoSlice};
use std::pin::Pin;
use std::task::{Context, Poll};

Expand Down Expand Up @@ -70,6 +71,17 @@ impl AsyncWrite for Sink {
}
}

impl AsyncVectoredWrite for Sink {
fn poll_write_vectored(
self: Pin<&mut Self>,
_: &mut Context<'_>,
slices: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
let total_len = slices.iter().map(|s| s.len()).sum();
Poll::Ready(Ok(total_len))
}
}

impl fmt::Debug for Sink {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("Sink { .. }")
Expand Down
101 changes: 101 additions & 0 deletions tokio/src/io/vec/async_vectored_write.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use crate::io::AsyncWrite;
use std::io::{self, IoSlice};
use std::ops::DerefMut;
use std::pin::Pin;
use std::task::{Context, Poll};

/// Writes bytes from a slice of buffers asynchronously.
///
/// This trait extends [`AsyncWrite`], providing
/// the functionality of [`std::io::Write::write_vectored`]
/// in a non-blocking way, and indicates that an I/O object has an efficient
/// implementation for vectored writes.
pub trait AsyncVectoredWrite: AsyncWrite {
/// Attempt to write bytes from `slices` into the object.
///
/// Data is copied from each buffer in order, with the final buffer
/// copied from possibly being only partially consumed.
/// This method must behave as a call to [`poll_write`]
/// with the buffers concatenated would.
///
/// On success, returns `Poll::Ready(Ok(num_bytes_written))`.
///
/// If the object is not ready for writing, the method returns
/// `Poll::Pending` and arranges for the current task (via
/// `cx.waker()`) to receive a notification when the object becomes
/// writable or is closed.
///
/// [`poll_write`]: AsyncWrite::poll_write()
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
slices: &[IoSlice<'_>],
) -> Poll<io::Result<usize>>;
}

macro_rules! deref_async_vectored_write {
() => {
fn poll_write_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
slices: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
Pin::new(&mut **self).poll_write_vectored(cx, slices)
}
};
}

impl<T: ?Sized + AsyncVectoredWrite + Unpin> AsyncVectoredWrite for Box<T> {
deref_async_vectored_write!();
}

impl<T: ?Sized + AsyncVectoredWrite + Unpin> AsyncVectoredWrite for &mut T {
deref_async_vectored_write!();
}

impl<P> AsyncVectoredWrite for Pin<P>
where
P: DerefMut + Unpin,
P::Target: AsyncVectoredWrite,
{
fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
slices: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
self.get_mut().as_mut().poll_write_vectored(cx, slices)
}
}

macro_rules! delegate_async_vectored_write_to_std {
() => {
#[inline]
fn poll_write_vectored(
self: Pin<&mut Self>,
_: &mut Context<'_>,
slices: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
Poll::Ready(io::Write::write_vectored(self.get_mut(), slices))
}
};
}

impl AsyncVectoredWrite for Vec<u8> {
delegate_async_vectored_write_to_std!();
}

impl AsyncVectoredWrite for io::Cursor<&mut [u8]> {
delegate_async_vectored_write_to_std!();
}

impl AsyncVectoredWrite for io::Cursor<&mut Vec<u8>> {
delegate_async_vectored_write_to_std!();
}

impl AsyncVectoredWrite for io::Cursor<Vec<u8>> {
delegate_async_vectored_write_to_std!();
}

impl AsyncVectoredWrite for io::Cursor<Box<[u8]>> {
delegate_async_vectored_write_to_std!();
}
14 changes: 14 additions & 0 deletions tokio/src/io/vec/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
//! Asynchronous vectored I/O.
//!
//! This module provides traits, helpers, and type definitions for efficient
//! asynchronous vectored I/O.

mod async_vectored_write;
pub use async_vectored_write::AsyncVectoredWrite;

cfg_io_util! {

mod util;
pub use util::{AsyncVectoredWriteExt, WriteVectored};

}
30 changes: 30 additions & 0 deletions tokio/src/io/vec/util/async_vectored_write_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use super::write_vectored::{write_vectored, WriteVectored};
use crate::io::vec::AsyncVectoredWrite;

use std::io::IoSlice;

/// Vectored output with an async method.
///
/// Implemented as an extention trait, adding the `write_vectored`
/// utility method to all [`AsyncVectoredWrite`] types. Callers will
/// tend to import this trait instead of [`AsyncVectoredWrite`].
pub trait AsyncVectoredWriteExt: AsyncVectoredWrite {
/// Like [`AsyncWriteExt::write`], except that it writes from
/// a slice of buffers.
///
/// Equivalent to:
///
/// ```ignore
/// async fn write_vectored(&mut self, slices: &[IoSlice<'_>]) -> io::Result<usize>;
/// ```
///
/// [`AsyncWriteExt::write`]: crate::io::AsyncWriteExt::write
fn write_vectored<'a>(&'a mut self, slices: &'a [IoSlice<'a>]) -> WriteVectored<'a, Self>
where
Self: Unpin,
{
write_vectored(self, slices)
}
}

impl<W: AsyncVectoredWrite + ?Sized> AsyncVectoredWriteExt for W {}
9 changes: 9 additions & 0 deletions tokio/src/io/vec/util/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#![allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411

cfg_io_util! {
mod async_vectored_write_ext;
pub use async_vectored_write_ext::AsyncVectoredWriteExt;

mod write_vectored;
pub use write_vectored::WriteVectored;
}