diff --git a/src/proto/h1/io.rs b/src/proto/h1/io.rs
index 04dcfbcc6e..6a99222564 100644
--- a/src/proto/h1/io.rs
+++ b/src/proto/h1/io.rs
@@ -1,4 +1,3 @@
-use std::cell::Cell;
 use std::cmp;
 use std::fmt;
 use std::io::{self, IoSlice};
@@ -57,13 +56,14 @@ where
     B: Buf,
 {
     pub fn new(io: T) -> Buffered<T, B> {
+        let write_buf = WriteBuf::new(&io);
         Buffered {
             flush_pipeline: false,
             io,
             read_blocked: false,
             read_buf: BytesMut::with_capacity(0),
             read_buf_strategy: ReadStrategy::default(),
-            write_buf: WriteBuf::new(),
+            write_buf,
         }
     }
 
@@ -237,13 +237,18 @@ where
         if let WriteStrategy::Flatten = self.write_buf.strategy {
             return self.poll_flush_flattened(cx);
         }
+
         loop {
-            // TODO(eliza): this basically ignores all of `WriteBuf`...put
-            // back vectored IO and `poll_write_buf` when the appropriate Tokio
-            // changes land...
-            let n = ready!(Pin::new(&mut self.io)
-                // .poll_write_buf(cx, &mut self.write_buf.auto()))?;
-                .poll_write(cx, self.write_buf.auto().bytes()))?;
+            debug_assert!(
+                self.io.is_write_vectored(),
+                "using vectored writes on an IO that does not provide fast vectored \
+                 write support, this is a bug"
+            );
+            let n = {
+                let mut iovs = [IoSlice::new(&[]); MAX_BUF_LIST_BUFFERS];
+                let len = self.write_buf.bytes_vectored(&mut iovs);
+                ready!(Pin::new(&mut self.io).poll_write_vectored(cx, &iovs[..len]))?
+            };
             // TODO(eliza): we have to do this manually because
             // `poll_write_buf` doesn't exist in Tokio 0.3 yet...when
             // `poll_write_buf` comes back, the manual advance will need to leave!
@@ -462,12 +467,17 @@ pub(super) struct WriteBuf<B> {
 }
 
 impl<B> WriteBuf<B> {
-    fn new() -> WriteBuf<B> {
+    fn new(io: &impl AsyncWrite) -> WriteBuf<B> {
+        let strategy = if io.is_write_vectored() {
+            WriteStrategy::Queue
+        } else {
+            WriteStrategy::Flatten
+        };
         WriteBuf {
             headers: Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE)),
             max_buf_size: DEFAULT_MAX_BUFFER_SIZE,
             queue: BufList::new(),
-            strategy: WriteStrategy::Auto,
+            strategy,
         }
     }
 }
@@ -480,12 +490,6 @@ where
         self.strategy = strategy;
     }
 
-    // TODO(eliza): put back writev!
-    #[inline]
-    fn auto(&mut self) -> WriteBufAuto<'_, B> {
-        WriteBufAuto::new(self)
-    }
-
     pub(super) fn buffer<BB: Buf + Into<B>>(&mut self, mut buf: BB) {
         debug_assert!(buf.has_remaining());
         match self.strategy {
@@ -505,7 +509,7 @@
                     buf.advance(adv);
                 }
             }
-            WriteStrategy::Auto | WriteStrategy::Queue => {
+            WriteStrategy::Queue => {
                 self.queue.push(buf.into());
             }
         }
