Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: move datagram to separate crate #199

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ members = [
"h3",
"h3-quinn",
"h3-webtransport",
"h3-datagram",

# Internal
"examples",
Expand Down
1 change: 1 addition & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ tracing-subscriber = { version = "0.3", default-features = false, features = [
octets = "0.2.0"

tracing-tree = { version = "0.2" }
h3-datagram = { path = "../h3-datagram" }

[features]
tree = []
Expand Down
5 changes: 4 additions & 1 deletion examples/webtransport_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ use bytes::{BufMut, Bytes, BytesMut};
use h3::{
error::ErrorLevel,
ext::Protocol,
quic::{self, RecvDatagramExt, SendDatagramExt, SendStreamUnframed},
quic::{self, SendStreamUnframed},
server::Connection,
};
use h3_datagram::quic_traits::{RecvDatagramExt, SendDatagramExt};
use h3_quinn::quinn;
use h3_webtransport::{
server::{self, WebTransportSession},
Expand Down Expand Up @@ -294,6 +295,8 @@ where
stream::SendStream<C::SendStream, Bytes>: AsyncWrite,
C::BidiStream: SendStreamUnframed<Bytes>,
C::SendStream: SendStreamUnframed<Bytes>,
<C as RecvDatagramExt>::Error: h3::quic::Error,
<C as SendDatagramExt<Bytes>>::Error: h3::quic::Error,
{
let session_id = session.session_id();

Expand Down
9 changes: 9 additions & 0 deletions h3-datagram/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[package]
name = "h3-datagram"
version = "0.0.1"
edition = "2021"

[dependencies]
h3 = { path = "../h3" }
bytes = "1.4"
pin-project-lite = { version = "0.2", default_features = false }
8 changes: 8 additions & 0 deletions h3-datagram/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# H3 Datagram

this crate provides an implementation of the [h3-datagram](https://datatracker.ietf.org/doc/html/rfc9297) spec that works with the h3 crate.

## Usage
As stated in the [rfc](https://datatracker.ietf.org/doc/html/rfc9297#abstract) this is intended to be used for protocol extensions like [Web-Transport](https://datatracker.ietf.org/doc/draft-ietf-webtrans-http3/) and not directly by applications.

> HTTP Datagrams and the Capsule Protocol are intended for use by HTTP extensions, not applications.
69 changes: 69 additions & 0 deletions h3-datagram/src/datagram.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use bytes::{Buf, Bytes};
use h3::{error::Code, proto::varint::VarInt, quic::StreamId, Error};

/// HTTP datagram frames
/// See: <https://www.rfc-editor.org/rfc/rfc9297#section-2.1>
pub struct Datagram<B = Bytes> {
/// Stream id divided by 4
stream_id: StreamId,
/// The data contained in the datagram
payload: B,
}

impl<B> Datagram<B>
where
B: Buf,
{
/// Creates a new datagram frame
pub fn new(stream_id: StreamId, payload: B) -> Self {
assert!(
stream_id.into_inner() % 4 == 0,
"StreamId is not divisible by 4"
);
Self { stream_id, payload }
}

/// Decodes a datagram frame from the QUIC datagram
pub fn decode(mut buf: B) -> Result<Self, Error> {
let q_stream_id = VarInt::decode(&mut buf)
.map_err(|_| Code::H3_DATAGRAM_ERROR.with_cause("Malformed datagram frame"))?;

//= https://www.rfc-editor.org/rfc/rfc9297#section-2.1
// Quarter Stream ID: A variable-length integer that contains the value of the client-initiated bidirectional
// stream that this datagram is associated with divided by four (the division by four stems
// from the fact that HTTP requests are sent on client-initiated bidirectional streams,
// which have stream IDs that are divisible by four). The largest legal QUIC stream ID
// value is 262-1, so the largest legal value of the Quarter Stream ID field is 260-1.
// Receipt of an HTTP/3 Datagram that includes a larger value MUST be treated as an HTTP/3
// connection error of type H3_DATAGRAM_ERROR (0x33).
let stream_id = StreamId::try_from(u64::from(q_stream_id) * 4)
.map_err(|_| Code::H3_DATAGRAM_ERROR.with_cause("Invalid stream id"))?;

let payload = buf;

Ok(Self { stream_id, payload })
}

#[inline]
/// Returns the associated stream id of the datagram
pub fn stream_id(&self) -> StreamId {
self.stream_id
}

#[inline]
/// Returns the datagram payload
pub fn payload(&self) -> &B {
&self.payload
}

/// Encode the datagram to wire format
pub fn encode<D: bytes::BufMut>(self, buf: &mut D) {
(VarInt::from(self.stream_id) / 4).encode(buf);
buf.put(self.payload);
}

/// Returns the datagram payload
pub fn into_payload(self) -> B {
self.payload
}
}
21 changes: 21 additions & 0 deletions h3-datagram/src/datagram_traits.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
//! Traits which define the user API for datagrams.
//! These traits are implemented for the client and server types in the `h3` crate.

use bytes::Buf;
use h3::{
quic::{self, StreamId},
Error,
};

use crate::server::ReadDatagram;

pub trait HandleDatagrams<C, B>
where
B: Buf,
C: quic::Connection<B>,
{
/// Sends a datagram
fn send_datagram(&mut self, stream_id: StreamId, data: B) -> Result<(), Error>;
/// Reads an incoming datagram
fn read_datagram(&mut self) -> ReadDatagram<C, B>;
}
4 changes: 4 additions & 0 deletions h3-datagram/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub mod datagram;
pub mod datagram_traits;
pub mod quic_traits;
pub mod server;
38 changes: 38 additions & 0 deletions h3-datagram/src/quic_traits.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
//! QUIC Transport traits
//!
//! This module includes traits and types meant to allow being generic over any
//! QUIC implementation.

use core::task;
use std::{error::Error, task::Poll};

use bytes::Buf;

use crate::datagram::Datagram;

/// Extends the `Connection` trait for sending datagrams
///
/// See: <https://www.rfc-editor.org/rfc/rfc9297>
pub trait SendDatagramExt<B: Buf> {
/// The error type that can occur when sending a datagram
type Error: Into<Box<dyn Error>>;

/// Send a datagram
fn send_datagram(&mut self, data: Datagram<B>) -> Result<(), Self::Error>;
}

/// Extends the `Connection` trait for receiving datagrams
///
/// See: <https://www.rfc-editor.org/rfc/rfc9297>
pub trait RecvDatagramExt {
/// The type of `Buf` for *raw* datagrams (without the stream_id decoded)
type Buf: Buf;
/// The error type that can occur when receiving a datagram
type Error: Into<Box<dyn Error>>;

/// Poll the connection for incoming datagrams.
fn poll_accept_datagram(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Option<Self::Buf>, Self::Error>>;
}
78 changes: 78 additions & 0 deletions h3-datagram/src/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
//! server API

use std::{
future::Future,
marker::PhantomData,
task::{ready, Context, Poll},
};

use bytes::Buf;
use h3::{
quic::{self, StreamId},
server::Connection,
Error,
};
use pin_project_lite::pin_project;

use crate::{
datagram::Datagram,
datagram_traits::HandleDatagrams,
quic_traits::{self, RecvDatagramExt, SendDatagramExt},
};

impl<B, C> HandleDatagrams<C, B> for Connection<C, B>
where
B: Buf,
C: quic::Connection<B> + SendDatagramExt<B> + RecvDatagramExt,
<C as quic_traits::RecvDatagramExt>::Error: h3::quic::Error + 'static,
<C as quic_traits::SendDatagramExt<B>>::Error: h3::quic::Error + 'static,
{
/// Sends a datagram
fn send_datagram(&mut self, stream_id: StreamId, data: B) -> Result<(), Error> {
self.inner
.conn
.send_datagram(Datagram::new(stream_id, data))?;
//Todo Import tracing
//tracing::info!("Sent datagram");

Ok(())
}

/// Reads an incoming datagram
fn read_datagram(&mut self) -> ReadDatagram<C, B> {
ReadDatagram {
conn: self,
_marker: PhantomData,
}
}
}

pin_project! {
/// Future for [`Connection::read_datagram`]
pub struct ReadDatagram<'a, C, B>
where
C: quic::Connection<B>,
B: Buf,
{
conn: &'a mut Connection<C, B>,
_marker: PhantomData<B>,
}
}

impl<'a, C, B> Future for ReadDatagram<'a, C, B>
where
C: quic::Connection<B> + RecvDatagramExt,
<C as quic_traits::RecvDatagramExt>::Error: h3::quic::Error + 'static,
B: Buf,
{
type Output = Result<Option<Datagram<C::Buf>>, Error>;

fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
//Todo Import tracing
// tracing::trace!("poll: read_datagram");
match ready!(self.conn.inner.conn.poll_accept_datagram(cx))? {
Some(v) => Poll::Ready(Ok(Some(Datagram::decode(v)?))),
None => Poll::Ready(Ok(None)),
}
}
}
1 change: 1 addition & 0 deletions h3-quinn/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ quinn-proto = { version = "0.10", default-features = false }
tokio-util = { version = "0.7.7" }
futures = { version = "0.3.27" }
tokio = { version = "1.28", features = ["io-util"], default-features = false }
h3-datagram = { path = "../h3-datagram" }
10 changes: 4 additions & 6 deletions h3-quinn/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@ use futures::{
stream::{self, BoxStream},
StreamExt,
};
use h3_datagram::{datagram::Datagram, quic_traits};
use quinn::ReadDatagram;
pub use quinn::{
self, crypto::Session, AcceptBi, AcceptUni, Endpoint, OpenBi, OpenUni, VarInt, WriteError,
};

use h3::{
ext::Datagram,
quic::{self, Error, StreamId, WriteBuf},
};
use h3::quic::{self, Error, StreamId, WriteBuf};
use tokio_util::sync::ReusableBoxFuture;

/// A QUIC connection backed by Quinn
Expand Down Expand Up @@ -233,7 +231,7 @@ where
}
}

impl<B> quic::SendDatagramExt<B> for Connection
impl<B> quic_traits::SendDatagramExt<B> for Connection
where
B: Buf,
{
Expand All @@ -249,7 +247,7 @@ where
}
}

impl quic::RecvDatagramExt for Connection {
impl quic_traits::RecvDatagramExt for Connection {
type Buf = Bytes;

type Error = ConnectionError;
Expand Down
1 change: 1 addition & 0 deletions h3-webtransport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ http = "0.2.9"
pin-project-lite = { version = "0.2", default_features = false }
tracing = "0.1.37"
tokio = { version = "1.28", default_features = false }
h3-datagram = { path = "../h3-datagram" }

[dependencies.h3]
version = "0.0.2"
Expand Down
12 changes: 10 additions & 2 deletions h3-webtransport/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,22 @@ use futures_util::{future::poll_fn, ready, Future};
use h3::{
connection::ConnectionState,
error::{Code, ErrorLevel},
ext::{Datagram, Protocol},
ext::Protocol,
frame::FrameStream,
proto::frame::Frame,
quic::{self, OpenStreams, RecvDatagramExt, SendDatagramExt, WriteBuf},
quic::{self, OpenStreams, WriteBuf},
server::{self, Connection, RequestStream},
Error,
};
use h3::{
quic::SendStreamUnframed,
stream::{BidiStreamHeader, BufRecvStream, UniStreamHeader},
};
use h3_datagram::{
datagram::Datagram,
datagram_traits::HandleDatagrams,
quic_traits::{RecvDatagramExt, SendDatagramExt},
};
use http::{Method, Request, Response, StatusCode};

use h3::webtransport::SessionId;
Expand All @@ -38,6 +43,7 @@ use crate::stream::{BidiStream, RecvStream, SendStream};
pub struct WebTransportSession<C, B>
where
C: quic::Connection<B>,
Connection<C, B>: HandleDatagrams<C, B>,
B: Buf,
{
// See: https://datatracker.ietf.org/doc/html/draft-ietf-webtrans-http3/#section-2-3
Expand All @@ -50,6 +56,7 @@ where

impl<C, B> WebTransportSession<C, B>
where
Connection<C, B>: HandleDatagrams<C, B>,
C: quic::Connection<B>,
B: Buf,
{
Expand Down Expand Up @@ -373,6 +380,7 @@ impl<'a, C, B> Future for ReadDatagram<'a, C, B>
where
C: quic::Connection<B> + RecvDatagramExt,
B: Buf,
<C as RecvDatagramExt>::Error: h3::quic::Error + 'static,
{
type Output = Result<Option<(SessionId, C::Buf)>, Error>;

Expand Down
1 change: 1 addition & 0 deletions h3/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,4 @@ tracing-subscriber = { version = "0.3", default-features = false, features = [
] }
futures = { version = "0.3.27" }
tokio-util = { version = "0.7.7" }
h3-datagram = {path = "../h3-datagram" }
4 changes: 3 additions & 1 deletion h3/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,9 @@ impl Code {
})
}

pub(crate) fn with_cause<E: Into<Cause>>(self, cause: E) -> Error {
// Todo: public? private?
#[doc(hidden)]
pub fn with_cause<E: Into<Cause>>(self, cause: E) -> Error {
Error::from(self).with_cause(cause)
}

Expand Down