Skip to content

Commit

Permalink
don't hang after dropping mutipart (#2463)
Browse files Browse the repository at this point in the history
  • Loading branch information
aliemjay committed Nov 29, 2021
1 parent cf54388 commit 654dc64
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 4 deletions.
2 changes: 2 additions & 0 deletions actix-multipart/CHANGES.md
Expand Up @@ -10,8 +10,10 @@
* Added `Field::name` method for getting the field name. [#2451]
* `MultipartError` now marks variants with inner errors as the source. [#2451]
* `MultipartError` is now marked as non-exhaustive. [#2451]
* Polling `Field` after dropping `Multipart` now fails immediately instead of hanging forever. [#2463]

[#2451]: https://github.com/actix/actix-web/pull/2451
[#2463]: https://github.com/actix/actix-web/pull/2463


## 0.4.0-beta.7 - 2021-10-20
Expand Down
53 changes: 49 additions & 4 deletions actix-multipart/src/server.rs
Expand Up @@ -706,8 +706,11 @@ impl Clone for PayloadRef {
}
}

/// Counter. It tracks of number of clones of payloads and give access to payload only to top most
/// task panics if Safety get destroyed and it not top most task.
/// Counter. It tracks of number of clones of payloads and give access to payload only to top most.
/// * When dropped, parent task is awakened. This is to support the case where Field is
/// dropped in a separate task than Multipart.
/// * Assumes that parent owners don't move to different tasks; only the top-most is allowed to.
/// * If dropped and is not top most owner, is_clean flag is set to false.
#[derive(Debug)]
struct Safety {
task: LocalWaker,
Expand Down Expand Up @@ -750,9 +753,9 @@ impl Safety {

impl Drop for Safety {
fn drop(&mut self) {
// parent task is dead
if Rc::strong_count(&self.payload) != self.level {
self.clean.set(true);
// Multipart dropped leaving a Field
self.clean.set(false);
}

self.task.wake();
Expand Down Expand Up @@ -853,10 +856,12 @@ mod tests {

use actix_http::h1::Payload;
use actix_web::http::header::{DispositionParam, DispositionType};
use actix_web::rt;
use actix_web::test::TestRequest;
use actix_web::FromRequest;
use bytes::Bytes;
use futures_util::{future::lazy, StreamExt};
use std::time::Duration;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;

Expand Down Expand Up @@ -1286,4 +1291,44 @@ mod tests {
MultipartError::NoContentDisposition,
));
}

#[actix_rt::test]
async fn test_drop_multipart_dont_hang() {
let (sender, payload) = create_stream();
let (bytes, headers) = create_simple_request_with_header();
sender.send(Ok(bytes)).unwrap();
drop(sender); // eof

let mut multipart = Multipart::new(&headers, payload);
let mut field = multipart.next().await.unwrap().unwrap();

drop(multipart);

// should fail immediately
match field.next().await {
Some(Err(MultipartError::NotConsumed)) => {}
_ => panic!(),
};
}

#[actix_rt::test]
async fn test_drop_field_awaken_multipart() {
let (sender, payload) = create_stream();
let (bytes, headers) = create_simple_request_with_header();
sender.send(Ok(bytes)).unwrap();
drop(sender); // eof

let mut multipart = Multipart::new(&headers, payload);
let mut field = multipart.next().await.unwrap().unwrap();

let task = rt::spawn(async move {
rt::time::sleep(Duration::from_secs(1)).await;
assert_eq!(field.next().await.unwrap().unwrap(), "test");
drop(field);
});

// dropping field should awaken current task
let _ = multipart.next().await.unwrap().unwrap();
task.await.unwrap();
}
}

0 comments on commit 654dc64

Please sign in to comment.