From 73bf6a61ada04f332138b1eb0f79ecade219f00c Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 23 Nov 2020 16:35:48 -0800 Subject: [PATCH] re-enable vectored writes (#500) Tokio's AsyncWrite trait once again has support for vectored writes in Tokio 0.3.4 (see tokio-rs/tokio#3149). This branch re-enables vectored writes in h2. This change doesn't make all that big of a performance improvement in Hyper's HTTP/2 benchmarks, but they use a BytesMut as the buffer. With a buffer that turns into more IO vectors in bytes_vectored, there might be a more noticeable performance improvement. I spent a bit trying to refactor the flush logic to coalesce into fewer writev calls with more buffers, but the current implementation seems like about the best we're going to get without a bigger refactor. It's basically the same as what h2 did previously, so it's probably fine. --- Cargo.toml | 4 ++-- src/codec/framed_write.rs | 47 ++++++++++++++++++++++++--------------- 2 files changed, 31 insertions(+), 20 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3b55d080b..3b73efe4e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,7 +46,7 @@ futures-core = { version = "0.3", default-features = false } futures-sink = { version = "0.3", default-features = false } futures-util = { version = "0.3", default-features = false } tokio-util = { version = "0.5", features = ["codec"] } -tokio = { version = "0.3.2", features = ["io-util"] } +tokio = { version = "0.3.4", features = ["io-util"] } bytes = "0.6" http = "0.2" tracing = { version = "0.1.13", default-features = false, features = ["std", "log"] } @@ -68,7 +68,7 @@ serde = "1.0.0" serde_json = "1.0.0" # Examples -tokio = { version = "0.3.2", features = ["rt-multi-thread", "macros", "sync", "net"] } +tokio = { version = "0.3.4", features = ["rt-multi-thread", "macros", "sync", "net"] } env_logger = { version = "0.5.3", default-features = false } rustls = "0.18" tokio-rustls = "0.20.0" diff --git a/src/codec/framed_write.rs 
b/src/codec/framed_write.rs index 870b5589a..e2151d660 100644 --- a/src/codec/framed_write.rs +++ b/src/codec/framed_write.rs @@ -3,12 +3,12 @@ use crate::codec::UserError::*; use crate::frame::{self, Frame, FrameSize}; use crate::hpack; -use bytes::{buf::BufMut, Buf, BytesMut}; +use bytes::{Buf, BufMut, BytesMut}; use std::pin::Pin; use std::task::{Context, Poll}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -use std::io::{self, Cursor}; +use std::io::{self, Cursor, IoSlice}; // A macro to get around a method needing to borrow &mut self macro_rules! limited_write_buf { @@ -39,6 +39,9 @@ pub struct FramedWrite { /// Max frame size, this is specified by the peer max_frame_size: FrameSize, + + /// Whether or not the wrapped `AsyncWrite` supports vectored IO. + is_write_vectored: bool, } #[derive(Debug)] @@ -68,6 +71,7 @@ where B: Buf, { pub fn new(inner: T) -> FramedWrite { + let is_write_vectored = inner.is_write_vectored(); FramedWrite { inner, hpack: hpack::Encoder::default(), @@ -75,6 +79,7 @@ where next: None, last_data_frame: None, max_frame_size: frame::DEFAULT_MAX_FRAME_SIZE, + is_write_vectored, } } @@ -182,6 +187,8 @@ where /// Flush buffered data to the wire pub fn flush(&mut self, cx: &mut Context) -> Poll> { + const MAX_IOVS: usize = 64; + let span = tracing::trace_span!("FramedWrite::flush"); let _e = span.enter(); @@ -190,25 +197,29 @@ where match self.next { Some(Next::Data(ref mut frame)) => { tracing::trace!(queued_data_frame = true); - - if self.buf.has_remaining() { - let n = - ready!(Pin::new(&mut self.inner).poll_write(cx, self.buf.bytes()))?; - self.buf.advance(n); - } - - let buf = frame.payload_mut(); - - if !self.buf.has_remaining() && buf.has_remaining() { - let n = ready!(Pin::new(&mut self.inner).poll_write(cx, buf.bytes()))?; - buf.advance(n); - } + let mut buf = (&mut self.buf).chain(frame.payload_mut()); + // TODO(eliza): when tokio-util 0.5.1 is released, this + // could just use `poll_write_buf`... 
+ let n = if self.is_write_vectored { + let mut bufs = [IoSlice::new(&[]); MAX_IOVS]; + let cnt = buf.bytes_vectored(&mut bufs); + ready!(Pin::new(&mut self.inner).poll_write_vectored(cx, &bufs[..cnt]))? + } else { + ready!(Pin::new(&mut self.inner).poll_write(cx, buf.bytes()))? + }; + buf.advance(n); } _ => { tracing::trace!(queued_data_frame = false); - let n = ready!( - Pin::new(&mut self.inner).poll_write(cx, &mut self.buf.bytes()) - )?; + let n = if self.is_write_vectored { + let mut iovs = [IoSlice::new(&[]); MAX_IOVS]; + let cnt = self.buf.bytes_vectored(&mut iovs); + ready!( + Pin::new(&mut self.inner).poll_write_vectored(cx, &mut iovs[..cnt]) + )? + } else { + ready!(Pin::new(&mut self.inner).poll_write(cx, &mut self.buf.bytes()))? + }; self.buf.advance(n); } }