Skip to content

Commit

Permalink
use multiparty crate (#1031)
Browse files Browse the repository at this point in the history
Co-authored-by: Paolo Barbolini <paolo@paolo565.org>
  • Loading branch information
seanmonstar and paolobarbolini committed Mar 31, 2023
1 parent 6ae9520 commit 2fecf97
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 39 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ hyper = { version = "0.14", features = ["stream", "server", "http1", "http2", "t
log = "0.4"
mime = "0.3"
mime_guess = "2.0.0"
multipart = { version = "0.18", default-features = false, features = ["server"], optional = true }
multiparty = { version = "0.1", features = ["server", "futures03"], optional = true }
scoped-tls = "1.0"
serde = "1.0"
serde_json = "1.0"
Expand Down Expand Up @@ -55,6 +55,7 @@ listenfd = "1.0"

[features]
default = ["multipart", "websocket"]
multipart = ["multiparty"]
websocket = ["tokio-tungstenite"]
tls = ["tokio-rustls"]

Expand Down
95 changes: 57 additions & 38 deletions src/filters/multipart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,18 @@
//!
//! Filters that extract a multipart body for a route.

use std::fmt;
use std::future::Future;
use std::io::{Cursor, Read};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{fmt, io};

use bytes::{Buf, Bytes};
use futures_util::{future, Stream};
use headers::ContentType;
use hyper::Body;
use mime::Mime;
use multipart::server::Multipart;
use multiparty::headers::Headers;
use multiparty::server::owned_futures03::{FormData as FormDataInner, Part as PartInner};

use crate::filter::{Filter, FilterBase, Internal};
use crate::reject::{self, Rejection};
Expand All @@ -32,17 +33,15 @@ pub struct FormOptions {
///
/// Extracted with a `warp::multipart::form` filter.
pub struct FormData {
inner: Multipart<Cursor<::bytes::Bytes>>,
inner: FormDataInner<BodyIoError>,
}

/// A single "part" of a multipart/form-data body.
///
/// Yielded from the `FormData` stream.
pub struct Part {
name: String,
filename: Option<String>,
content_type: Option<String>,
data: Option<Vec<u8>>,
headers: Headers,
part: PartInner<BodyIoError>,
}

/// Create a `Filter` to extract a `multipart/form-data` body from a request.
Expand Down Expand Up @@ -86,9 +85,12 @@ impl FilterBase for FormOptions {

let filt = super::body::content_length_limit(self.max_length)
.and(boundary)
.and(super::body::bytes())
.map(|boundary, body| FormData {
inner: Multipart::with_body(Cursor::new(body), boundary),
.and(super::body::body())
.map(|boundary: String, body| {
let body = BodyIoError(body);
FormData {
inner: FormDataInner::new(body, &boundary),
}
});

let fut = filt.filter(Internal);
Expand All @@ -108,23 +110,18 @@ impl fmt::Debug for FormData {
impl Stream for FormData {
type Item = Result<Part, crate::Error>;

fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match (*self).inner.read_entry() {
Ok(Some(mut field)) => {
let mut data = Vec::new();
field
.data
.read_to_end(&mut data)
.map_err(crate::Error::new)?;
Poll::Ready(Some(Ok(Part {
name: field.headers.name.to_string(),
filename: field.headers.filename,
content_type: field.headers.content_type.map(|m| m.to_string()),
data: Some(data),
})))
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.inner).poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(Ok(part))) => {
let headers = match part.raw_headers().parse() {
Ok(headers) => headers,
Err(err) => return Poll::Ready(Some(Err(crate::Error::new(err)))),
};
Poll::Ready(Some(Ok(Part { part, headers })))
}
Ok(None) => Poll::Ready(None),
Err(e) => Poll::Ready(Some(Err(crate::Error::new(e)))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(crate::Error::new(err)))),
}
}
}
Expand All @@ -134,44 +131,49 @@ impl Stream for FormData {
impl Part {
/// Get the name of this part.
pub fn name(&self) -> &str {
&self.name
&self.headers.name
}

/// Get the filename of this part, if present.
pub fn filename(&self) -> Option<&str> {
self.filename.as_deref()
self.headers.filename.as_deref()
}

/// Get the content-type of this part, if present.
pub fn content_type(&self) -> Option<&str> {
self.content_type.as_deref()
self.headers.content_type.as_deref()
}

/// Asynchronously get some of the data for this `Part`.
pub async fn data(&mut self) -> Option<Result<impl Buf, crate::Error>> {
self.take_data()
future::poll_fn(|cx| self.poll_next(cx)).await
}

/// Convert this `Part` into a `Stream` of `Buf`s.
pub fn stream(self) -> impl Stream<Item = Result<impl Buf, crate::Error>> {
PartStream(self)
}

fn take_data(&mut self) -> Option<Result<Bytes, crate::Error>> {
self.data.take().map(|vec| Ok(vec.into()))
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<Bytes, crate::Error>>> {
match Pin::new(&mut self.part).poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(Ok(bytes))) => Poll::Ready(Some(Ok(bytes))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(crate::Error::new(err)))),
}
}
}

impl fmt::Debug for Part {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut builder = f.debug_struct("Part");
builder.field("name", &self.name);
builder.field("name", &self.headers.name);

if let Some(ref filename) = self.filename {
if let Some(ref filename) = self.headers.filename {
builder.field("filename", filename);
}

if let Some(ref mime) = self.content_type {
if let Some(ref mime) = self.headers.content_type {
builder.field("content_type", mime);
}

Expand All @@ -184,7 +186,24 @@ struct PartStream(Part);
impl Stream for PartStream {
type Item = Result<Bytes, crate::Error>;

fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(self.0.take_data())
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.0.poll_next(cx)
}
}

struct BodyIoError(Body);

impl Stream for BodyIoError {
type Item = io::Result<Bytes>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.0).poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(Ok(bytes))) => Poll::Ready(Some(Ok(bytes))),
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Err(err))) => {
Poll::Ready(Some(Err(io::Error::new(io::ErrorKind::Other, err))))
}
}
}
}

0 comments on commit 2fecf97

Please sign in to comment.