Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(h2): implement the extended CONNECT protocol from RFC 8441 #2682

Merged
merged 1 commit into from Feb 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/client/conn.rs
Expand Up @@ -487,6 +487,23 @@ where
Poll::Ready(Ok(conn.take().unwrap().into_parts()))
})
}

/// Returns whether the [extended CONNECT protocol][1] is enabled or not.
///
/// This setting is configured by the server peer by sending the
/// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame.
/// This method returns the currently acknowledged value recieved from the
/// remote.
///
/// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
/// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3
#[cfg(feature = "http2")]
pub fn http2_is_extended_connect_protocol_enabled(&self) -> bool {
match self.inner.as_ref().unwrap() {
ProtoClient::H1 { .. } => false,
ProtoClient::H2 { h2 } => h2.is_extended_connect_protocol_enabled(),
}
}
}

impl<T, B> Future for Connection<T, B>
Expand Down
60 changes: 59 additions & 1 deletion src/ext.rs
@@ -1,9 +1,67 @@
//! HTTP extensions
//! HTTP extensions.

use bytes::Bytes;
#[cfg(feature = "http1")]
use http::header::{HeaderName, IntoHeaderName, ValueIter};
use http::HeaderMap;
#[cfg(feature = "http2")]
use std::fmt;

#[cfg(feature = "http2")]
/// Represents the `:protocol` pseudo-header used by
/// the [Extended CONNECT Protocol].
///
/// [Extended CONNECT Protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
#[derive(Clone, Eq, PartialEq)]
pub struct Protocol {
inner: h2::ext::Protocol,
}

#[cfg(feature = "http2")]
impl Protocol {
/// Converts a static string to a protocol name.
pub const fn from_static(value: &'static str) -> Self {
Self {
inner: h2::ext::Protocol::from_static(value),
}
}

/// Returns a str representation of the header.
pub fn as_str(&self) -> &str {
self.inner.as_str()
}

pub(crate) fn from_inner(inner: h2::ext::Protocol) -> Self {
Self { inner }
}

pub(crate) fn into_inner(self) -> h2::ext::Protocol {
self.inner
}
}

#[cfg(feature = "http2")]
impl<'a> From<&'a str> for Protocol {
fn from(value: &'a str) -> Self {
Self {
inner: h2::ext::Protocol::from(value),
}
}
}

#[cfg(feature = "http2")]
impl AsRef<[u8]> for Protocol {
fn as_ref(&self) -> &[u8] {
self.inner.as_ref()
}
}

#[cfg(feature = "http2")]
impl fmt::Debug for Protocol {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.inner.fmt(f)
}
}

/// A map from header names to their original casing as received in an HTTP message.
///
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Expand Up @@ -76,7 +76,7 @@ mod cfg;
mod common;
pub mod body;
mod error;
mod ext;
pub mod ext;
#[cfg(test)]
mod mock;
pub mod rt;
Expand Down
14 changes: 14 additions & 0 deletions src/proto/h2/client.rs
Expand Up @@ -14,6 +14,7 @@ use tracing::{debug, trace, warn};
use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
use crate::body::HttpBody;
use crate::common::{exec::Exec, task, Future, Never, Pin, Poll};
use crate::ext::Protocol;
use crate::headers;
use crate::proto::h2::UpgradedSendStream;
use crate::proto::Dispatched;
Expand Down Expand Up @@ -204,6 +205,15 @@ where
req_rx: ClientRx<B>,
}

impl<B> ClientTask<B>
where
B: HttpBody + 'static,
{
pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
self.h2_tx.is_extended_connect_protocol_enabled()
}
}

impl<B> Future for ClientTask<B>
where
B: HttpBody + Send + 'static,
Expand Down Expand Up @@ -260,6 +270,10 @@ where
}
}

if let Some(protocol) = req.extensions_mut().remove::<Protocol>() {
req.extensions_mut().insert(protocol.into_inner());
}

let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) {
Ok(ok) => ok,
Err(err) => {
Expand Down
12 changes: 11 additions & 1 deletion src/proto/h2/server.rs
Expand Up @@ -15,6 +15,7 @@ use super::{ping, PipeToSendStream, SendBuf};
use crate::body::HttpBody;
use crate::common::exec::ConnStreamExec;
use crate::common::{date, task, Future, Pin, Poll};
use crate::ext::Protocol;
use crate::headers;
use crate::proto::h2::ping::Recorder;
use crate::proto::h2::{H2Upgraded, UpgradedSendStream};
Expand All @@ -41,6 +42,7 @@ pub(crate) struct Config {
pub(crate) initial_conn_window_size: u32,
pub(crate) initial_stream_window_size: u32,
pub(crate) max_frame_size: u32,
pub(crate) enable_connect_protocol: bool,
pub(crate) max_concurrent_streams: Option<u32>,
#[cfg(feature = "runtime")]
pub(crate) keep_alive_interval: Option<Duration>,
Expand All @@ -56,6 +58,7 @@ impl Default for Config {
initial_conn_window_size: DEFAULT_CONN_WINDOW,
initial_stream_window_size: DEFAULT_STREAM_WINDOW,
max_frame_size: DEFAULT_MAX_FRAME_SIZE,
enable_connect_protocol: false,
max_concurrent_streams: None,
#[cfg(feature = "runtime")]
keep_alive_interval: None,
Expand Down Expand Up @@ -117,6 +120,9 @@ where
if let Some(max) = config.max_concurrent_streams {
builder.max_concurrent_streams(max);
}
if config.enable_connect_protocol {
builder.enable_connect_protocol();
}
let handshake = builder.handshake(io);

let bdp = if config.adaptive_window {
Expand Down Expand Up @@ -280,7 +286,7 @@ where

let is_connect = req.method() == Method::CONNECT;
let (mut parts, stream) = req.into_parts();
let (req, connect_parts) = if !is_connect {
let (mut req, connect_parts) = if !is_connect {
(
Request::from_parts(
parts,
Expand All @@ -307,6 +313,10 @@ where
)
};

if let Some(protocol) = req.extensions_mut().remove::<h2::ext::Protocol>() {
req.extensions_mut().insert(Protocol::from_inner(protocol));
}

let fut = H2Stream::new(service.call(req), connect_parts, respond);
exec.execute_h2stream(fut);
}
Expand Down
9 changes: 9 additions & 0 deletions src/server/conn.rs
Expand Up @@ -558,6 +558,15 @@ impl<E> Http<E> {
self
}

/// Enables the [extended CONNECT protocol].
///
/// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
#[cfg(feature = "http2")]
pub fn http2_enable_connect_protocol(&mut self) -> &mut Self {
self.h2_builder.enable_connect_protocol = true;
self
}

/// Set the maximum buffer size for the connection.
///
/// Default is ~400kb.
Expand Down
9 changes: 9 additions & 0 deletions src/server/server.rs
Expand Up @@ -453,6 +453,15 @@ impl<I, E> Builder<I, E> {
self
}

/// Enables the [extended CONNECT protocol].
///
/// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
#[cfg(feature = "http2")]
pub fn http2_enable_connect_protocol(mut self) -> Self {
self.protocol.http2_enable_connect_protocol();
self
}

/// Sets the `Executor` to deal with connection tasks.
///
/// Default is `tokio::spawn`.
Expand Down