From 5ec094caa5c999e6f919a2bc82f5f3b7d40c2d8a Mon Sep 17 00:00:00 2001 From: Anthony Ramine <123095+nox@users.noreply.github.com> Date: Tue, 8 Feb 2022 02:35:34 +0100 Subject: [PATCH] feat(client): implement the HTTP/2 extended CONNECT protocol from RFC 8441 (#2682) --- src/client/conn.rs | 17 ++++++++++++ src/ext.rs | 60 +++++++++++++++++++++++++++++++++++++++++- src/lib.rs | 2 +- src/proto/h2/client.rs | 14 ++++++++++ src/proto/h2/server.rs | 12 ++++++++- src/server/conn.rs | 9 +++++++ src/server/server.rs | 9 +++++++ 7 files changed, 120 insertions(+), 3 deletions(-) diff --git a/src/client/conn.rs b/src/client/conn.rs index 1273edabf5..2418c9fa83 100644 --- a/src/client/conn.rs +++ b/src/client/conn.rs @@ -487,6 +487,23 @@ where Poll::Ready(Ok(conn.take().unwrap().into_parts())) }) } + + /// Returns whether the [extended CONNECT protocol][1] is enabled or not. + /// + /// This setting is configured by the server peer by sending the + /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame. + /// This method returns the currently acknowledged value recieved from the + /// remote. + /// + /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 + /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3 + #[cfg(feature = "http2")] + pub fn http2_is_extended_connect_protocol_enabled(&self) -> bool { + match self.inner.as_ref().unwrap() { + ProtoClient::H1 { .. } => false, + ProtoClient::H2 { h2 } => h2.is_extended_connect_protocol_enabled(), + } + } } impl Future for Connection diff --git a/src/ext.rs b/src/ext.rs index 10cde75970..e9d4587784 100644 --- a/src/ext.rs +++ b/src/ext.rs @@ -1,9 +1,67 @@ -//! HTTP extensions +//! HTTP extensions. use bytes::Bytes; #[cfg(feature = "http1")] use http::header::{HeaderName, IntoHeaderName, ValueIter}; use http::HeaderMap; +#[cfg(feature = "http2")] +use std::fmt; + +#[cfg(feature = "http2")] +/// Represents the `:protocol` pseudo-header used by +/// the [Extended CONNECT Protocol]. +/// +/// [Extended CONNECT Protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 +#[derive(Clone, Eq, PartialEq)] +pub struct Protocol { + inner: h2::ext::Protocol, +} + +#[cfg(feature = "http2")] +impl Protocol { + /// Converts a static string to a protocol name. + pub const fn from_static(value: &'static str) -> Self { + Self { + inner: h2::ext::Protocol::from_static(value), + } + } + + /// Returns a str representation of the header. + pub fn as_str(&self) -> &str { + self.inner.as_str() + } + + pub(crate) fn from_inner(inner: h2::ext::Protocol) -> Self { + Self { inner } + } + + pub(crate) fn into_inner(self) -> h2::ext::Protocol { + self.inner + } +} + +#[cfg(feature = "http2")] +impl<'a> From<&'a str> for Protocol { + fn from(value: &'a str) -> Self { + Self { + inner: h2::ext::Protocol::from(value), + } + } +} + +#[cfg(feature = "http2")] +impl AsRef<[u8]> for Protocol { + fn as_ref(&self) -> &[u8] { + self.inner.as_ref() + } +} + +#[cfg(feature = "http2")] +impl fmt::Debug for Protocol { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.inner.fmt(f) + } +} /// A map from header names to their original casing as received in an HTTP message. /// diff --git a/src/lib.rs b/src/lib.rs index 41a0b37518..f7a93a1959 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -76,7 +76,7 @@ mod cfg; mod common; pub mod body; mod error; -mod ext; +pub mod ext; #[cfg(test)] mod mock; pub mod rt; diff --git a/src/proto/h2/client.rs b/src/proto/h2/client.rs index 809d8b8505..013f6fb5a8 100644 --- a/src/proto/h2/client.rs +++ b/src/proto/h2/client.rs @@ -14,6 +14,7 @@ use tracing::{debug, trace, warn}; use super::{ping, H2Upgraded, PipeToSendStream, SendBuf}; use crate::body::HttpBody; use crate::common::{exec::Exec, task, Future, Never, Pin, Poll}; +use crate::ext::Protocol; use crate::headers; use crate::proto::h2::UpgradedSendStream; use crate::proto::Dispatched; @@ -204,6 +205,15 @@ where req_rx: ClientRx, } +impl ClientTask +where + B: HttpBody + 'static, +{ + pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool { + self.h2_tx.is_extended_connect_protocol_enabled() + } +} + impl Future for ClientTask where B: HttpBody + Send + 'static, @@ -260,6 +270,10 @@ where } } + if let Some(protocol) = req.extensions_mut().remove::() { + req.extensions_mut().insert(protocol.into_inner()); + } + let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) { Ok(ok) => ok, Err(err) => { diff --git a/src/proto/h2/server.rs b/src/proto/h2/server.rs index 94452fb4bb..b9037ee3dd 100644 --- a/src/proto/h2/server.rs +++ b/src/proto/h2/server.rs @@ -15,6 +15,7 @@ use super::{ping, PipeToSendStream, SendBuf}; use crate::body::HttpBody; use crate::common::exec::ConnStreamExec; use crate::common::{date, task, Future, Pin, Poll}; +use crate::ext::Protocol; use crate::headers; use crate::proto::h2::ping::Recorder; use crate::proto::h2::{H2Upgraded, UpgradedSendStream}; @@ -41,6 +42,7 @@ pub(crate) struct Config { pub(crate) initial_conn_window_size: u32, pub(crate) initial_stream_window_size: u32, pub(crate) max_frame_size: u32, + pub(crate) enable_connect_protocol: bool, pub(crate) max_concurrent_streams: Option, #[cfg(feature = "runtime")] pub(crate) keep_alive_interval: Option, @@ -56,6 +58,7 @@ impl Default for Config { initial_conn_window_size: DEFAULT_CONN_WINDOW, initial_stream_window_size: DEFAULT_STREAM_WINDOW, max_frame_size: DEFAULT_MAX_FRAME_SIZE, + enable_connect_protocol: false, max_concurrent_streams: None, #[cfg(feature = "runtime")] keep_alive_interval: None, @@ -117,6 +120,9 @@ where if let Some(max) = config.max_concurrent_streams { builder.max_concurrent_streams(max); } + if config.enable_connect_protocol { + builder.enable_connect_protocol(); + } let handshake = builder.handshake(io); let bdp = if config.adaptive_window { @@ -280,7 +286,7 @@ where let is_connect = req.method() == Method::CONNECT; let (mut parts, stream) = req.into_parts(); - let (req, connect_parts) = if !is_connect { + let (mut req, connect_parts) = if !is_connect { ( Request::from_parts( parts, @@ -307,6 +313,10 @@ where ) }; + if let Some(protocol) = req.extensions_mut().remove::() { + req.extensions_mut().insert(Protocol::from_inner(protocol)); + } + let fut = H2Stream::new(service.call(req), connect_parts, respond); exec.execute_h2stream(fut); } diff --git a/src/server/conn.rs b/src/server/conn.rs index eb9d847788..de765b3a15 100644 --- a/src/server/conn.rs +++ b/src/server/conn.rs @@ -558,6 +558,15 @@ impl Http { self } + /// Enables the [extended CONNECT protocol]. + /// + /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 + #[cfg(feature = "http2")] + pub fn http2_enable_connect_protocol(&mut self) -> &mut Self { + self.h2_builder.enable_connect_protocol = true; + self + } + /// Set the maximum buffer size for the connection. /// /// Default is ~400kb. diff --git a/src/server/server.rs b/src/server/server.rs index 9cfb8157bc..c48582c7fd 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -453,6 +453,15 @@ impl Builder { self } + /// Enables the [extended CONNECT protocol]. + /// + /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 + #[cfg(feature = "http2")] + pub fn http2_enable_connect_protocol(mut self) -> Self { + self.protocol.http2_enable_connect_protocol(); + self + } + /// Sets the `Executor` to deal with connection tasks. /// /// Default is `tokio::spawn`.