diff --git a/src/client/conn.rs b/src/client/conn.rs index c519cc30c7..a65e8df8da 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -82,7 +82,6 @@ where #[derive(Clone, Debug)] pub struct Builder { pub(super) exec: Exec, - h1_writev: Option, h1_title_case_headers: bool, h1_read_buf_exact_size: Option, h1_max_buf_size: Option, @@ -453,7 +452,6 @@ impl Builder { pub fn new() -> Builder { Builder { exec: Exec::Default, - h1_writev: None, h1_read_buf_exact_size: None, h1_title_case_headers: false, h1_max_buf_size: None, @@ -475,11 +473,6 @@ impl Builder { self } - pub(super) fn h1_writev(&mut self, enabled: bool) -> &mut Builder { - self.h1_writev = Some(enabled); - self - } - pub(super) fn h1_title_case_headers(&mut self, enabled: bool) -> &mut Builder { self.h1_title_case_headers = enabled; self @@ -663,13 +656,6 @@ impl Builder { #[cfg(feature = "http1")] Proto::Http1 => { let mut conn = proto::Conn::new(io); - if let Some(writev) = opts.h1_writev { - if writev { - conn.set_write_strategy_queue(); - } else { - conn.set_write_strategy_flatten(); - } - } if opts.h1_title_case_headers { conn.set_title_case_headers(); } diff --git a/src/client/mod.rs b/src/client/mod.rs index b2064d20ab..32fb4a6b51 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -62,7 +62,7 @@ use http::{Method, Request, Response, Uri, Version}; use self::connect::{sealed::Connect, Alpn, Connected, Connection}; use self::pool::{Key as PoolKey, Pool, Poolable, Pooled, Reservation}; use crate::body::{Body, HttpBody}; -use crate::common::{lazy as hyper_lazy, task, exec::BoxSendFuture, Future, Lazy, Pin, Poll}; +use crate::common::{exec::BoxSendFuture, lazy as hyper_lazy, task, Future, Lazy, Pin, Poll}; use crate::rt::Executor; #[cfg(feature = "tcp")] @@ -987,23 +987,6 @@ impl Builder { // HTTP/1 options - /// Set whether HTTP/1 connections should try to use vectored writes, - /// or always flatten into a single buffer. - /// - /// Note that setting this to false may mean more copies of body data, - /// but may also improve performance when an IO transport doesn't - /// support vectored writes well, such as most TLS implementations. - /// - /// Setting this to true will force hyper to use queued strategy - /// which may eliminate unnecessary cloning on some TLS backends - /// - /// Default is `auto`. In this mode hyper will try to guess which - /// mode to use - pub fn http1_writev(&mut self, val: bool) -> &mut Self { - self.conn_builder.h1_writev(val); - self - } - /// Sets the exact size of the read buffer to *always* use. /// /// Note that setting this option unsets the `http1_max_buf_size` option. diff --git a/src/common/io/mod.rs b/src/common/io/mod.rs index 2e6d506153..61dd038cc2 100644 --- a/src/common/io/mod.rs +++ b/src/common/io/mod.rs @@ -1,3 +1,4 @@ mod rewind; pub(crate) use self::rewind::Rewind; +pub(crate) const MAX_WRITEV_BUFS: usize = 64; diff --git a/src/common/io/rewind.rs b/src/common/io/rewind.rs index bb01064881..58f1de6c89 100644 --- a/src/common/io/rewind.rs +++ b/src/common/io/rewind.rs @@ -84,6 +84,14 @@ where Pin::new(&mut self.inner).poll_write(cx, buf) } + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + bufs: &[io::IoSlice<'_>], + ) -> Poll> { + Pin::new(&mut self.inner).poll_write_vectored(cx, bufs) + } + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { Pin::new(&mut self.inner).poll_flush(cx) } @@ -91,6 +99,10 @@ where fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { Pin::new(&mut self.inner).poll_shutdown(cx) } + + fn is_write_vectored(&self) -> bool { + self.inner.is_write_vectored() + } } #[cfg(test)] diff --git a/src/proto/h1/conn.rs b/src/proto/h1/conn.rs index a9599171d1..3226aaf885 100644 --- a/src/proto/h1/conn.rs +++ b/src/proto/h1/conn.rs @@ -71,14 +71,6 @@ where self.io.set_read_buf_exact_size(sz); } - pub fn set_write_strategy_flatten(&mut self) { - self.io.set_write_strategy_flatten(); - } - - pub fn set_write_strategy_queue(&mut self) { - self.io.set_write_strategy_queue(); - } - #[cfg(feature = "client")] pub fn set_title_case_headers(&mut self) { self.state.title_case_headers = true; diff --git a/src/proto/h1/io.rs b/src/proto/h1/io.rs index 04dcfbcc6e..ed10374e18 100644 --- a/src/proto/h1/io.rs +++ b/src/proto/h1/io.rs @@ -1,4 +1,3 @@ -use std::cell::Cell; use std::cmp; use std::fmt; use std::io::{self, IoSlice}; @@ -57,13 +56,14 @@ where B: Buf, { pub fn new(io: T) -> Buffered { + let write_buf = WriteBuf::new(&io); Buffered { flush_pipeline: false, io, read_blocked: false, read_buf: BytesMut::with_capacity(0), read_buf_strategy: ReadStrategy::default(), - write_buf: WriteBuf::new(), + write_buf, } } @@ -98,13 +98,6 @@ where self.write_buf.set_strategy(WriteStrategy::Flatten); } - pub fn set_write_strategy_queue(&mut self) { - // this should always be called only at construction time, - // so this assert is here to catch myself - debug_assert!(self.write_buf.queue.bufs_cnt() == 0); - self.write_buf.set_strategy(WriteStrategy::Queue); - } - pub fn read_buf(&self) -> &[u8] { self.read_buf.as_ref() } @@ -237,13 +230,13 @@ where if let WriteStrategy::Flatten = self.write_buf.strategy { return self.poll_flush_flattened(cx); } + loop { - // TODO(eliza): this basically ignores all of `WriteBuf`...put - // back vectored IO and `poll_write_buf` when the appropriate Tokio - // changes land... - let n = ready!(Pin::new(&mut self.io) - // .poll_write_buf(cx, &mut self.write_buf.auto()))?; - .poll_write(cx, self.write_buf.auto().bytes()))?; + let n = { + let mut iovs = [IoSlice::new(&[]); crate::common::io::MAX_WRITEV_BUFS]; + let len = self.write_buf.bytes_vectored(&mut iovs); + ready!(Pin::new(&mut self.io).poll_write_vectored(cx, &iovs[..len]))? + }; // TODO(eliza): we have to do this manually because // `poll_write_buf` doesn't exist in Tokio 0.3 yet...when // `poll_write_buf` comes back, the manual advance will need to leave! @@ -462,12 +455,17 @@ pub(super) struct WriteBuf { } impl WriteBuf { - fn new() -> WriteBuf { + fn new(io: &impl AsyncWrite) -> WriteBuf { + let strategy = if io.is_write_vectored() { + WriteStrategy::Queue + } else { + WriteStrategy::Flatten + }; WriteBuf { headers: Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE)), max_buf_size: DEFAULT_MAX_BUFFER_SIZE, queue: BufList::new(), - strategy: WriteStrategy::Auto, + strategy, } } } @@ -480,12 +478,6 @@ where self.strategy = strategy; } - // TODO(eliza): put back writev! - #[inline] - fn auto(&mut self) -> WriteBufAuto<'_, B> { - WriteBufAuto::new(self) - } - pub(super) fn buffer>(&mut self, mut buf: BB) { debug_assert!(buf.has_remaining()); match self.strategy { @@ -505,7 +497,7 @@ where buf.advance(adv); } } - WriteStrategy::Auto | WriteStrategy::Queue => { + WriteStrategy::Queue => { self.queue.push(buf.into()); } } @@ -514,7 +506,7 @@ where fn can_buffer(&self) -> bool { match self.strategy { WriteStrategy::Flatten => self.remaining() < self.max_buf_size, - WriteStrategy::Auto | WriteStrategy::Queue => { + WriteStrategy::Queue => { self.queue.bufs_cnt() < MAX_BUF_LIST_BUFFERS && self.remaining() < self.max_buf_size } } @@ -573,65 +565,8 @@ impl Buf for WriteBuf { } } -/// Detects when wrapped `WriteBuf` is used for vectored IO, and -/// adjusts the `WriteBuf` strategy if not. -struct WriteBufAuto<'a, B: Buf> { - bytes_called: Cell, - bytes_vec_called: Cell, - inner: &'a mut WriteBuf, -} - -impl<'a, B: Buf> WriteBufAuto<'a, B> { - fn new(inner: &'a mut WriteBuf) -> WriteBufAuto<'a, B> { - WriteBufAuto { - bytes_called: Cell::new(false), - bytes_vec_called: Cell::new(false), - inner, - } - } -} - -impl<'a, B: Buf> Buf for WriteBufAuto<'a, B> { - #[inline] - fn remaining(&self) -> usize { - self.inner.remaining() - } - - #[inline] - fn bytes(&self) -> &[u8] { - self.bytes_called.set(true); - self.inner.bytes() - } - - #[inline] - fn advance(&mut self, cnt: usize) { - self.inner.advance(cnt) - } - - #[inline] - fn bytes_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize { - self.bytes_vec_called.set(true); - self.inner.bytes_vectored(dst) - } -} - -impl<'a, B: Buf + 'a> Drop for WriteBufAuto<'a, B> { - fn drop(&mut self) { - if let WriteStrategy::Auto = self.inner.strategy { - if self.bytes_vec_called.get() { - self.inner.strategy = WriteStrategy::Queue; - } else if self.bytes_called.get() { - trace!("detected no usage of vectored write, flattening"); - self.inner.strategy = WriteStrategy::Flatten; - self.inner.headers.bytes.put(&mut self.inner.queue); - } - } - } -} - #[derive(Debug)] enum WriteStrategy { - Auto, Flatten, Queue, } @@ -643,8 +578,8 @@ mod tests { use tokio_test::io::Builder as Mock; - #[cfg(feature = "nightly")] - use test::Bencher; + // #[cfg(feature = "nightly")] + // use test::Bencher; /* impl MemRead for AsyncIo { @@ -873,33 +808,6 @@ mod tests { buffered.flush().await.expect("flush"); } - #[tokio::test] - async fn write_buf_auto_flatten() { - let _ = pretty_env_logger::try_init(); - - let mock = Mock::new() - // Expects write_buf to only consume first buffer - .write(b"hello ") - // And then the Auto strategy will have flattened - .write(b"world, it's hyper!") - .build(); - - let mut buffered = Buffered::<_, Cursor>>::new(mock); - - // we have 4 buffers, but hope to detect that vectored IO isn't - // being used, and switch to flattening automatically, - // resulting in only 2 writes - buffered.headers_buf().extend(b"hello "); - buffered.buffer(Cursor::new(b"world, ".to_vec())); - buffered.buffer(Cursor::new(b"it's ".to_vec())); - buffered.buffer(Cursor::new(b"hyper!".to_vec())); - assert_eq!(buffered.write_buf.queue.bufs_cnt(), 3); - - buffered.flush().await.expect("flush"); - - assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0); - } - #[tokio::test] async fn write_buf_queue_disable_auto() { let _ = pretty_env_logger::try_init(); @@ -928,19 +836,19 @@ mod tests { assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0); } - #[cfg(feature = "nightly")] - #[bench] - fn bench_write_buf_flatten_buffer_chunk(b: &mut Bencher) { - let s = "Hello, World!"; - b.bytes = s.len() as u64; - - let mut write_buf = WriteBuf::::new(); - write_buf.set_strategy(WriteStrategy::Flatten); - b.iter(|| { - let chunk = bytes::Bytes::from(s); - write_buf.buffer(chunk); - ::test::black_box(&write_buf); - write_buf.headers.bytes.clear(); - }) - } + // #[cfg(feature = "nightly")] + // #[bench] + // fn bench_write_buf_flatten_buffer_chunk(b: &mut Bencher) { + // let s = "Hello, World!"; + // b.bytes = s.len() as u64; + + // let mut write_buf = WriteBuf::::new(); + // write_buf.set_strategy(WriteStrategy::Flatten); + // b.iter(|| { + // let chunk = bytes::Bytes::from(s); + // write_buf.buffer(chunk); + // ::test::black_box(&write_buf); + // write_buf.headers.bytes.clear(); + // }) + // } } diff --git a/src/server/conn.rs b/src/server/conn.rs index c54b8751e7..9a10f2dccc 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -90,7 +90,6 @@ pub struct Http { exec: E, h1_half_close: bool, h1_keep_alive: bool, - h1_writev: Option, #[cfg(feature = "http2")] h2_builder: proto::h2::server::Config, mode: ConnectionMode, @@ -242,7 +241,6 @@ impl Http { exec: Exec::Default, h1_half_close: false, h1_keep_alive: true, - h1_writev: None, #[cfg(feature = "http2")] h2_builder: Default::default(), mode: ConnectionMode::default(), @@ -295,26 +293,6 @@ impl Http { self } - /// Set whether HTTP/1 connections should try to use vectored writes, - /// or always flatten into a single buffer. - /// - /// Note that setting this to false may mean more copies of body data, - /// but may also improve performance when an IO transport doesn't - /// support vectored writes well, such as most TLS implementations. - /// - /// Setting this to true will force hyper to use queued strategy - /// which may eliminate unnecessary cloning on some TLS backends - /// - /// Default is `auto`. In this mode hyper will try to guess which - /// mode to use - #[inline] - #[cfg(feature = "http1")] - #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] - pub fn http1_writev(&mut self, val: bool) -> &mut Self { - self.h1_writev = Some(val); - self - } - /// Sets whether HTTP2 is required. /// /// Default is false @@ -488,7 +466,6 @@ impl Http { exec, h1_half_close: self.h1_half_close, h1_keep_alive: self.h1_keep_alive, - h1_writev: self.h1_writev, #[cfg(feature = "http2")] h2_builder: self.h2_builder, mode: self.mode, @@ -544,13 +521,6 @@ impl Http { if self.h1_half_close { conn.set_allow_half_close(); } - if let Some(writev) = self.h1_writev { - if writev { - conn.set_write_strategy_queue(); - } else { - conn.set_write_strategy_flatten(); - } - } conn.set_flush_pipeline(self.pipeline_flush); if let Some(max) = self.max_buf_size { conn.set_max_buf_size(max); diff --git a/src/server/mod.rs b/src/server/mod.rs index 0541fe3dda..9e8df1c899 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -284,27 +284,6 @@ impl Builder { self } - /// Set whether HTTP/1 connections should try to use vectored writes, - /// or always flatten into a single buffer. - /// - /// # Note - /// - /// Setting this to `false` may mean more copies of body data, - /// but may also improve performance when an IO transport doesn't - /// support vectored writes well, such as most TLS implementations. - /// - /// Setting this to true will force hyper to use queued strategy - /// which may eliminate unnecessary cloning on some TLS backends - /// - /// Default is `auto`. In this mode hyper will try to guess which - /// mode to use. - #[cfg(feature = "http1")] - #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] - pub fn http1_writev(mut self, val: bool) -> Self { - self.protocol.http1_writev(val); - self - } - /// Sets whether HTTP/1 is required. /// /// Default is `false`. diff --git a/src/server/tcp.rs b/src/server/tcp.rs index bef38f2d4f..6fc5463a34 100644 --- a/src/server/tcp.rs +++ b/src/server/tcp.rs @@ -293,6 +293,15 @@ mod addr_stream { self.project().inner.poll_write(cx, buf) } + #[inline] + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + bufs: &[io::IoSlice<'_>], + ) -> Poll> { + self.project().inner.poll_write_vectored(cx, bufs) + } + #[inline] fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll> { // TCP flush is a noop @@ -303,6 +312,15 @@ mod addr_stream { fn poll_shutdown(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { self.project().inner.poll_shutdown(cx) } + + #[inline] + fn is_write_vectored(&self) -> bool { + // Note that since `self.inner` is a `TcpStream`, this could + // *probably* be hard-coded to return `true`...but it seems more + // correct to ask it anyway (maybe we're on some platform without + // scatter-gather IO?) + self.inner.is_write_vectored() + } } #[cfg(unix)] diff --git a/src/upgrade.rs b/src/upgrade.rs index 4f377b8c4a..eb9ed97ef2 100644 --- a/src/upgrade.rs +++ b/src/upgrade.rs @@ -124,6 +124,14 @@ impl AsyncWrite for Upgraded { Pin::new(&mut self.io).poll_write(cx, buf) } + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + bufs: &[io::IoSlice<'_>], + ) -> Poll> { + Pin::new(&mut self.io).poll_write_vectored(cx, bufs) + } + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { Pin::new(&mut self.io).poll_flush(cx) } @@ -131,6 +139,10 @@ impl AsyncWrite for Upgraded { fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { Pin::new(&mut self.io).poll_shutdown(cx) } + + fn is_write_vectored(&self) -> bool { + self.io.is_write_vectored() + } } impl fmt::Debug for Upgraded { @@ -261,6 +273,14 @@ impl AsyncWrite for ForwardsWriteBuf { Pin::new(&mut self.0).poll_write(cx, buf) } + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + bufs: &[io::IoSlice<'_>], + ) -> Poll> { + Pin::new(&mut self.0).poll_write_vectored(cx, bufs) + } + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { Pin::new(&mut self.0).poll_flush(cx) } @@ -268,6 +288,10 @@ impl AsyncWrite for ForwardsWriteBuf { fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { Pin::new(&mut self.0).poll_shutdown(cx) } + + fn is_write_vectored(&self) -> bool { + self.0.is_write_vectored() + } } impl Io for ForwardsWriteBuf { @@ -276,6 +300,11 @@ impl Io for ForwardsWriteBuf { cx: &mut task::Context<'_>, buf: &mut dyn Buf, ) -> Poll> { + if self.0.is_write_vectored() { + let mut bufs = [io::IoSlice::new(&[]); crate::common::io::MAX_WRITEV_BUFS]; + let cnt = buf.bytes_vectored(&mut bufs); + return Pin::new(&mut self.0).poll_write_vectored(cx, &bufs[..cnt]); + } Pin::new(&mut self.0).poll_write(cx, buf.bytes()) } }