Skip to content

Commit

Permalink
don't hang after dropping mutipart
Browse files Browse the repository at this point in the history
  • Loading branch information
aliemjay committed Nov 25, 2021
1 parent 66620a1 commit 851bfdb
Showing 1 changed file with 49 additions and 4 deletions.
53 changes: 49 additions & 4 deletions actix-multipart/src/server.rs
Expand Up @@ -710,8 +710,11 @@ impl Clone for PayloadRef {
}
}

/// Counter. It tracks of number of clones of payloads and give access to payload only to top most
/// task panics if Safety get destroyed and it not top most task.
/// Counter. It tracks of number of clones of payloads and give access to payload only to top most.
/// * When dropped, parent task is awakened. This is to support the case where Field is
/// dropped in a separate task than Multipart.
/// * Assumes that parent owners don't move to different tasks; only the top-most is allowed to.
/// * If dropped and is not top most owner, is_clean flag is set to false.
#[derive(Debug)]
struct Safety {
task: LocalWaker,
Expand Down Expand Up @@ -754,9 +757,9 @@ impl Safety {

impl Drop for Safety {
fn drop(&mut self) {
// parent task is dead
if Rc::strong_count(&self.payload) != self.level {
self.clean.set(true);
// Multipart dropped leaving a Field
self.clean.set(false);
}

self.task.wake();
Expand Down Expand Up @@ -859,10 +862,12 @@ mod tests {
use actix_web::http::header::{DispositionParam, DispositionType};
use actix_web::test::TestRequest;
use actix_web::FromRequest;
use actix_web::rt;
use bytes::Bytes;
use futures_util::future::lazy;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use std::time::Duration;

#[actix_rt::test]
async fn test_boundary() {
Expand Down Expand Up @@ -1290,4 +1295,44 @@ mod tests {
MultipartError::NoContentDisposition,
));
}

#[actix_rt::test]
async fn test_drop_multipart_dont_hang() {
let (sender, payload) = create_stream();
let (bytes, headers) = create_simple_request_with_header();
sender.send(Ok(bytes)).unwrap();
drop(sender); // eof

let mut multipart = Multipart::new(&headers, payload);
let mut field = multipart.next().await.unwrap().unwrap();

drop(multipart);

// should fail immediately
match field.next().await {
Some(Err(MultipartError::NotConsumed)) => {},
_ => panic!(),
};
}

#[actix_rt::test]
async fn test_drop_field_awaken_multipart() {
let (sender, payload) = create_stream();
let (bytes, headers) = create_simple_request_with_header();
sender.send(Ok(bytes)).unwrap();
drop(sender); // eof

let mut multipart = Multipart::new(&headers, payload);
let mut field = multipart.next().await.unwrap().unwrap();

let task = rt::spawn(async move {
rt::time::sleep(Duration::from_secs(1)).await;
assert_eq!(field.next().await.unwrap().unwrap(), "test");
drop(field);
});

// dropping field should awaken current task
let _ = multipart.next().await.unwrap().unwrap();
task.await.unwrap();
}
}

0 comments on commit 851bfdb

Please sign in to comment.