diff --git a/src/filters/multipart.rs b/src/filters/multipart.rs index b5c0b3569..2fbc4247a 100644 --- a/src/filters/multipart.rs +++ b/src/filters/multipart.rs @@ -8,6 +8,7 @@ use std::io::{Cursor, Read}; use std::pin::Pin; use std::task::{Context, Poll}; +use bytes::{Buf, Bytes}; use futures::{future, Stream}; use headers::ContentType; use mime::Mime; @@ -145,6 +146,20 @@ impl Part { pub fn content_type(&self) -> Option<&str> { self.content_type.as_ref().map(|s| &**s) } + + /// Asynchronously get some of the data for this `Part`. + pub async fn data(&mut self) -> Option> { + self.take_data() + } + + /// Convert this `Part` into a `Stream` of `Buf`s. + pub fn stream(self) -> impl Stream> { + PartStream(self) + } + + fn take_data(&mut self) -> Option> { + self.data.take().map(|vec| Ok(vec.into())) + } } impl fmt::Debug for Part { @@ -164,10 +179,12 @@ impl fmt::Debug for Part { } } -impl Stream for Part { - type Item = Vec; +struct PartStream(Part); + +impl Stream for PartStream { + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { - Poll::Ready((*self).data.take()) + Poll::Ready(self.0.take_data()) } } diff --git a/tests/multipart.rs b/tests/multipart.rs index 540f4d0e0..7702e7361 100644 --- a/tests/multipart.rs +++ b/tests/multipart.rs @@ -1,18 +1,23 @@ #![deny(warnings)] -use futures::{FutureExt, StreamExt, TryStreamExt}; -use warp::Filter; +use bytes::BufMut; +use futures::{TryFutureExt, TryStreamExt}; +use warp::{multipart, Filter}; #[tokio::test] async fn form_fields() { let _ = pretty_env_logger::try_init(); - let route = warp::multipart::form().and_then(|form: warp::multipart::FormData| { + let route = multipart::form().and_then(|form: multipart::FormData| { async { // Collect the fields into (name, value): (String, Vec) let part: Result)>, warp::Rejection> = form .and_then(|part| { let name = part.name().to_string(); - part.concat().map(move |value| Ok((name, value))) + let value = part.stream().try_fold(Vec::new(), |mut vec, data| { + vec.put(data); + async move { Ok(vec) } + }); + value.map_ok(move |vec| (name, vec)) }) .try_collect() .await