From f2a63ed99a7727d62b9339c7f8cb85ebdf31d897 Mon Sep 17 00:00:00 2001 From: Kevin Rauwolf Date: Thu, 2 Jun 2022 14:47:25 -0700 Subject: [PATCH] Read remaining bytes when cleaning dropped payload. Fixes #2764 --- actix-http/src/h1/dispatcher.rs | 19 +++++++- actix-http/src/h1/dispatcher_tests.rs | 68 +++++++++++++++++++++++++++ 2 files changed, 86 insertions(+), 1 deletion(-) diff --git a/actix-http/src/h1/dispatcher.rs b/actix-http/src/h1/dispatcher.rs index dea8a4bebdb..52d46cdb4bb 100644 --- a/actix-http/src/h1/dispatcher.rs +++ b/actix-http/src/h1/dispatcher.rs @@ -706,6 +706,9 @@ where debug!("handler dropped payload early; attempt to clean connection"); // ...in which case poll request payload a few times loop { + if this.read_buf.is_empty() { + Self::read_available_projected(&mut this, cx)?; + } match this.codec.decode(this.read_buf)? { Some(msg) => { match msg { @@ -1010,8 +1013,22 @@ where self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Result { - let this = self.project(); + let mut this = self.project(); + Self::read_available_projected(&mut this, cx) + } + /// Returns true when I/O stream can be disconnected after write to it. + /// Meant to be called when there is already access to a projected + /// `InnerDispatcher` available. + /// + /// It covers these conditions: + /// - `std::io::ErrorKind::ConnectionReset` after partial read; + /// - all data read done. + #[inline(always)] // TODO: bench this inline + fn read_available_projected( + this: &mut InnerDispatcherProj<'_, T, S, B, X, U>, + cx: &mut Context<'_>, + ) -> Result { if this.flags.contains(Flags::READ_DISCONNECT) { return Ok(false); }; diff --git a/actix-http/src/h1/dispatcher_tests.rs b/actix-http/src/h1/dispatcher_tests.rs index 40454d45a73..ddb70409307 100644 --- a/actix-http/src/h1/dispatcher_tests.rs +++ b/actix-http/src/h1/dispatcher_tests.rs @@ -783,6 +783,74 @@ async fn upgrade_handling() { .await; } +#[actix_rt::test] +async fn handler_drop_large_payload() { + let _ = env_logger::try_init(); + + const CONTENT_LENGTH: usize = 256 * 1024; + let content = str::from_utf8(&[b'x'; CONTENT_LENGTH]).unwrap(); + let buf = TestBuffer::new(http_msg(format!( + r" + POST /drop-payload HTTP/1.1 + Content-Length: {} + + {}", + CONTENT_LENGTH, content + ))); + + let services = HttpFlow::new( + drop_payload_service(), + ExpectHandler, + None::, + ); + + let h1 = Dispatcher::new( + buf.clone(), + services, + ServiceConfig::default(), + None, + OnConnectData::default(), + ); + pin!(h1); + + lazy(|cx| { + assert!(h1.as_mut().poll(cx).is_pending()); + assert!(h1.as_mut().poll(cx).is_pending()); + + // polls: manual + assert_eq!(h1.poll_count, 2); + + let mut res = BytesMut::from(buf.take_write_buf().as_ref()); + stabilize_date_header(&mut res); + let res = &res[..]; + + let exp = http_msg( + r" + HTTP/1.1 200 OK + content-length: 15 + date: Thu, 01 Jan 1970 12:34:56 UTC + + payload dropped + ", + ); + + assert_eq!( + res, + exp, + "\nexpected response not in write buffer:\n\ + response: {:?}\n\ + expected: {:?}", + String::from_utf8_lossy(res), + String::from_utf8_lossy(&exp) + ); + + if let DispatcherStateProj::Normal { inner } = h1.as_mut().project().inner.project() { + assert!(inner.state.is_none()); + } + }) + .await; +} + #[actix_rt::test] async fn handler_drop_payload() { let _ = env_logger::try_init();