Skip to content

Commit

Permalink
Change a few warp::body filters (#345)
Browse files Browse the repository at this point in the history
- Adds `warp::body::bytes` to concatenate into a single `Bytes`.
- Adds `warp::body::aggregate` to aggregate into some `impl Buf` without
  copying data.
- Changes `warp::body::stream` to yield an `impl Stream`.
- Removes `warp::body::concat`.
- Removes `warp::body::BodyStream`.
- Removes `warp::body::StreamBuf`.
- Removes `warp::body::FullBody`.
  • Loading branch information
seanmonstar committed Dec 20, 2019
1 parent 5c7ed94 commit 0b2581f
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 117 deletions.
6 changes: 3 additions & 3 deletions examples/sse_chat.rs
Expand Up @@ -5,7 +5,7 @@ use std::sync::{
Arc, Mutex,
};
use tokio::sync::{mpsc, oneshot};
use warp::{sse::ServerSentEvent, Buf, Filter};
use warp::{sse::ServerSentEvent, Filter};

/// Our global unique user id counter.
static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
Expand Down Expand Up @@ -41,8 +41,8 @@ async fn main() {
.and(warp::post())
.and(warp::path::param::<usize>())
.and(warp::body::content_length_limit(500))
.and(warp::body::concat().and_then(|body: warp::body::FullBody| async move {
std::str::from_utf8(body.bytes())
.and(warp::body::bytes().and_then(|body: bytes::Bytes| async move {
std::str::from_utf8(&body)
.map(String::from)
.map_err(|_e| warp::reject::custom(NotUtf8))
}))
Expand Down
181 changes: 73 additions & 108 deletions src/filters/body.rs
Expand Up @@ -7,7 +7,7 @@ use std::fmt;
use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::{Bytes, Buf};
use bytes::{Bytes, Buf, buf::BufExt};
use futures::{future, ready, TryFutureExt, Stream};
use headers::ContentLength;
use http::header::CONTENT_TYPE;
Expand Down Expand Up @@ -44,7 +44,7 @@ pub(crate) fn body() -> impl Filter<Extract = (Body,), Error = Rejection> + Copy
///
/// // Limit the upload to 4kb...
/// let upload = warp::body::content_length_limit(4096)
/// .and(warp::body::concat());
/// .and(warp::body::aggregate());
/// ```
pub fn content_length_limit(limit: u64) -> impl Filter<Extract = (), Error = Rejection> + Copy {
crate::filters::header::header2()
Expand Down Expand Up @@ -72,13 +72,18 @@ pub fn content_length_limit(limit: u64) -> impl Filter<Extract = (), Error = Rej
///
/// This does not have a default size limit, it would be wise to use one to
/// prevent a overly large request from using too much memory.
pub fn stream() -> impl Filter<Extract = (BodyStream,), Error = Rejection> + Copy {
pub fn stream() -> impl Filter<Extract = (impl Stream<Item = Result<impl Buf, crate::Error>>,), Error = Rejection> + Copy {
body().map(|body: Body| BodyStream { body })
}


/// Returns a `Filter` that matches any request and extracts a `Future` of a
/// concatenated body.
///
/// The contents of the body will be flattened into a single contiguous
/// `Bytes`, which may require memory copies. If you don't require a
/// contiguous buffer, using `aggregate` can be give better performance.
///
/// # Warning
///
/// This does not have a default size limit, it would be wise to use one to
Expand All @@ -90,25 +95,55 @@ pub fn stream() -> impl Filter<Extract = (BodyStream,), Error = Rejection> + Cop
/// use warp::{Buf, Filter};
///
/// let route = warp::body::content_length_limit(1024 * 32)
/// .and(warp::body::concat())
/// .map(|mut full_body: warp::body::FullBody| {
/// // FullBody is a `Buf`, which could have several non-contiguous
/// // slices of memory...
/// let mut remaining = full_body.remaining();
/// while remaining != 0 {
/// println!("slice = {:?}", full_body.bytes());
/// let cnt = full_body.bytes().len();
/// full_body.advance(cnt);
/// remaining -= cnt;
/// }
/// .and(warp::body::bytes())
/// .map(|bytes: bytes::Bytes| {
/// println!("bytes = {:?}", bytes);
/// });
/// ```
pub fn concat() -> impl Filter<Extract = (FullBody,), Error = Rejection> + Copy {
body().and_then(|body: ::hyper::Body|
pub fn bytes() -> impl Filter<Extract = (Bytes,), Error = Rejection> + Copy {
body().and_then(|body: hyper::Body|
hyper::body::to_bytes(body)
.map_ok(|bytes| FullBody { bytes })
.map_err(|err| {
log::debug!("concat error: {}", err);
log::debug!("to_bytes error: {}", err);
reject::known(BodyReadError(err))
})
)
}

/// Returns a `Filter` that matches any request and extracts a `Future` of an
/// aggregated body.
///
/// The `Buf` may contain multiple, non-contiguous buffers. This can be more
/// performant (by reducing copies) when receiving large bodies.
///
/// # Warning
///
/// This does not have a default size limit, it would be wise to use one to
/// prevent a overly large request from using too much memory.
///
/// # Example
///
/// ```
/// use warp::{Buf, Filter};
///
/// fn full_body(mut body: impl Buf) {
/// // It could have several non-contiguous slices of memory...
/// while body.has_remaining() {
/// println!("slice = {:?}", body.bytes());
/// let cnt = body.bytes().len();
/// body.advance(cnt);
/// }
/// }
///
/// let route = warp::body::content_length_limit(1024 * 32)
/// .and(warp::body::aggregate())
/// .map(full_body);
/// ```
pub fn aggregate() -> impl Filter<Extract = (impl Buf,), Error = Rejection> + Copy {
body().and_then(|body: ::hyper::Body|
hyper::body::aggregate(body)
.map_err(|err| {
log::debug!("aggregate error: {}", err);
reject::known(BodyReadError(err))
})
)
Expand Down Expand Up @@ -172,14 +207,16 @@ fn is_content_type(
/// });
/// ```
pub fn json<T: DeserializeOwned + Send>() -> impl Filter<Extract = (T,), Error = Rejection> + Copy {
is_content_type(mime::APPLICATION, mime::JSON)
.and(concat())
.and_then(|buf: FullBody| {
future::ready(serde_json::from_slice(&buf.bytes).map_err(|err| {
log::debug!("request json body error: {}", err);
reject::known(BodyDeserializeError { cause: err.into() })
}))
async fn from_reader<T: DeserializeOwned + Send>(buf: impl Buf) -> Result<T, Rejection> {
serde_json::from_reader(buf.reader()).map_err(|err| {
log::debug!("request json body error: {}", err);
reject::known(BodyDeserializeError { cause: err.into() })
})
}

is_content_type(mime::APPLICATION, mime::JSON)
.and(aggregate())
.and_then(from_reader)
}

/// Returns a `Filter` that matches any request and extracts a
Expand Down Expand Up @@ -207,112 +244,40 @@ pub fn json<T: DeserializeOwned + Send>() -> impl Filter<Extract = (T,), Error =
/// });
/// ```
pub fn form<T: DeserializeOwned + Send>() -> impl Filter<Extract = (T,), Error = Rejection> + Copy {
is_content_type(mime::APPLICATION, mime::WWW_FORM_URLENCODED)
.and(concat())
.and_then(|buf: FullBody| {
future::ready(serde_urlencoded::from_bytes(&buf.bytes).map_err(|err| {
log::debug!("request form body error: {}", err);
reject::known(BodyDeserializeError { cause: err.into() })
}))
async fn from_reader<T: DeserializeOwned + Send>(buf: impl Buf) -> Result<T, Rejection> {
serde_urlencoded::from_reader(buf.reader()).map_err(|err| {
log::debug!("request form body error: {}", err);
reject::known(BodyDeserializeError { cause: err.into() })
})
}

/// The full contents of a request body.
///
/// Extracted with the [`concat`](concat) filter.
///
/// As this is a `Buf`, it could have several non-contiguous slices of memory.
#[derive(Debug)]
pub struct FullBody {
// By concealing how a full body (concat()) is represented, this can be
// improved to be a `Vec<Chunk>` or similar, thus reducing copies required
// in the common case.
bytes: Bytes,
}

impl FullBody {
#[cfg(feature = "multipart")]
pub(super) fn into_bytes(self) -> Bytes {
self.bytes
}
}

impl Buf for FullBody {
#[inline]
fn remaining(&self) -> usize {
self.bytes.remaining()
}

#[inline]
fn bytes(&self) -> &[u8] {
self.bytes.bytes()
}

#[inline]
fn advance(&mut self, cnt: usize) {
self.bytes.advance(cnt);
}
is_content_type(mime::APPLICATION, mime::WWW_FORM_URLENCODED)
.and(aggregate())
.and_then(from_reader)
}

/// An `impl Stream` representing the request body.
///
/// Extracted via the `warp::body::stream` filter.
pub struct BodyStream {
struct BodyStream {
body: Body,
}

impl Stream for BodyStream {
type Item = Result<StreamBuf, crate::Error>;
type Item = Result<Bytes, crate::Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let opt_item: Option<Result<Bytes, hyper::Error>> = ready!(Pin::new(&mut self.get_mut().body).poll_next(cx));
let opt_item = ready!(Pin::new(&mut self.get_mut().body).poll_next(cx));

match opt_item {
None => Poll::Ready(None),
Some(item) => {
let stream_buf = item
.map_err(crate::Error::new)
.map(|chunk| StreamBuf { chunk });
.map_err(crate::Error::new);

Poll::Ready(Some(stream_buf))
}
}
}
}

impl fmt::Debug for BodyStream {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("BodyStream").finish()
}
}

/// An `impl Buf` representing a chunk in a request body.
///
/// Yielded by a `BodyStream`.
pub struct StreamBuf {
chunk: Bytes,
}

impl Buf for StreamBuf {
fn remaining(&self) -> usize {
self.chunk.remaining()
}

fn bytes(&self) -> &[u8] {
self.chunk.bytes()
}

fn advance(&mut self, cnt: usize) {
self.chunk.advance(cnt);
}
}

impl fmt::Debug for StreamBuf {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self.chunk, f)
}
}

// ===== Rejections =====

/// An error used in rejections when deserializing a request body fails.
Expand Down
6 changes: 3 additions & 3 deletions src/filters/multipart.rs
Expand Up @@ -84,9 +84,9 @@ impl FilterBase for FormOptions {

let filt = super::body::content_length_limit(self.max_length)
.and(boundary)
.and(super::body::concat())
.map(|boundary, body: super::body::FullBody| FormData {
inner: Multipart::with_body(Cursor::new(body.into_bytes()), boundary),
.and(super::body::bytes())
.map(|boundary, body| FormData {
inner: Multipart::with_body(Cursor::new(body), boundary),
});

let fut = filt.filter();
Expand Down
6 changes: 3 additions & 3 deletions tests/body.rs
Expand Up @@ -8,7 +8,7 @@ use bytes::Buf;
async fn matches() {
let _ = pretty_env_logger::try_init();

let concat = warp::body::concat();
let concat = warp::body::bytes();

let req = warp::test::request().path("/nothing-matches-me");

Expand All @@ -26,7 +26,7 @@ async fn matches() {
async fn server_error_if_taking_body_multiple_times() {
let _ = pretty_env_logger::try_init();

let concat = warp::body::concat();
let concat = warp::body::bytes();
let double = concat.and(concat).map(|_, _| warp::reply());

let res = warp::test::request().reply(&double).await;
Expand Down Expand Up @@ -188,7 +188,7 @@ async fn stream() {
.await
.expect("filter() stream");

let bufs: Result<Vec<warp::filters::body::StreamBuf>, warp::Error> = body.try_collect().await;
let bufs: Result<Vec<_>, warp::Error> = body.try_collect().await;
let bufs = bufs.unwrap();

assert_eq!(bufs.len(), 1);
Expand Down

0 comments on commit 0b2581f

Please sign in to comment.