Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(lib): re-enable writev support #2338

Merged
merged 10 commits into from
Nov 24, 2020
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ http = "0.2"
http-body = { git = "https://github.com/hyperium/http-body" }
httpdate = "0.3"
httparse = "1.0"
h2 = { git = "https://github.com/hyperium/h2", optional = true }
h2 = { git = "https://github.com/hyperium/h2", optional = true, branch = "eliza/writev" }
hawkw marked this conversation as resolved.
Show resolved Hide resolved
itoa = "0.4.1"
tracing = { version = "0.1", default-features = false, features = ["log", "std"] }
pin-project = "1.0"
Expand Down Expand Up @@ -248,4 +248,4 @@ required-features = ["full"]
[[test]]
name = "server"
path = "tests/server.rs"
required-features = ["full"]
required-features = ["full"]
14 changes: 0 additions & 14 deletions src/client/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ where
#[derive(Clone, Debug)]
pub struct Builder {
pub(super) exec: Exec,
h1_writev: Option<bool>,
h1_title_case_headers: bool,
h1_read_buf_exact_size: Option<usize>,
h1_max_buf_size: Option<usize>,
Expand Down Expand Up @@ -453,7 +452,6 @@ impl Builder {
pub fn new() -> Builder {
Builder {
exec: Exec::Default,
h1_writev: None,
h1_read_buf_exact_size: None,
h1_title_case_headers: false,
h1_max_buf_size: None,
Expand All @@ -475,11 +473,6 @@ impl Builder {
self
}

pub(super) fn h1_writev(&mut self, enabled: bool) -> &mut Builder {
self.h1_writev = Some(enabled);
self
}

pub(super) fn h1_title_case_headers(&mut self, enabled: bool) -> &mut Builder {
self.h1_title_case_headers = enabled;
self
Expand Down Expand Up @@ -663,13 +656,6 @@ impl Builder {
#[cfg(feature = "http1")]
Proto::Http1 => {
let mut conn = proto::Conn::new(io);
if let Some(writev) = opts.h1_writev {
if writev {
conn.set_write_strategy_queue();
} else {
conn.set_write_strategy_flatten();
}
}
if opts.h1_title_case_headers {
conn.set_title_case_headers();
}
Expand Down
19 changes: 1 addition & 18 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ use http::{Method, Request, Response, Uri, Version};
use self::connect::{sealed::Connect, Alpn, Connected, Connection};
use self::pool::{Key as PoolKey, Pool, Poolable, Pooled, Reservation};
use crate::body::{Body, HttpBody};
use crate::common::{lazy as hyper_lazy, task, exec::BoxSendFuture, Future, Lazy, Pin, Poll};
use crate::common::{exec::BoxSendFuture, lazy as hyper_lazy, task, Future, Lazy, Pin, Poll};
use crate::rt::Executor;

#[cfg(feature = "tcp")]
Expand Down Expand Up @@ -987,23 +987,6 @@ impl Builder {

// HTTP/1 options

/// Set whether HTTP/1 connections should try to use vectored writes,
/// or always flatten into a single buffer.
///
/// Note that setting this to false may mean more copies of body data,
/// but may also improve performance when an IO transport doesn't
/// support vectored writes well, such as most TLS implementations.
///
/// Setting this to true will force hyper to use queued strategy
/// which may eliminate unnecessary cloning on some TLS backends
///
/// Default is `auto`. In this mode hyper will try to guess which
/// mode to use
pub fn http1_writev(&mut self, val: bool) -> &mut Self {
self.conn_builder.h1_writev(val);
self
}

/// Sets the exact size of the read buffer to *always* use.
///
/// Note that setting this option unsets the `http1_max_buf_size` option.
Expand Down
1 change: 1 addition & 0 deletions src/common/io/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod rewind;

pub(crate) use self::rewind::Rewind;
pub(crate) const MAX_WRITEV_BUFS: usize = 64;
12 changes: 12 additions & 0 deletions src/common/io/rewind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,25 @@ where
Pin::new(&mut self.inner).poll_write(cx, buf)
}

fn poll_write_vectored(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
bufs: &[io::IoSlice<'_>],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.inner).poll_write_vectored(cx, bufs)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_flush(cx)
}

fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_shutdown(cx)
}

fn is_write_vectored(&self) -> bool {
self.inner.is_write_vectored()
}
}

#[cfg(test)]
Expand Down
8 changes: 0 additions & 8 deletions src/proto/h1/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,6 @@ where
self.io.set_read_buf_exact_size(sz);
}

pub fn set_write_strategy_flatten(&mut self) {
self.io.set_write_strategy_flatten();
}

pub fn set_write_strategy_queue(&mut self) {
self.io.set_write_strategy_queue();
}

#[cfg(feature = "client")]
pub fn set_title_case_headers(&mut self) {
self.state.title_case_headers = true;
Expand Down
160 changes: 34 additions & 126 deletions src/proto/h1/io.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::cell::Cell;
use std::cmp;
use std::fmt;
use std::io::{self, IoSlice};
Expand Down Expand Up @@ -57,13 +56,14 @@ where
B: Buf,
{
pub fn new(io: T) -> Buffered<T, B> {
let write_buf = WriteBuf::new(&io);
Buffered {
flush_pipeline: false,
io,
read_blocked: false,
read_buf: BytesMut::with_capacity(0),
read_buf_strategy: ReadStrategy::default(),
write_buf: WriteBuf::new(),
write_buf,
}
}

Expand Down Expand Up @@ -98,13 +98,6 @@ where
self.write_buf.set_strategy(WriteStrategy::Flatten);
}

pub fn set_write_strategy_queue(&mut self) {
// this should always be called only at construction time,
// so this assert is here to catch myself
debug_assert!(self.write_buf.queue.bufs_cnt() == 0);
self.write_buf.set_strategy(WriteStrategy::Queue);
}

pub fn read_buf(&self) -> &[u8] {
self.read_buf.as_ref()
}
Expand Down Expand Up @@ -237,13 +230,13 @@ where
if let WriteStrategy::Flatten = self.write_buf.strategy {
return self.poll_flush_flattened(cx);
}

loop {
// TODO(eliza): this basically ignores all of `WriteBuf`...put
// back vectored IO and `poll_write_buf` when the appropriate Tokio
// changes land...
let n = ready!(Pin::new(&mut self.io)
// .poll_write_buf(cx, &mut self.write_buf.auto()))?;
.poll_write(cx, self.write_buf.auto().bytes()))?;
let n = {
let mut iovs = [IoSlice::new(&[]); crate::common::io::MAX_WRITEV_BUFS];
let len = self.write_buf.bytes_vectored(&mut iovs);
ready!(Pin::new(&mut self.io).poll_write_vectored(cx, &iovs[..len]))?
};
// TODO(eliza): we have to do this manually because
// `poll_write_buf` doesn't exist in Tokio 0.3 yet...when
// `poll_write_buf` comes back, the manual advance will need to leave!
Expand Down Expand Up @@ -462,12 +455,17 @@ pub(super) struct WriteBuf<B> {
}

impl<B: Buf> WriteBuf<B> {
fn new() -> WriteBuf<B> {
fn new(io: &impl AsyncWrite) -> WriteBuf<B> {
let strategy = if io.is_write_vectored() {
WriteStrategy::Queue
} else {
WriteStrategy::Flatten
};
WriteBuf {
headers: Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE)),
max_buf_size: DEFAULT_MAX_BUFFER_SIZE,
queue: BufList::new(),
strategy: WriteStrategy::Auto,
strategy,
}
}
}
Expand All @@ -480,12 +478,6 @@ where
self.strategy = strategy;
}

// TODO(eliza): put back writev!
#[inline]
fn auto(&mut self) -> WriteBufAuto<'_, B> {
WriteBufAuto::new(self)
}

pub(super) fn buffer<BB: Buf + Into<B>>(&mut self, mut buf: BB) {
debug_assert!(buf.has_remaining());
match self.strategy {
Expand All @@ -505,7 +497,7 @@ where
buf.advance(adv);
}
}
WriteStrategy::Auto | WriteStrategy::Queue => {
WriteStrategy::Queue => {
self.queue.push(buf.into());
}
}
Expand All @@ -514,7 +506,7 @@ where
fn can_buffer(&self) -> bool {
match self.strategy {
WriteStrategy::Flatten => self.remaining() < self.max_buf_size,
WriteStrategy::Auto | WriteStrategy::Queue => {
WriteStrategy::Queue => {
self.queue.bufs_cnt() < MAX_BUF_LIST_BUFFERS && self.remaining() < self.max_buf_size
}
}
Expand Down Expand Up @@ -573,65 +565,8 @@ impl<B: Buf> Buf for WriteBuf<B> {
}
}

/// Detects when wrapped `WriteBuf` is used for vectored IO, and
/// adjusts the `WriteBuf` strategy if not.
struct WriteBufAuto<'a, B: Buf> {
bytes_called: Cell<bool>,
bytes_vec_called: Cell<bool>,
inner: &'a mut WriteBuf<B>,
}

impl<'a, B: Buf> WriteBufAuto<'a, B> {
fn new(inner: &'a mut WriteBuf<B>) -> WriteBufAuto<'a, B> {
WriteBufAuto {
bytes_called: Cell::new(false),
bytes_vec_called: Cell::new(false),
inner,
}
}
}

impl<'a, B: Buf> Buf for WriteBufAuto<'a, B> {
#[inline]
fn remaining(&self) -> usize {
self.inner.remaining()
}

#[inline]
fn bytes(&self) -> &[u8] {
self.bytes_called.set(true);
self.inner.bytes()
}

#[inline]
fn advance(&mut self, cnt: usize) {
self.inner.advance(cnt)
}

#[inline]
fn bytes_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
self.bytes_vec_called.set(true);
self.inner.bytes_vectored(dst)
}
}

impl<'a, B: Buf + 'a> Drop for WriteBufAuto<'a, B> {
fn drop(&mut self) {
if let WriteStrategy::Auto = self.inner.strategy {
if self.bytes_vec_called.get() {
self.inner.strategy = WriteStrategy::Queue;
} else if self.bytes_called.get() {
trace!("detected no usage of vectored write, flattening");
self.inner.strategy = WriteStrategy::Flatten;
self.inner.headers.bytes.put(&mut self.inner.queue);
}
}
}
}

#[derive(Debug)]
enum WriteStrategy {
Auto,
Flatten,
Queue,
}
Expand All @@ -643,8 +578,8 @@ mod tests {

use tokio_test::io::Builder as Mock;

#[cfg(feature = "nightly")]
use test::Bencher;
// #[cfg(feature = "nightly")]
// use test::Bencher;

/*
impl<T: Read> MemRead for AsyncIo<T> {
Expand Down Expand Up @@ -873,33 +808,6 @@ mod tests {
buffered.flush().await.expect("flush");
}

#[tokio::test]
async fn write_buf_auto_flatten() {
let _ = pretty_env_logger::try_init();

let mock = Mock::new()
// Expects write_buf to only consume first buffer
.write(b"hello ")
// And then the Auto strategy will have flattened
.write(b"world, it's hyper!")
.build();

let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);

// we have 4 buffers, but hope to detect that vectored IO isn't
// being used, and switch to flattening automatically,
// resulting in only 2 writes
buffered.headers_buf().extend(b"hello ");
buffered.buffer(Cursor::new(b"world, ".to_vec()));
buffered.buffer(Cursor::new(b"it's ".to_vec()));
buffered.buffer(Cursor::new(b"hyper!".to_vec()));
assert_eq!(buffered.write_buf.queue.bufs_cnt(), 3);

buffered.flush().await.expect("flush");

assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);
}

#[tokio::test]
async fn write_buf_queue_disable_auto() {
let _ = pretty_env_logger::try_init();
Expand Down Expand Up @@ -928,19 +836,19 @@ mod tests {
assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);
}

#[cfg(feature = "nightly")]
#[bench]
fn bench_write_buf_flatten_buffer_chunk(b: &mut Bencher) {
let s = "Hello, World!";
b.bytes = s.len() as u64;

let mut write_buf = WriteBuf::<bytes::Bytes>::new();
write_buf.set_strategy(WriteStrategy::Flatten);
b.iter(|| {
let chunk = bytes::Bytes::from(s);
write_buf.buffer(chunk);
::test::black_box(&write_buf);
write_buf.headers.bytes.clear();
})
}
// #[cfg(feature = "nightly")]
// #[bench]
// fn bench_write_buf_flatten_buffer_chunk(b: &mut Bencher) {
// let s = "Hello, World!";
// b.bytes = s.len() as u64;

// let mut write_buf = WriteBuf::<bytes::Bytes>::new();
// write_buf.set_strategy(WriteStrategy::Flatten);
// b.iter(|| {
// let chunk = bytes::Bytes::from(s);
// write_buf.buffer(chunk);
// ::test::black_box(&write_buf);
// write_buf.headers.bytes.clear();
// })
// }
}