Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change a few warp::body filters #345

Merged
merged 1 commit into from Dec 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/sse_chat.rs
Expand Up @@ -5,7 +5,7 @@ use std::sync::{
Arc, Mutex,
};
use tokio::sync::{mpsc, oneshot};
use warp::{sse::ServerSentEvent, Buf, Filter};
use warp::{sse::ServerSentEvent, Filter};

/// Our global unique user id counter.
static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
Expand Down Expand Up @@ -41,8 +41,8 @@ async fn main() {
.and(warp::post())
.and(warp::path::param::<usize>())
.and(warp::body::content_length_limit(500))
.and(warp::body::concat().and_then(|body: warp::body::FullBody| async move {
std::str::from_utf8(body.bytes())
.and(warp::body::bytes().and_then(|body: bytes::Bytes| async move {
std::str::from_utf8(&body)
.map(String::from)
.map_err(|_e| warp::reject::custom(NotUtf8))
}))
Expand Down
181 changes: 73 additions & 108 deletions src/filters/body.rs
Expand Up @@ -7,7 +7,7 @@ use std::fmt;
use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::{Bytes, Buf};
use bytes::{Bytes, Buf, buf::BufExt};
use futures::{future, ready, TryFutureExt, Stream};
use headers::ContentLength;
use http::header::CONTENT_TYPE;
Expand Down Expand Up @@ -44,7 +44,7 @@ pub(crate) fn body() -> impl Filter<Extract = (Body,), Error = Rejection> + Copy
///
/// // Limit the upload to 4kb...
/// let upload = warp::body::content_length_limit(4096)
/// .and(warp::body::concat());
/// .and(warp::body::aggregate());
/// ```
pub fn content_length_limit(limit: u64) -> impl Filter<Extract = (), Error = Rejection> + Copy {
crate::filters::header::header2()
Expand Down Expand Up @@ -72,13 +72,18 @@ pub fn content_length_limit(limit: u64) -> impl Filter<Extract = (), Error = Rej
///
/// This does not have a default size limit, it would be wise to use one to
/// prevent a overly large request from using too much memory.
pub fn stream() -> impl Filter<Extract = (BodyStream,), Error = Rejection> + Copy {
pub fn stream() -> impl Filter<Extract = (impl Stream<Item = Result<impl Buf, crate::Error>>,), Error = Rejection> + Copy {
body().map(|body: Body| BodyStream { body })
}


/// Returns a `Filter` that matches any request and extracts a `Future` of a
/// concatenated body.
///
/// The contents of the body will be flattened into a single contiguous
/// `Bytes`, which may require memory copies. If you don't require a
/// contiguous buffer, using `aggregate` can be give better performance.
///
/// # Warning
///
/// This does not have a default size limit, it would be wise to use one to
Expand All @@ -90,25 +95,55 @@ pub fn stream() -> impl Filter<Extract = (BodyStream,), Error = Rejection> + Cop
/// use warp::{Buf, Filter};
///
/// let route = warp::body::content_length_limit(1024 * 32)
/// .and(warp::body::concat())
/// .map(|mut full_body: warp::body::FullBody| {
/// // FullBody is a `Buf`, which could have several non-contiguous
/// // slices of memory...
/// let mut remaining = full_body.remaining();
/// while remaining != 0 {
/// println!("slice = {:?}", full_body.bytes());
/// let cnt = full_body.bytes().len();
/// full_body.advance(cnt);
/// remaining -= cnt;
/// }
/// .and(warp::body::bytes())
/// .map(|bytes: bytes::Bytes| {
/// println!("bytes = {:?}", bytes);
/// });
/// ```
pub fn concat() -> impl Filter<Extract = (FullBody,), Error = Rejection> + Copy {
body().and_then(|body: ::hyper::Body|
pub fn bytes() -> impl Filter<Extract = (Bytes,), Error = Rejection> + Copy {
body().and_then(|body: hyper::Body|
hyper::body::to_bytes(body)
.map_ok(|bytes| FullBody { bytes })
.map_err(|err| {
log::debug!("concat error: {}", err);
log::debug!("to_bytes error: {}", err);
reject::known(BodyReadError(err))
})
)
}

/// Returns a `Filter` that matches any request and extracts a `Future` of an
/// aggregated body.
///
/// The `Buf` may contain multiple, non-contiguous buffers. This can be more
/// performant (by reducing copies) when receiving large bodies.
///
/// # Warning
///
/// This does not have a default size limit, it would be wise to use one to
/// prevent a overly large request from using too much memory.
///
/// # Example
///
/// ```
/// use warp::{Buf, Filter};
///
/// fn full_body(mut body: impl Buf) {
/// // It could have several non-contiguous slices of memory...
/// while body.has_remaining() {
/// println!("slice = {:?}", body.bytes());
/// let cnt = body.bytes().len();
/// body.advance(cnt);
/// }
/// }
///
/// let route = warp::body::content_length_limit(1024 * 32)
/// .and(warp::body::aggregate())
/// .map(full_body);
/// ```
pub fn aggregate() -> impl Filter<Extract = (impl Buf,), Error = Rejection> + Copy {
body().and_then(|body: ::hyper::Body|
hyper::body::aggregate(body)
.map_err(|err| {
log::debug!("aggregate error: {}", err);
reject::known(BodyReadError(err))
})
)
Expand Down Expand Up @@ -172,14 +207,16 @@ fn is_content_type(
/// });
/// ```
pub fn json<T: DeserializeOwned + Send>() -> impl Filter<Extract = (T,), Error = Rejection> + Copy {
is_content_type(mime::APPLICATION, mime::JSON)
.and(concat())
.and_then(|buf: FullBody| {
future::ready(serde_json::from_slice(&buf.bytes).map_err(|err| {
log::debug!("request json body error: {}", err);
reject::known(BodyDeserializeError { cause: err.into() })
}))
async fn from_reader<T: DeserializeOwned + Send>(buf: impl Buf) -> Result<T, Rejection> {
serde_json::from_reader(buf.reader()).map_err(|err| {
log::debug!("request json body error: {}", err);
reject::known(BodyDeserializeError { cause: err.into() })
})
}

is_content_type(mime::APPLICATION, mime::JSON)
.and(aggregate())
.and_then(from_reader)
}

/// Returns a `Filter` that matches any request and extracts a
Expand Down Expand Up @@ -207,112 +244,40 @@ pub fn json<T: DeserializeOwned + Send>() -> impl Filter<Extract = (T,), Error =
/// });
/// ```
pub fn form<T: DeserializeOwned + Send>() -> impl Filter<Extract = (T,), Error = Rejection> + Copy {
is_content_type(mime::APPLICATION, mime::WWW_FORM_URLENCODED)
.and(concat())
.and_then(|buf: FullBody| {
future::ready(serde_urlencoded::from_bytes(&buf.bytes).map_err(|err| {
log::debug!("request form body error: {}", err);
reject::known(BodyDeserializeError { cause: err.into() })
}))
async fn from_reader<T: DeserializeOwned + Send>(buf: impl Buf) -> Result<T, Rejection> {
serde_urlencoded::from_reader(buf.reader()).map_err(|err| {
log::debug!("request form body error: {}", err);
reject::known(BodyDeserializeError { cause: err.into() })
})
}

/// The full contents of a request body.
///
/// Extracted with the [`concat`](concat) filter.
///
/// As this is a `Buf`, it could have several non-contiguous slices of memory.
#[derive(Debug)]
pub struct FullBody {
// By concealing how a full body (concat()) is represented, this can be
// improved to be a `Vec<Chunk>` or similar, thus reducing copies required
// in the common case.
bytes: Bytes,
}

impl FullBody {
#[cfg(feature = "multipart")]
pub(super) fn into_bytes(self) -> Bytes {
self.bytes
}
}

impl Buf for FullBody {
#[inline]
fn remaining(&self) -> usize {
self.bytes.remaining()
}

#[inline]
fn bytes(&self) -> &[u8] {
self.bytes.bytes()
}

#[inline]
fn advance(&mut self, cnt: usize) {
self.bytes.advance(cnt);
}
is_content_type(mime::APPLICATION, mime::WWW_FORM_URLENCODED)
.and(aggregate())
.and_then(from_reader)
}

/// An `impl Stream` representing the request body.
///
/// Extracted via the `warp::body::stream` filter.
pub struct BodyStream {
struct BodyStream {
body: Body,
}

impl Stream for BodyStream {
type Item = Result<StreamBuf, crate::Error>;
type Item = Result<Bytes, crate::Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let opt_item: Option<Result<Bytes, hyper::Error>> = ready!(Pin::new(&mut self.get_mut().body).poll_next(cx));
let opt_item = ready!(Pin::new(&mut self.get_mut().body).poll_next(cx));

match opt_item {
None => Poll::Ready(None),
Some(item) => {
let stream_buf = item
.map_err(crate::Error::new)
.map(|chunk| StreamBuf { chunk });
.map_err(crate::Error::new);

Poll::Ready(Some(stream_buf))
}
}
}
}

impl fmt::Debug for BodyStream {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("BodyStream").finish()
}
}

/// An `impl Buf` representing a chunk in a request body.
///
/// Yielded by a `BodyStream`.
pub struct StreamBuf {
chunk: Bytes,
}

impl Buf for StreamBuf {
fn remaining(&self) -> usize {
self.chunk.remaining()
}

fn bytes(&self) -> &[u8] {
self.chunk.bytes()
}

fn advance(&mut self, cnt: usize) {
self.chunk.advance(cnt);
}
}

impl fmt::Debug for StreamBuf {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self.chunk, f)
}
}

// ===== Rejections =====

/// An error used in rejections when deserializing a request body fails.
Expand Down
6 changes: 3 additions & 3 deletions src/filters/multipart.rs
Expand Up @@ -84,9 +84,9 @@ impl FilterBase for FormOptions {

let filt = super::body::content_length_limit(self.max_length)
.and(boundary)
.and(super::body::concat())
.map(|boundary, body: super::body::FullBody| FormData {
inner: Multipart::with_body(Cursor::new(body.into_bytes()), boundary),
.and(super::body::bytes())
.map(|boundary, body| FormData {
inner: Multipart::with_body(Cursor::new(body), boundary),
});

let fut = filt.filter();
Expand Down
6 changes: 3 additions & 3 deletions tests/body.rs
Expand Up @@ -8,7 +8,7 @@ use bytes::Buf;
async fn matches() {
let _ = pretty_env_logger::try_init();

let concat = warp::body::concat();
let concat = warp::body::bytes();

let req = warp::test::request().path("/nothing-matches-me");

Expand All @@ -26,7 +26,7 @@ async fn matches() {
async fn server_error_if_taking_body_multiple_times() {
let _ = pretty_env_logger::try_init();

let concat = warp::body::concat();
let concat = warp::body::bytes();
let double = concat.and(concat).map(|_, _| warp::reply());

let res = warp::test::request().reply(&double).await;
Expand Down Expand Up @@ -188,7 +188,7 @@ async fn stream() {
.await
.expect("filter() stream");

let bufs: Result<Vec<warp::filters::body::StreamBuf>, warp::Error> = body.try_collect().await;
let bufs: Result<Vec<_>, warp::Error> = body.try_collect().await;
let bufs = bufs.unwrap();

assert_eq!(bufs.len(), 1);
Expand Down