Skip to content

Commit

Permalink
Change multipart stream API backwards compatibility hazards (#358)
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Dec 30, 2019
1 parent cbc8f47 commit da74792
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 7 deletions.
23 changes: 20 additions & 3 deletions src/filters/multipart.rs
Expand Up @@ -8,6 +8,7 @@ use std::io::{Cursor, Read};
use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::{Buf, Bytes};
use futures::{future, Stream};
use headers::ContentType;
use mime::Mime;
Expand Down Expand Up @@ -145,6 +146,20 @@ impl Part {
pub fn content_type(&self) -> Option<&str> {
self.content_type.as_ref().map(|s| &**s)
}

/// Asynchronously get some of the data for this `Part`.
pub async fn data(&mut self) -> Option<Result<impl Buf, crate::Error>> {
self.take_data()
}

/// Convert this `Part` into a `Stream` of `Buf`s.
pub fn stream(self) -> impl Stream<Item = Result<impl Buf, crate::Error>> {
PartStream(self)
}

fn take_data(&mut self) -> Option<Result<Bytes, crate::Error>> {
self.data.take().map(|vec| Ok(vec.into()))
}
}

impl fmt::Debug for Part {
Expand All @@ -164,10 +179,12 @@ impl fmt::Debug for Part {
}
}

impl Stream for Part {
type Item = Vec<u8>;
struct PartStream(Part);

impl Stream for PartStream {
type Item = Result<Bytes, crate::Error>;

fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
Poll::Ready((*self).data.take())
Poll::Ready(self.0.take_data())
}
}
13 changes: 9 additions & 4 deletions tests/multipart.rs
@@ -1,18 +1,23 @@
#![deny(warnings)]
use futures::{FutureExt, StreamExt, TryStreamExt};
use warp::Filter;
use bytes::BufMut;
use futures::{TryFutureExt, TryStreamExt};
use warp::{multipart, Filter};

#[tokio::test]
async fn form_fields() {
let _ = pretty_env_logger::try_init();

let route = warp::multipart::form().and_then(|form: warp::multipart::FormData| {
let route = multipart::form().and_then(|form: multipart::FormData| {
async {
// Collect the fields into (name, value): (String, Vec<u8>)
let part: Result<Vec<(String, Vec<u8>)>, warp::Rejection> = form
.and_then(|part| {
let name = part.name().to_string();
part.concat().map(move |value| Ok((name, value)))
let value = part.stream().try_fold(Vec::new(), |mut vec, data| {
vec.put(data);
async move { Ok(vec) }
});
value.map_ok(move |vec| (name, vec))
})
.try_collect()
.await
Expand Down

0 comments on commit da74792

Please sign in to comment.