Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

don't hang after dropping mutipart #2463

Merged
merged 4 commits into from Nov 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions actix-multipart/CHANGES.md
Expand Up @@ -10,8 +10,10 @@
* Added `Field::name` method for getting the field name. [#2451]
* `MultipartError` now marks variants with inner errors as the source. [#2451]
* `MultipartError` is now marked as non-exhaustive. [#2451]
* Polling `Field` after dropping `Multipart` now fails immediately instead of hanging forever. [#2463]

[#2451]: https://github.com/actix/actix-web/pull/2451
[#2463]: https://github.com/actix/actix-web/pull/2463


## 0.4.0-beta.7 - 2021-10-20
Expand Down
53 changes: 49 additions & 4 deletions actix-multipart/src/server.rs
Expand Up @@ -706,8 +706,11 @@ impl Clone for PayloadRef {
}
}

/// Counter. It tracks of number of clones of payloads and give access to payload only to top most
/// task panics if Safety get destroyed and it not top most task.
/// Counter. It tracks of number of clones of payloads and give access to payload only to top most.
/// * When dropped, parent task is awakened. This is to support the case where Field is
/// dropped in a separate task than Multipart.
/// * Assumes that parent owners don't move to different tasks; only the top-most is allowed to.
/// * If dropped and is not top most owner, is_clean flag is set to false.
#[derive(Debug)]
struct Safety {
task: LocalWaker,
Expand Down Expand Up @@ -750,9 +753,9 @@ impl Safety {

impl Drop for Safety {
fn drop(&mut self) {
// parent task is dead
if Rc::strong_count(&self.payload) != self.level {
self.clean.set(true);
// Multipart dropped leaving a Field
self.clean.set(false);
}

self.task.wake();
Expand Down Expand Up @@ -853,10 +856,12 @@ mod tests {

use actix_http::h1::Payload;
use actix_web::http::header::{DispositionParam, DispositionType};
use actix_web::rt;
use actix_web::test::TestRequest;
use actix_web::FromRequest;
use bytes::Bytes;
use futures_util::{future::lazy, StreamExt};
use std::time::Duration;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;

Expand Down Expand Up @@ -1286,4 +1291,44 @@ mod tests {
MultipartError::NoContentDisposition,
));
}

#[actix_rt::test]
async fn test_drop_multipart_dont_hang() {
let (sender, payload) = create_stream();
let (bytes, headers) = create_simple_request_with_header();
sender.send(Ok(bytes)).unwrap();
drop(sender); // eof

let mut multipart = Multipart::new(&headers, payload);
let mut field = multipart.next().await.unwrap().unwrap();

drop(multipart);

// should fail immediately
match field.next().await {
Some(Err(MultipartError::NotConsumed)) => {}
_ => panic!(),
};
}

#[actix_rt::test]
async fn test_drop_field_awaken_multipart() {
let (sender, payload) = create_stream();
let (bytes, headers) = create_simple_request_with_header();
sender.send(Ok(bytes)).unwrap();
drop(sender); // eof

let mut multipart = Multipart::new(&headers, payload);
let mut field = multipart.next().await.unwrap().unwrap();

let task = rt::spawn(async move {
rt::time::sleep(Duration::from_secs(1)).await;
assert_eq!(field.next().await.unwrap().unwrap(), "test");
drop(field);
});

// dropping field should awaken current task
let _ = multipart.next().await.unwrap().unwrap();
task.await.unwrap();
}
}