diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 015f01aedf5..c295ddc4af4 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1033,9 +1033,6 @@ impl Sender { /// channel were dropped and only `WeakSender` instances remain, /// the channel is closed. pub fn downgrade(&self) -> WeakSender { - // Note: If this is the last `Sender` instance we want to close the - // channel when downgrading, so it's important to move into `self` here. - WeakSender { chan: self.chan.downgrade(), } diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index df511916c64..a10ffb7d797 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -47,7 +47,7 @@ pub(crate) trait Semaphore { fn is_closed(&self) -> bool; } -pub(crate) struct Chan { +pub(super) struct Chan { /// Notifies all tasks listening for the receiver being dropped. notify_rx_closed: Notify, diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index c4b9baf01df..f112462811f 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -838,47 +838,36 @@ impl Drop for Msg { // `Sender` was dropped while more than one `WeakSender` remains, we want to // ensure that no messages are kept in the channel, which were sent after // the receiver was dropped. -#[tokio::test(start_paused = true)] +#[tokio::test] async fn test_msgs_dropped_on_rx_drop() { - fn ms(millis: u64) -> Duration { - Duration::from_millis(millis) - } - let (tx, mut rx) = mpsc::channel(3); - let rx_handle = tokio::spawn(async move { - let _ = rx.recv().await.unwrap(); - let _ = rx.recv().await.unwrap(); - time::sleep(ms(1)).await; - drop(rx); + let _ = tx.send(Msg {}).await.unwrap(); + let _ = tx.send(Msg {}).await.unwrap(); - time::advance(ms(1)).await; - }); + // This msg will be pending and should be dropped when `rx` is dropped + let sent_fut = tx.send(Msg {}); - let tx_handle = tokio::spawn(async move { - let _ = tx.send(Msg {}).await.unwrap(); - let _ = tx.send(Msg {}).await.unwrap(); + let _ = rx.recv().await.unwrap(); + let _ = rx.recv().await.unwrap(); - // This msg will be pending and should be dropped when `rx` is dropped - let _ = tx.send(Msg {}).await.unwrap(); - time::advance(ms(1)).await; + let _ = sent_fut.await.unwrap(); - // This msg will not be put onto `Tx` list anymore, since `Rx` is closed. - time::sleep(ms(1)).await; - assert!(tx.send(Msg {}).await.is_err()); + drop(rx); - // Ensure that third message isn't put onto the channel anymore - assert_eq!(NUM_DROPPED.load(Acquire), 4); - }); + assert_eq!(NUM_DROPPED.load(Acquire), 3); - let (_, _) = join!(rx_handle, tx_handle); + // This msg will not be put onto `Tx` list anymore, since `Rx` is closed. + assert!(tx.send(Msg {}).await.is_err()); + + assert_eq!(NUM_DROPPED.load(Acquire), 4); } // Tests that a `WeakSender` is upgradeable when other `Sender`s exist. #[tokio::test] async fn downgrade_upgrade_sender_success() { let (tx, _rx) = mpsc::channel::(1); - let weak_tx = tx.clone().downgrade(); + let weak_tx = tx.downgrade(); assert!(weak_tx.upgrade().is_some()); } @@ -897,6 +886,7 @@ async fn downgrade_upgrade_sender_failure() { async fn downgrade_drop_upgrade() { let (tx, _rx) = mpsc::channel::(1); + // the cloned `Tx` is dropped right away let weak_tx = tx.clone().downgrade(); drop(tx); assert!(weak_tx.upgrade().is_none()); @@ -907,7 +897,7 @@ async fn downgrade_drop_upgrade() { #[tokio::test] async fn downgrade_get_permit_upgrade_no_senders() { let (tx, _rx) = mpsc::channel::(1); - let weak_tx = tx.clone().downgrade(); + let weak_tx = tx.downgrade(); let _permit = tx.reserve_owned().await.unwrap(); assert!(weak_tx.upgrade().is_some()); } @@ -920,12 +910,13 @@ async fn downgrade_upgrade_get_permit_no_senders() { let tx2 = tx.clone(); let _permit = tx.reserve_owned().await.unwrap(); let weak_tx = tx2.downgrade(); + drop(tx2); assert!(weak_tx.upgrade().is_some()); } -// Tests that `Clone` of `WeakSender` doesn't decrement `tx_count`. +// Tests that `downgrade` does not change the `tx_count` of the channel. #[tokio::test] -async fn test_weak_sender_clone() { +async fn test_tx_count_weak_sender() { let (tx, _rx) = mpsc::channel::(1); let tx_weak = tx.downgrade(); let tx_weak2 = tx.downgrade();