@@ -514,7 +518,7 @@
     fn can_buffer(&self) -> bool {
         match self.strategy {
             WriteStrategy::Flatten => self.remaining() < self.max_buf_size,
-            WriteStrategy::Auto | WriteStrategy::Queue => {
+            WriteStrategy::Queue => {
                 self.queue.bufs_cnt() < MAX_BUF_LIST_BUFFERS && self.remaining() < self.max_buf_size
             }
         }
@@ -573,65 +577,8 @@ impl<B: Buf> Buf for WriteBuf<B> {
     }
 }
 
-/// Detects when wrapped `WriteBuf` is used for vectored IO, and
-/// adjusts the `WriteBuf` strategy if not.
-struct WriteBufAuto<'a, B: Buf> {
-    bytes_called: Cell<bool>,
-    bytes_vec_called: Cell<bool>,
-    inner: &'a mut WriteBuf<B>,
-}
-
-impl<'a, B: Buf> WriteBufAuto<'a, B> {
-    fn new(inner: &'a mut WriteBuf<B>) -> WriteBufAuto<'a, B> {
-        WriteBufAuto {
-            bytes_called: Cell::new(false),
-            bytes_vec_called: Cell::new(false),
-            inner,
-        }
-    }
-}
-
-impl<'a, B: Buf> Buf for WriteBufAuto<'a, B> {
-    #[inline]
-    fn remaining(&self) -> usize {
-        self.inner.remaining()
-    }
-
-    #[inline]
-    fn bytes(&self) -> &[u8] {
-        self.bytes_called.set(true);
-        self.inner.bytes()
-    }
-
-    #[inline]
-    fn advance(&mut self, cnt: usize) {
-        self.inner.advance(cnt)
-    }
-
-    #[inline]
-    fn bytes_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
-        self.bytes_vec_called.set(true);
-        self.inner.bytes_vectored(dst)
-    }
-}
-
-impl<'a, B: Buf + 'a> Drop for WriteBufAuto<'a, B> {
-    fn drop(&mut self) {
-        if let WriteStrategy::Auto = self.inner.strategy {
-            if self.bytes_vec_called.get() {
-                self.inner.strategy = WriteStrategy::Queue;
-            } else if self.bytes_called.get() {
-                trace!("detected no usage of vectored write, flattening");
-                self.inner.strategy = WriteStrategy::Flatten;
-                self.inner.headers.bytes.put(&mut self.inner.queue);
-            }
-        }
-    }
-}
-
 #[derive(Debug)]
 enum WriteStrategy {
-    Auto,
     Flatten,
     Queue,
 }
@@ -643,8 +590,8 @@ mod tests {
 
     use tokio_test::io::Builder as Mock;
 
-    #[cfg(feature = "nightly")]
-    use test::Bencher;
+    // #[cfg(feature = "nightly")]
+    // use test::Bencher;
 
     /*
     impl MemRead for AsyncIo {
@@ -928,19 +875,19 @@ mod tests {
         assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);
     }
 
-    #[cfg(feature = "nightly")]
-    #[bench]
-    fn bench_write_buf_flatten_buffer_chunk(b: &mut Bencher) {
-        let s = "Hello, World!";
-        b.bytes = s.len() as u64;
-
-        let mut write_buf = WriteBuf::<bytes::Bytes>::new();
-        write_buf.set_strategy(WriteStrategy::Flatten);
-        b.iter(|| {
-            let chunk = bytes::Bytes::from(s);
-            write_buf.buffer(chunk);
-            ::test::black_box(&write_buf);
-            write_buf.headers.bytes.clear();
-        })
-    }
+    // #[cfg(feature = "nightly")]
+    // #[bench]
+    // fn bench_write_buf_flatten_buffer_chunk(b: &mut Bencher) {
+    //     let s = "Hello, World!";
+    //     b.bytes = s.len() as u64;
+
+    //     let mut write_buf = WriteBuf::<bytes::Bytes>::new();
+    //     write_buf.set_strategy(WriteStrategy::Flatten);
+    //     b.iter(|| {
+    //         let chunk = bytes::Bytes::from(s);
+    //         write_buf.buffer(chunk);
+    //         ::test::black_box(&write_buf);
+    //         write_buf.headers.bytes.clear();
+    //     })
+    // }
 }