From 207ecff1ab7c9700b7195766fca5756ad5f3b597 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Fri, 20 Dec 2019 15:26:01 -0800 Subject: [PATCH] Change a few `warp::body` filters - Adds `warp::body::bytes` to concatenate into a single `Bytes`. - Adds `warp::body::aggregate` to aggregate into some `impl Buf` without copying data. - Changes `warp::body::stream` to yield an `impl Stream`. - Removes `warp::body::concat`. - Removes `warp::body::BodyStream`. - Removes `warp::body::StreamBuf`. - Removes `warp::body::FullBody`. --- examples/sse_chat.rs | 6 +- src/filters/body.rs | 181 ++++++++++++++++----------------------- src/filters/multipart.rs | 6 +- tests/body.rs | 6 +- 4 files changed, 82 insertions(+), 117 deletions(-) diff --git a/examples/sse_chat.rs b/examples/sse_chat.rs index 3e6ffd89b..7c9a42cd8 100644 --- a/examples/sse_chat.rs +++ b/examples/sse_chat.rs @@ -5,7 +5,7 @@ use std::sync::{ Arc, Mutex, }; use tokio::sync::{mpsc, oneshot}; -use warp::{sse::ServerSentEvent, Buf, Filter}; +use warp::{sse::ServerSentEvent, Filter}; /// Our global unique user id counter. static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1); @@ -41,8 +41,8 @@ async fn main() { .and(warp::post()) .and(warp::path::param::()) .and(warp::body::content_length_limit(500)) - .and(warp::body::concat().and_then(|body: warp::body::FullBody| async move { - std::str::from_utf8(body.bytes()) + .and(warp::body::bytes().and_then(|body: bytes::Bytes| async move { + std::str::from_utf8(&body) .map(String::from) .map_err(|_e| warp::reject::custom(NotUtf8)) })) diff --git a/src/filters/body.rs b/src/filters/body.rs index b3f2c8128..98abf69b3 100644 --- a/src/filters/body.rs +++ b/src/filters/body.rs @@ -7,7 +7,7 @@ use std::fmt; use std::pin::Pin; use std::task::{Context, Poll}; -use bytes::{Bytes, Buf}; +use bytes::{Bytes, Buf, buf::BufExt}; use futures::{future, ready, TryFutureExt, Stream}; use headers::ContentLength; use http::header::CONTENT_TYPE; @@ -44,7 +44,7 @@ pub(crate) fn body() -> impl Filter + Copy /// /// // Limit the upload to 4kb... /// let upload = warp::body::content_length_limit(4096) -/// .and(warp::body::concat()); +/// .and(warp::body::aggregate()); /// ``` pub fn content_length_limit(limit: u64) -> impl Filter + Copy { crate::filters::header::header2() @@ -72,13 +72,18 @@ pub fn content_length_limit(limit: u64) -> impl Filter impl Filter + Copy { +pub fn stream() -> impl Filter>,), Error = Rejection> + Copy { body().map(|body: Body| BodyStream { body }) } + /// Returns a `Filter` that matches any request and extracts a `Future` of a /// concatenated body. /// +/// The contents of the body will be flattened into a single contiguous +/// `Bytes`, which may require memory copies. If you don't require a +/// contiguous buffer, using `aggregate` can be give better performance. +/// /// # Warning /// /// This does not have a default size limit, it would be wise to use one to @@ -90,25 +95,55 @@ pub fn stream() -> impl Filter + Cop /// use warp::{Buf, Filter}; /// /// let route = warp::body::content_length_limit(1024 * 32) -/// .and(warp::body::concat()) -/// .map(|mut full_body: warp::body::FullBody| { -/// // FullBody is a `Buf`, which could have several non-contiguous -/// // slices of memory... -/// let mut remaining = full_body.remaining(); -/// while remaining != 0 { -/// println!("slice = {:?}", full_body.bytes()); -/// let cnt = full_body.bytes().len(); -/// full_body.advance(cnt); -/// remaining -= cnt; -/// } +/// .and(warp::body::bytes()) +/// .map(|bytes: bytes::Bytes| { +/// println!("bytes = {:?}", bytes); /// }); /// ``` -pub fn concat() -> impl Filter + Copy { - body().and_then(|body: ::hyper::Body| +pub fn bytes() -> impl Filter + Copy { + body().and_then(|body: hyper::Body| hyper::body::to_bytes(body) - .map_ok(|bytes| FullBody { bytes }) .map_err(|err| { - log::debug!("concat error: {}", err); + log::debug!("to_bytes error: {}", err); + reject::known(BodyReadError(err)) + }) + ) +} + +/// Returns a `Filter` that matches any request and extracts a `Future` of an +/// aggregated body. +/// +/// The `Buf` may contain multiple, non-contiguous buffers. This can be more +/// performant (by reducing copies) when receiving large bodies. +/// +/// # Warning +/// +/// This does not have a default size limit, it would be wise to use one to +/// prevent a overly large request from using too much memory. +/// +/// # Example +/// +/// ``` +/// use warp::{Buf, Filter}; +/// +/// fn full_body(mut body: impl Buf) { +/// // It could have several non-contiguous slices of memory... +/// while body.has_remaining() { +/// println!("slice = {:?}", body.bytes()); +/// let cnt = body.bytes().len(); +/// body.advance(cnt); +/// } +/// } +/// +/// let route = warp::body::content_length_limit(1024 * 32) +/// .and(warp::body::aggregate()) +/// .map(full_body); +/// ``` +pub fn aggregate() -> impl Filter + Copy { + body().and_then(|body: ::hyper::Body| + hyper::body::aggregate(body) + .map_err(|err| { + log::debug!("aggregate error: {}", err); reject::known(BodyReadError(err)) }) ) @@ -172,14 +207,16 @@ fn is_content_type( /// }); /// ``` pub fn json() -> impl Filter + Copy { - is_content_type(mime::APPLICATION, mime::JSON) - .and(concat()) - .and_then(|buf: FullBody| { - future::ready(serde_json::from_slice(&buf.bytes).map_err(|err| { - log::debug!("request json body error: {}", err); - reject::known(BodyDeserializeError { cause: err.into() }) - })) + async fn from_reader(buf: impl Buf) -> Result { + serde_json::from_reader(buf.reader()).map_err(|err| { + log::debug!("request json body error: {}", err); + reject::known(BodyDeserializeError { cause: err.into() }) }) + } + + is_content_type(mime::APPLICATION, mime::JSON) + .and(aggregate()) + .and_then(from_reader) } /// Returns a `Filter` that matches any request and extracts a @@ -207,72 +244,33 @@ pub fn json() -> impl Filter() -> impl Filter + Copy { - is_content_type(mime::APPLICATION, mime::WWW_FORM_URLENCODED) - .and(concat()) - .and_then(|buf: FullBody| { - future::ready(serde_urlencoded::from_bytes(&buf.bytes).map_err(|err| { - log::debug!("request form body error: {}", err); - reject::known(BodyDeserializeError { cause: err.into() }) - })) + async fn from_reader(buf: impl Buf) -> Result { + serde_urlencoded::from_reader(buf.reader()).map_err(|err| { + log::debug!("request form body error: {}", err); + reject::known(BodyDeserializeError { cause: err.into() }) }) -} - -/// The full contents of a request body. -/// -/// Extracted with the [`concat`](concat) filter. -/// -/// As this is a `Buf`, it could have several non-contiguous slices of memory. -#[derive(Debug)] -pub struct FullBody { - // By concealing how a full body (concat()) is represented, this can be - // improved to be a `Vec` or similar, thus reducing copies required - // in the common case. - bytes: Bytes, -} - -impl FullBody { - #[cfg(feature = "multipart")] - pub(super) fn into_bytes(self) -> Bytes { - self.bytes - } -} - -impl Buf for FullBody { - #[inline] - fn remaining(&self) -> usize { - self.bytes.remaining() } - #[inline] - fn bytes(&self) -> &[u8] { - self.bytes.bytes() - } - - #[inline] - fn advance(&mut self, cnt: usize) { - self.bytes.advance(cnt); - } + is_content_type(mime::APPLICATION, mime::WWW_FORM_URLENCODED) + .and(aggregate()) + .and_then(from_reader) } -/// An `impl Stream` representing the request body. -/// -/// Extracted via the `warp::body::stream` filter. -pub struct BodyStream { +struct BodyStream { body: Body, } impl Stream for BodyStream { - type Item = Result; + type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let opt_item: Option> = ready!(Pin::new(&mut self.get_mut().body).poll_next(cx)); + let opt_item = ready!(Pin::new(&mut self.get_mut().body).poll_next(cx)); match opt_item { None => Poll::Ready(None), Some(item) => { let stream_buf = item - .map_err(crate::Error::new) - .map(|chunk| StreamBuf { chunk }); + .map_err(crate::Error::new); Poll::Ready(Some(stream_buf)) } @@ -280,39 +278,6 @@ impl Stream for BodyStream { } } -impl fmt::Debug for BodyStream { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("BodyStream").finish() - } -} - -/// An `impl Buf` representing a chunk in a request body. -/// -/// Yielded by a `BodyStream`. -pub struct StreamBuf { - chunk: Bytes, -} - -impl Buf for StreamBuf { - fn remaining(&self) -> usize { - self.chunk.remaining() - } - - fn bytes(&self) -> &[u8] { - self.chunk.bytes() - } - - fn advance(&mut self, cnt: usize) { - self.chunk.advance(cnt); - } -} - -impl fmt::Debug for StreamBuf { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - fmt::Debug::fmt(&self.chunk, f) - } -} - // ===== Rejections ===== /// An error used in rejections when deserializing a request body fails. diff --git a/src/filters/multipart.rs b/src/filters/multipart.rs index a11a63263..fdfeb3f1f 100644 --- a/src/filters/multipart.rs +++ b/src/filters/multipart.rs @@ -84,9 +84,9 @@ impl FilterBase for FormOptions { let filt = super::body::content_length_limit(self.max_length) .and(boundary) - .and(super::body::concat()) - .map(|boundary, body: super::body::FullBody| FormData { - inner: Multipart::with_body(Cursor::new(body.into_bytes()), boundary), + .and(super::body::bytes()) + .map(|boundary, body| FormData { + inner: Multipart::with_body(Cursor::new(body), boundary), }); let fut = filt.filter(); diff --git a/tests/body.rs b/tests/body.rs index 1cb9c7695..e3ace66cf 100644 --- a/tests/body.rs +++ b/tests/body.rs @@ -8,7 +8,7 @@ use bytes::Buf; async fn matches() { let _ = pretty_env_logger::try_init(); - let concat = warp::body::concat(); + let concat = warp::body::bytes(); let req = warp::test::request().path("/nothing-matches-me"); @@ -26,7 +26,7 @@ async fn matches() { async fn server_error_if_taking_body_multiple_times() { let _ = pretty_env_logger::try_init(); - let concat = warp::body::concat(); + let concat = warp::body::bytes(); let double = concat.and(concat).map(|_, _| warp::reply()); let res = warp::test::request().reply(&double).await; @@ -188,7 +188,7 @@ async fn stream() { .await .expect("filter() stream"); - let bufs: Result, warp::Error> = body.try_collect().await; + let bufs: Result, warp::Error> = body.try_collect().await; let bufs = bufs.unwrap(); assert_eq!(bufs.len(), 1);