Skip to content

Commit

Permalink
Mnior cleanup of multipart API. (#2461)
Browse files Browse the repository at this point in the history
  • Loading branch information
fakeshadow committed Nov 24, 2021
1 parent 3e6e977 commit 52bbbd1
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 28 deletions.
2 changes: 1 addition & 1 deletion actix-multipart/Cargo.toml
Expand Up @@ -20,7 +20,6 @@ actix-utils = "3.0.0"
bytes = "1"
derive_more = "0.99.5"
futures-core = { version = "0.3.7", default-features = false, features = ["alloc"] }
futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] }
httparse = "1.3"
local-waker = "0.1"
log = "0.4"
Expand All @@ -30,5 +29,6 @@ twoway = "0.2"
[dev-dependencies]
actix-rt = "2.2"
actix-http = "3.0.0-beta.13"
futures-util = { version = "0.3.7", default-features = false, features = ["alloc"] }
tokio = { version = "1", features = ["sync"] }
tokio-stream = "0.1"
43 changes: 16 additions & 27 deletions actix-multipart/src/server.rs
Expand Up @@ -17,7 +17,6 @@ use actix_web::{
};
use bytes::{Bytes, BytesMut};
use futures_core::stream::{LocalBoxStream, Stream};
use futures_util::stream::StreamExt as _;
use local_waker::LocalWaker;

use crate::error::MultipartError;
Expand Down Expand Up @@ -67,7 +66,7 @@ impl Multipart {
/// Create multipart instance for boundary.
pub fn new<S>(headers: &HeaderMap, stream: S) -> Multipart
where
S: Stream<Item = Result<Bytes, PayloadError>> + Unpin + 'static,
S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{
match Self::boundary(headers) {
Ok(boundary) => Multipart::from_boundary(boundary, stream),
Expand All @@ -77,36 +76,29 @@ impl Multipart {

/// Extract boundary info from headers.
pub(crate) fn boundary(headers: &HeaderMap) -> Result<String, MultipartError> {
if let Some(content_type) = headers.get(&header::CONTENT_TYPE) {
if let Ok(content_type) = content_type.to_str() {
if let Ok(ct) = content_type.parse::<mime::Mime>() {
if let Some(boundary) = ct.get_param(mime::BOUNDARY) {
Ok(boundary.as_str().to_owned())
} else {
Err(MultipartError::Boundary)
}
} else {
Err(MultipartError::ParseContentType)
}
} else {
Err(MultipartError::ParseContentType)
}
} else {
Err(MultipartError::NoContentType)
}
headers
.get(&header::CONTENT_TYPE)
.ok_or(MultipartError::NoContentType)?
.to_str()
.ok()
.and_then(|content_type| content_type.parse::<mime::Mime>().ok())
.ok_or(MultipartError::ParseContentType)?
.get_param(mime::BOUNDARY)
.map(|boundary| boundary.as_str().to_owned())
.ok_or(MultipartError::Boundary)
}

/// Create multipart instance for given boundary and stream
pub(crate) fn from_boundary<S>(boundary: String, stream: S) -> Multipart
where
S: Stream<Item = Result<Bytes, PayloadError>> + Unpin + 'static,
S: Stream<Item = Result<Bytes, PayloadError>> + 'static,
{
Multipart {
error: None,
safety: Safety::new(),
inner: Some(Rc::new(RefCell::new(InnerMultipart {
boundary,
payload: PayloadRef::new(PayloadBuffer::new(Box::new(stream))),
payload: PayloadRef::new(PayloadBuffer::new(stream)),
state: InnerState::FirstBoundary,
item: InnerMultipartItem::None,
}))),
Expand Down Expand Up @@ -690,10 +682,7 @@ impl PayloadRef {
}
}

fn get_mut<'a, 'b>(&'a self, s: &'b Safety) -> Option<RefMut<'a, PayloadBuffer>>
where
'a: 'b,
{
fn get_mut(&self, s: &Safety) -> Option<RefMut<'_, PayloadBuffer>> {
if s.current() {
Some(self.payload.borrow_mut())
} else {
Expand Down Expand Up @@ -779,7 +768,7 @@ impl PayloadBuffer {
PayloadBuffer {
eof: false,
buf: BytesMut::new(),
stream: stream.boxed_local(),
stream: Box::pin(stream),
}
}

Expand Down Expand Up @@ -860,7 +849,7 @@ mod tests {
use actix_web::test::TestRequest;
use actix_web::FromRequest;
use bytes::Bytes;
use futures_util::future::lazy;
use futures_util::{future::lazy, StreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;

Expand Down

0 comments on commit 52bbbd1

Please sign in to comment.