From 83dccb788ac866c27c4f858fcc1c38b4fda4b2bb Mon Sep 17 00:00:00 2001 From: Ali MJ Al-Nasrawy Date: Thu, 25 Nov 2021 21:28:27 +0300 Subject: [PATCH 1/3] don't hang after dropping mutipart --- actix-multipart/src/server.rs | 53 ++++++++++++++++++++++++++++++++--- 1 file changed, 49 insertions(+), 4 deletions(-) diff --git a/actix-multipart/src/server.rs b/actix-multipart/src/server.rs index c9642cfad68..0772e3d7897 100644 --- a/actix-multipart/src/server.rs +++ b/actix-multipart/src/server.rs @@ -706,8 +706,11 @@ impl Clone for PayloadRef { } } -/// Counter. It tracks of number of clones of payloads and give access to payload only to top most -/// task panics if Safety get destroyed and it not top most task. +/// Counter. It tracks of number of clones of payloads and give access to payload only to top most. +/// * When dropped, parent task is awakened. This is to support the case where Field is +/// dropped in a separate task than Multipart. +/// * Assumes that parent owners don't move to different tasks; only the top-most is allowed to. +/// * If dropped and is not top most owner, is_clean flag is set to false. #[derive(Debug)] struct Safety { task: LocalWaker, @@ -750,9 +753,9 @@ impl Safety { impl Drop for Safety { fn drop(&mut self) { - // parent task is dead if Rc::strong_count(&self.payload) != self.level { - self.clean.set(true); + // Multipart dropped leaving a Field + self.clean.set(false); } self.task.wake(); @@ -855,10 +858,12 @@ mod tests { use actix_web::http::header::{DispositionParam, DispositionType}; use actix_web::test::TestRequest; use actix_web::FromRequest; + use actix_web::rt; use bytes::Bytes; use futures_util::{future::lazy, StreamExt}; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; + use std::time::Duration; #[actix_rt::test] async fn test_boundary() { @@ -1286,4 +1291,44 @@ mod tests { MultipartError::NoContentDisposition, )); } + + #[actix_rt::test] + async fn test_drop_multipart_dont_hang() { + let (sender, payload) = create_stream(); + let (bytes, headers) = create_simple_request_with_header(); + sender.send(Ok(bytes)).unwrap(); + drop(sender); // eof + + let mut multipart = Multipart::new(&headers, payload); + let mut field = multipart.next().await.unwrap().unwrap(); + + drop(multipart); + + // should fail immediately + match field.next().await { + Some(Err(MultipartError::NotConsumed)) => {}, + _ => panic!(), + }; + } + + #[actix_rt::test] + async fn test_drop_field_awaken_multipart() { + let (sender, payload) = create_stream(); + let (bytes, headers) = create_simple_request_with_header(); + sender.send(Ok(bytes)).unwrap(); + drop(sender); // eof + + let mut multipart = Multipart::new(&headers, payload); + let mut field = multipart.next().await.unwrap().unwrap(); + + let task = rt::spawn(async move { + rt::time::sleep(Duration::from_secs(1)).await; + assert_eq!(field.next().await.unwrap().unwrap(), "test"); + drop(field); + }); + + // dropping field should awaken current task + let _ = multipart.next().await.unwrap().unwrap(); + task.await.unwrap(); + } } From 595d1a4a46c5297b3227efb8924e39f394d70312 Mon Sep 17 00:00:00 2001 From: Ali MJ Al-Nasrawy Date: Thu, 25 Nov 2021 21:44:19 +0300 Subject: [PATCH 2/3] changelog --- actix-multipart/CHANGES.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/actix-multipart/CHANGES.md b/actix-multipart/CHANGES.md index 1ef4aab3f57..deb99987863 100644 --- a/actix-multipart/CHANGES.md +++ b/actix-multipart/CHANGES.md @@ -10,8 +10,10 @@ * Added `Field::name` method for getting the field name. [#2451] * `MultipartError` now marks variants with inner errors as the source. [#2451] * `MultipartError` is now marked as non-exhaustive. [#2451] +* Polling `Field` after dropping `Multipart` now fails immediately instead of hanging forever. [#2463] [#2451]: https://github.com/actix/actix-web/pull/2451 +[#2463]: https://github.com/actix/actix-web/pull/2463 ## 0.4.0-beta.7 - 2021-10-20 From da342776163e2f04b13e1bbe45e83b3a5d452d46 Mon Sep 17 00:00:00 2001 From: Ali MJ Al-Nasrawy Date: Thu, 25 Nov 2021 21:45:51 +0300 Subject: [PATCH 3/3] fmt --- actix-multipart/src/server.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/actix-multipart/src/server.rs b/actix-multipart/src/server.rs index 0772e3d7897..319e7986366 100644 --- a/actix-multipart/src/server.rs +++ b/actix-multipart/src/server.rs @@ -856,14 +856,14 @@ mod tests { use actix_http::h1::Payload; use actix_web::http::header::{DispositionParam, DispositionType}; + use actix_web::rt; use actix_web::test::TestRequest; use actix_web::FromRequest; - use actix_web::rt; use bytes::Bytes; use futures_util::{future::lazy, StreamExt}; + use std::time::Duration; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; - use std::time::Duration; #[actix_rt::test] async fn test_boundary() { @@ -1306,7 +1306,7 @@ mod tests { // should fail immediately match field.next().await { - Some(Err(MultipartError::NotConsumed)) => {}, + Some(Err(MultipartError::NotConsumed)) => {} _ => panic!(), }; }