From 89c6d626569af3ca1deb22be64f111d121389476 Mon Sep 17 00:00:00 2001 From: fakeshadow <24548779@qq.com> Date: Thu, 25 Nov 2021 08:10:53 +0800 Subject: [PATCH] clean up multipart and field stream trait impl (#2462) --- actix-multipart/src/server.rs | 75 +++++++++++++++++++---------------- 1 file changed, 41 insertions(+), 34 deletions(-) diff --git a/actix-multipart/src/server.rs b/actix-multipart/src/server.rs index 55ac00ff735..c9642cfad68 100644 --- a/actix-multipart/src/server.rs +++ b/actix-multipart/src/server.rs @@ -32,7 +32,7 @@ const MAX_HEADERS: usize = 32; pub struct Multipart { safety: Safety, error: Option, - inner: Option>>, + inner: Option, } enum InnerMultipartItem { @@ -96,12 +96,12 @@ impl Multipart { Multipart { error: None, safety: Safety::new(), - inner: Some(Rc::new(RefCell::new(InnerMultipart { + inner: Some(InnerMultipart { boundary, payload: PayloadRef::new(PayloadBuffer::new(stream)), state: InnerState::FirstBoundary, item: InnerMultipartItem::None, - }))), + }), } } @@ -118,20 +118,27 @@ impl Multipart { impl Stream for Multipart { type Item = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if let Some(err) = self.error.take() { - Poll::Ready(Some(Err(err))) - } else if self.safety.current() { - let this = self.get_mut(); - let mut inner = this.inner.as_mut().unwrap().borrow_mut(); - if let Some(mut payload) = inner.payload.get_mut(&this.safety) { - payload.poll_stream(cx)?; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + match this.inner.as_mut() { + Some(inner) => { + if let Some(mut buffer) = inner.payload.get_mut(&this.safety) { + // check safety and poll read payload to buffer. + buffer.poll_stream(cx)?; + } else if !this.safety.is_clean() { + // safety violation + return Poll::Ready(Some(Err(MultipartError::NotConsumed))); + } else { + return Poll::Pending; + } + + inner.poll(&this.safety, cx) } - inner.poll(&this.safety, cx) - } else if !self.safety.is_clean() { - Poll::Ready(Some(Err(MultipartError::NotConsumed))) - } else { - Poll::Pending + None => Poll::Ready(Some(Err(this + .error + .take() + .expect("Multipart polled after finish")))), } } } @@ -152,17 +159,15 @@ impl InnerMultipart { Ok(httparse::Status::Complete((_, hdrs))) => { // convert headers let mut headers = HeaderMap::with_capacity(hdrs.len()); + for h in hdrs { - if let Ok(name) = HeaderName::try_from(h.name) { - if let Ok(value) = HeaderValue::try_from(h.value) { - headers.append(name, value); - } else { - return Err(ParseError::Header.into()); - } - } else { - return Err(ParseError::Header.into()); - } + let name = + HeaderName::try_from(h.name).map_err(|_| ParseError::Header)?; + let value = HeaderValue::try_from(h.value) + .map_err(|_| ParseError::Header)?; + headers.append(name, value); } + Ok(Some(headers)) } Ok(httparse::Status::Partial) => Err(ParseError::Header.into()), @@ -458,17 +463,19 @@ impl Stream for Field { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if self.safety.current() { - let mut inner = self.inner.borrow_mut(); - if let Some(mut payload) = inner.payload.as_ref().unwrap().get_mut(&self.safety) { - payload.poll_stream(cx)?; - } - inner.poll(&self.safety) - } else if !self.safety.is_clean() { - Poll::Ready(Some(Err(MultipartError::NotConsumed))) + let this = self.get_mut(); + let mut inner = this.inner.borrow_mut(); + if let Some(mut buffer) = inner.payload.as_ref().unwrap().get_mut(&this.safety) { + // check safety and poll read payload to buffer. + buffer.poll_stream(cx)?; + } else if !this.safety.is_clean() { + // safety violation + return Poll::Ready(Some(Err(MultipartError::NotConsumed))); } else { - Poll::Pending + return Poll::Pending; } + + inner.poll(&this.safety) } }