From 851bfdb46db6e8f10aa068be1bc70dd4338c9d21 Mon Sep 17 00:00:00 2001 From: Ali MJ Al-Nasrawy Date: Thu, 25 Nov 2021 21:28:27 +0300 Subject: [PATCH] don't hang after dropping mutipart --- actix-multipart/src/server.rs | 53 ++++++++++++++++++++++++++++++++--- 1 file changed, 49 insertions(+), 4 deletions(-) diff --git a/actix-multipart/src/server.rs b/actix-multipart/src/server.rs index 43f9ccf5fcb..900599c118c 100644 --- a/actix-multipart/src/server.rs +++ b/actix-multipart/src/server.rs @@ -710,8 +710,11 @@ impl Clone for PayloadRef { } } -/// Counter. It tracks of number of clones of payloads and give access to payload only to top most -/// task panics if Safety get destroyed and it not top most task. +/// Counter. It tracks of number of clones of payloads and give access to payload only to top most. +/// * When dropped, parent task is awakened. This is to support the case where Field is +/// dropped in a separate task than Multipart. +/// * Assumes that parent owners don't move to different tasks; only the top-most is allowed to. +/// * If dropped and is not top most owner, is_clean flag is set to false. #[derive(Debug)] struct Safety { task: LocalWaker, @@ -754,9 +757,9 @@ impl Safety { impl Drop for Safety { fn drop(&mut self) { - // parent task is dead if Rc::strong_count(&self.payload) != self.level { - self.clean.set(true); + // Multipart dropped leaving a Field + self.clean.set(false); } self.task.wake(); @@ -859,10 +862,12 @@ mod tests { use actix_web::http::header::{DispositionParam, DispositionType}; use actix_web::test::TestRequest; use actix_web::FromRequest; + use actix_web::rt; use bytes::Bytes; use futures_util::future::lazy; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; + use std::time::Duration; #[actix_rt::test] async fn test_boundary() { @@ -1290,4 +1295,44 @@ mod tests { MultipartError::NoContentDisposition, )); } + + #[actix_rt::test] + async fn test_drop_multipart_dont_hang() { + let (sender, payload) = create_stream(); + let (bytes, headers) = create_simple_request_with_header(); + sender.send(Ok(bytes)).unwrap(); + drop(sender); // eof + + let mut multipart = Multipart::new(&headers, payload); + let mut field = multipart.next().await.unwrap().unwrap(); + + drop(multipart); + + // should fail immediately + match field.next().await { + Some(Err(MultipartError::NotConsumed)) => {}, + _ => panic!(), + }; + } + + #[actix_rt::test] + async fn test_drop_field_awaken_multipart() { + let (sender, payload) = create_stream(); + let (bytes, headers) = create_simple_request_with_header(); + sender.send(Ok(bytes)).unwrap(); + drop(sender); // eof + + let mut multipart = Multipart::new(&headers, payload); + let mut field = multipart.next().await.unwrap().unwrap(); + + let task = rt::spawn(async move { + rt::time::sleep(Duration::from_secs(1)).await; + assert_eq!(field.next().await.unwrap().unwrap(), "test"); + drop(field); + }); + + // dropping field should awaken current task + let _ = multipart.next().await.unwrap().unwrap(); + task.await.unwrap(); + } }