Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(http1): add support for receiving trailer fields #3637

Merged
merged 11 commits into from
May 13, 2024
13 changes: 13 additions & 0 deletions src/body/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,19 @@ impl Sender {
.map_err(|err| err.into_inner().expect("just sent Ok"))
}

#[cfg(feature = "http1")]
pub(crate) fn try_send_trailers(
&mut self,
trailers: HeaderMap,
) -> Result<(), Option<HeaderMap>> {
let tx = match self.trailers_tx.take() {
Some(tx) => tx,
None => return Err(None),
};

tx.send(trailers).map_err(|err| Some(err))
}

#[cfg(test)]
pub(crate) fn abort(mut self) {
self.send_error(crate::Error::new_body_write_aborted());
Expand Down
65 changes: 42 additions & 23 deletions src/proto/h1/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use bytes::{Buf, Bytes};
use futures_util::ready;
use http::header::{HeaderValue, CONNECTION, TE};
use http::{HeaderMap, Method, Version};
use http_body::Frame;
use httparse::ParserConfig;

use super::io::Buffered;
Expand Down Expand Up @@ -268,10 +269,20 @@ where
self.try_keep_alive(cx);
}
} else if msg.expect_continue && msg.head.version.gt(&Version::HTTP_10) {
self.state.reading = Reading::Continue(Decoder::new(msg.decode));
let h1_max_header_size = None; // TODO: remove this when we land h1_max_header_size support
self.state.reading = Reading::Continue(Decoder::new(
msg.decode,
self.state.h1_max_headers,
h1_max_header_size,
));
wants = wants.add(Wants::EXPECT);
} else {
self.state.reading = Reading::Body(Decoder::new(msg.decode));
let h1_max_header_size = None; // TODO: remove this when we land h1_max_header_size support
self.state.reading = Reading::Body(Decoder::new(
msg.decode,
self.state.h1_max_headers,
h1_max_header_size,
));
}

self.state.allow_trailer_fields = msg
Expand Down Expand Up @@ -312,33 +323,41 @@ where
pub(crate) fn poll_read_body(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<io::Result<Bytes>>> {
) -> Poll<Option<io::Result<Frame<Bytes>>>> {
debug_assert!(self.can_read_body());

let (reading, ret) = match self.state.reading {
Reading::Body(ref mut decoder) => {
match ready!(decoder.decode(cx, &mut self.io)) {
Ok(slice) => {
let (reading, chunk) = if decoder.is_eof() {
debug!("incoming body completed");
(
Reading::KeepAlive,
if !slice.is_empty() {
Some(Ok(slice))
} else {
None
},
)
} else if slice.is_empty() {
error!("incoming body unexpectedly ended");
// This should be unreachable, since all 3 decoders
// either set eof=true or return an Err when reading
// an empty slice...
(Reading::Closed, None)
Ok(frame) => {
if frame.is_data() {
let slice = frame.data_ref().unwrap_or_else(|| unreachable!());
let (reading, maybe_frame) = if decoder.is_eof() {
debug!("incoming body completed");
(
Reading::KeepAlive,
if !slice.is_empty() {
Some(Ok(frame))
} else {
None
},
)
} else if slice.is_empty() {
error!("incoming body unexpectedly ended");
// This should be unreachable, since all 3 decoders
// either set eof=true or return an Err when reading
// an empty slice...
(Reading::Closed, None)
} else {
return Poll::Ready(Some(Ok(frame)));
};
(reading, Poll::Ready(maybe_frame))
} else if frame.is_trailers() {
(Reading::Closed, Poll::Ready(Some(Ok(frame))))
} else {
return Poll::Ready(Some(Ok(slice)));
};
(reading, Poll::Ready(chunk))
trace!("discarding unknown frame");
(Reading::Closed, Poll::Ready(None))
}
}
Err(e) => {
debug!("incoming body decode error: {}", e);
Expand Down