From 355e4c1d7a00d8675d35ce26568967f9a29d6fd5 Mon Sep 17 00:00:00 2001 From: b-naber Date: Wed, 6 Jul 2022 18:38:46 +0200 Subject: [PATCH] add test to ensure that no pending msgs are kept in the channel after rx was dropped --- tokio-util/tests/mpsc.rs | 63 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/tokio-util/tests/mpsc.rs b/tokio-util/tests/mpsc.rs index 872964a41f7..7f7ccd2ece4 100644 --- a/tokio-util/tests/mpsc.rs +++ b/tokio-util/tests/mpsc.rs @@ -1,6 +1,14 @@ use futures::future::poll_fn; +use std::ops::Drop; +use std::sync::atomic::{ + AtomicUsize, + Ordering::{Acquire, Release}, +}; +use std::time::Duration; +use tokio::join; use tokio::sync::mpsc::{self, channel}; use tokio::sync::oneshot; +use tokio::time; use tokio_test::task::spawn; use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok}; use tokio_util::sync::PollSender; @@ -393,3 +401,58 @@ async fn actor_weak_sender() { let _ = actor_handle.await; } + +static NUM_DROPPED: AtomicUsize = AtomicUsize::new(0); + +#[derive(Debug)] +struct Msg; + +impl Drop for Msg { + fn drop(&mut self) { + NUM_DROPPED.fetch_add(1, Release); + } +} + +// Tests that no pending messages are put onto the channel after `Rx` was +// dropped. +// +// Note: After the introduction of `WeakSender`, which internally +// used `Arc` and doesn't call a drop of the channel after the last strong +// `Sender` was dropped while more than one `WeakSender` remains, we want to +// ensure that no messages are kept in the channel, which were sent after +// the receiver was dropped. +#[tokio::test(start_paused = true)] +async fn test_msgs_dropped_on_rx_drop() { + fn ms(millis: u64) -> Duration { + Duration::from_millis(millis) + } + + let (tx, mut rx) = mpsc::channel(3); + + let rx_handle = tokio::spawn(async move { + let _ = rx.recv().await.unwrap(); + let _ = rx.recv().await.unwrap(); + time::sleep(ms(1)).await; + drop(rx); + + time::advance(ms(1)).await; + }); + + let tx_handle = tokio::spawn(async move { + let _ = tx.send(Msg {}).await.unwrap(); + let _ = tx.send(Msg {}).await.unwrap(); + + // This msg will be pending and should be dropped when `rx` is dropped + let _ = tx.send(Msg {}).await.unwrap(); + time::advance(ms(1)).await; + + // This msg will not be put onto `Tx` list anymore, since `Rx` is closed. + time::sleep(ms(1)).await; + let _ = tx.send(Msg {}).await.unwrap(); + + // Ensure that third message isn't put onto the channel anymore + assert_eq!(NUM_DROPPED.load(Acquire), 4); + }); + + let (_, _) = join!(rx_handle, tx_handle); +}