Skip to content

Commit

Permalink
add test to ensure that no pending msgs are kept in the channel after…
Browse files Browse the repository at this point in the history
… rx was dropped
  • Loading branch information
b-naber committed Jul 6, 2022
1 parent fe51bd7 commit 853399c
Showing 1 changed file with 63 additions and 0 deletions.
63 changes: 63 additions & 0 deletions tokio-util/tests/mpsc.rs
@@ -1,6 +1,14 @@
use futures::future::poll_fn;
use std::ops::Drop;
use std::sync::atomic::{
AtomicUsize,
Ordering::{Acquire, Release},
};
use std::time::Duration;
use tokio::join;
use tokio::sync::mpsc::{self, channel};
use tokio::sync::oneshot;
use tokio::time;
use tokio_test::task::spawn;
use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok};
use tokio_util::sync::PollSender;
Expand Down Expand Up @@ -393,3 +401,58 @@ async fn actor_weak_sender() {

let _ = actor_handle.await;
}

static NUM_DROPPED: AtomicUsize = AtomicUsize::new(0);

#[derive(Debug)]
struct Msg;

impl Drop for Msg {
fn drop(&mut self) {
NUM_DROPPED.fetch_add(1, Release);
}
}

// Tests that no pending messages are put onto the channel after `Rx` was
// dropped.
//
// Note: After the introduction of `WeakSender`, which internally
// used `Arc` and doesn't call a drop of the channel after the last strong
// `Sender` was dropped while more than one `WeakSender` remains, we want to
// ensure that no messages are kept in the channel, which were sent after
// the receiver was dropped.
#[tokio::test(start_paused = true)]
async fn test_msgs_dropped_on_rx_drop() {
fn ms(millis: u64) -> Duration {
Duration::from_millis(millis)
}

let (tx, mut rx) = mpsc::channel(3);

let rx_handle = tokio::spawn(async move {
let _ = rx.recv().await.unwrap();
let _ = rx.recv().await.unwrap();
time::sleep(ms(1)).await;
drop(rx);

time::advance(ms(1)).await;
});

let tx_handle = tokio::spawn(async move {
let _ = tx.send(Msg {}).await.unwrap();
let _ = tx.send(Msg {}).await.unwrap();

// This msg will be pending and should be dropped when `rx` is dropped
let _ = tx.send(Msg {}).await.unwrap();
time::advance(ms(1)).await;

// This msg will not be put onto `Tx` list anymore, since `Rx` is closed.
time::sleep(ms(1)).await;
let _ = tx.send(Msg {}).await.unwrap();

// Ensure that third message isn't put onto the channel anymore
assert_eq!(NUM_DROPPED.load(Acquire), 4);
});

let _ = join!(rx_handle, tx_handle).unwrap();
}

0 comments on commit 853399c

Please sign in to comment.