From 582d51290772491aa01b2efbbc86d4bdc5fabe21 Mon Sep 17 00:00:00 2001 From: Artyom Kozhemiakin Date: Sun, 13 Nov 2022 01:29:13 +0300 Subject: [PATCH] sync: add `mpsc::WeakUnboundedSender` (#5189) Signed-off-by: Artyom Kozhemiakin --- tokio/src/sync/mpsc/mod.rs | 4 +- tokio/src/sync/mpsc/unbounded.rs | 69 ++++- tokio/tests/sync_mpsc.rs | 274 +---------------- tokio/tests/sync_mpsc_weak.rs | 513 +++++++++++++++++++++++++++++++ 4 files changed, 588 insertions(+), 272 deletions(-) create mode 100644 tokio/tests/sync_mpsc_weak.rs diff --git a/tokio/src/sync/mpsc/mod.rs b/tokio/src/sync/mpsc/mod.rs index d37779ca761..fff309127ad 100644 --- a/tokio/src/sync/mpsc/mod.rs +++ b/tokio/src/sync/mpsc/mod.rs @@ -97,7 +97,9 @@ mod chan; pub(super) mod list; mod unbounded; -pub use self::unbounded::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +pub use self::unbounded::{ + unbounded_channel, UnboundedReceiver, UnboundedSender, WeakUnboundedSender, +}; pub mod error; diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index ce6a7353e07..501020433cf 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -1,4 +1,4 @@ -use crate::loom::sync::atomic::AtomicUsize; +use crate::loom::sync::{atomic::AtomicUsize, Arc}; use crate::sync::mpsc::chan; use crate::sync::mpsc::error::{SendError, TryRecvError}; @@ -13,6 +13,40 @@ pub struct UnboundedSender { chan: chan::Tx, } +/// An unbounded sender that does not prevent the channel from being closed. +/// +/// If all [`UnboundedSender`] instances of a channel were dropped and only +/// `WeakUnboundedSender` instances remain, the channel is closed. +/// +/// In order to send messages, the `WeakUnboundedSender` needs to be upgraded using +/// [`WeakUnboundedSender::upgrade`], which returns `Option`. It returns `None` +/// if all `UnboundedSender`s have been dropped, and otherwise it returns an `UnboundedSender`. +/// +/// [`UnboundedSender`]: UnboundedSender +/// [`WeakUnboundedSender::upgrade`]: WeakUnboundedSender::upgrade +/// +/// #Examples +/// +/// ``` +/// use tokio::sync::mpsc::unbounded_channel; +/// +/// #[tokio::main] +/// async fn main() { +/// let (tx, _rx) = unbounded_channel::(); +/// let tx_weak = tx.downgrade(); +/// +/// // Upgrading will succeed because `tx` still exists. +/// assert!(tx_weak.upgrade().is_some()); +/// +/// // If we drop `tx`, then it will fail. +/// drop(tx); +/// assert!(tx_weak.clone().upgrade().is_none()); +/// } +/// ``` +pub struct WeakUnboundedSender { + chan: Arc>, +} + impl Clone for UnboundedSender { fn clone(&self) -> Self { UnboundedSender { @@ -384,4 +418,37 @@ impl UnboundedSender { pub fn same_channel(&self, other: &Self) -> bool { self.chan.same_channel(&other.chan) } + + /// Converts the `UnboundedSender` to a [`WeakUnboundedSender`] that does not count + /// towards RAII semantics, i.e. if all `UnboundedSender` instances of the + /// channel were dropped and only `WeakUnboundedSender` instances remain, + /// the channel is closed. + pub fn downgrade(&self) -> WeakUnboundedSender { + WeakUnboundedSender { + chan: self.chan.downgrade(), + } + } +} + +impl Clone for WeakUnboundedSender { + fn clone(&self) -> Self { + WeakUnboundedSender { + chan: self.chan.clone(), + } + } +} + +impl WeakUnboundedSender { + /// Tries to convert a WeakUnboundedSender into an [`UnboundedSender`]. + /// This will return `Some` if there are other `Sender` instances alive and + /// the channel wasn't previously dropped, otherwise `None` is returned. + pub fn upgrade(&self) -> Option> { + chan::Tx::upgrade(self.chan.clone()).map(UnboundedSender::new) + } +} + +impl fmt::Debug for WeakUnboundedSender { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("WeakUnboundedSender").finish() + } } diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index cb0c37f9fb1..6e870964113 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -7,19 +7,14 @@ use wasm_bindgen_test::wasm_bindgen_test as test; #[cfg(tokio_wasm_not_wasi)] use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test; +use std::fmt; +use std::sync::Arc; +use tokio::sync::mpsc; +use tokio::sync::mpsc::error::{TryRecvError, TrySendError}; #[cfg(not(tokio_wasm_not_wasi))] use tokio::test as maybe_tokio_test; - -use tokio::sync::mpsc::error::{TryRecvError, TrySendError}; -use tokio::sync::mpsc::{self, channel}; -use tokio::sync::oneshot; use tokio_test::*; -use std::fmt; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering::{Acquire, Release}; -use std::sync::Arc; - #[cfg(not(tokio_wasm))] mod support { pub(crate) mod mpsc_stream; @@ -662,267 +657,6 @@ fn recv_timeout_panic() { tx.send_timeout(10, Duration::from_secs(1)).now_or_never(); } -#[tokio::test] -async fn weak_sender() { - let (tx, mut rx) = channel(11); - - let tx_weak = tokio::spawn(async move { - let tx_weak = tx.clone().downgrade(); - - for i in 0..10 { - if tx.send(i).await.is_err() { - return None; - } - } - - let tx2 = tx_weak - .upgrade() - .expect("expected to be able to upgrade tx_weak"); - let _ = tx2.send(20).await; - let tx_weak = tx2.downgrade(); - - Some(tx_weak) - }) - .await - .unwrap(); - - for i in 0..12 { - let recvd = rx.recv().await; - - match recvd { - Some(msg) => { - if i == 10 { - assert_eq!(msg, 20); - } - } - None => { - assert_eq!(i, 11); - break; - } - } - } - - let tx_weak = tx_weak.unwrap(); - let upgraded = tx_weak.upgrade(); - assert!(upgraded.is_none()); -} - -#[tokio::test] -async fn actor_weak_sender() { - pub struct MyActor { - receiver: mpsc::Receiver, - sender: mpsc::WeakSender, - next_id: u32, - pub received_self_msg: bool, - } - - enum ActorMessage { - GetUniqueId { respond_to: oneshot::Sender }, - SelfMessage {}, - } - - impl MyActor { - fn new( - receiver: mpsc::Receiver, - sender: mpsc::WeakSender, - ) -> Self { - MyActor { - receiver, - sender, - next_id: 0, - received_self_msg: false, - } - } - - fn handle_message(&mut self, msg: ActorMessage) { - match msg { - ActorMessage::GetUniqueId { respond_to } => { - self.next_id += 1; - - // The `let _ =` ignores any errors when sending. - // - // This can happen if the `select!` macro is used - // to cancel waiting for the response. - let _ = respond_to.send(self.next_id); - } - ActorMessage::SelfMessage { .. } => { - self.received_self_msg = true; - } - } - } - - async fn send_message_to_self(&mut self) { - let msg = ActorMessage::SelfMessage {}; - - let sender = self.sender.clone(); - - // cannot move self.sender here - if let Some(sender) = sender.upgrade() { - let _ = sender.send(msg).await; - self.sender = sender.downgrade(); - } - } - - async fn run(&mut self) { - let mut i = 0; - while let Some(msg) = self.receiver.recv().await { - self.handle_message(msg); - - if i == 0 { - self.send_message_to_self().await; - } - - i += 1 - } - - assert!(self.received_self_msg); - } - } - - #[derive(Clone)] - pub struct MyActorHandle { - sender: mpsc::Sender, - } - - impl MyActorHandle { - pub fn new() -> (Self, MyActor) { - let (sender, receiver) = mpsc::channel(8); - let actor = MyActor::new(receiver, sender.clone().downgrade()); - - (Self { sender }, actor) - } - - pub async fn get_unique_id(&self) -> u32 { - let (send, recv) = oneshot::channel(); - let msg = ActorMessage::GetUniqueId { respond_to: send }; - - // Ignore send errors. If this send fails, so does the - // recv.await below. There's no reason to check the - // failure twice. - let _ = self.sender.send(msg).await; - recv.await.expect("Actor task has been killed") - } - } - - let (handle, mut actor) = MyActorHandle::new(); - - let actor_handle = tokio::spawn(async move { actor.run().await }); - - let _ = tokio::spawn(async move { - let _ = handle.get_unique_id().await; - drop(handle); - }) - .await; - - let _ = actor_handle.await; -} - -static NUM_DROPPED: AtomicUsize = AtomicUsize::new(0); - -#[derive(Debug)] -struct Msg; - -impl Drop for Msg { - fn drop(&mut self) { - NUM_DROPPED.fetch_add(1, Release); - } -} - -// Tests that no pending messages are put onto the channel after `Rx` was -// dropped. -// -// Note: After the introduction of `WeakSender`, which internally -// used `Arc` and doesn't call a drop of the channel after the last strong -// `Sender` was dropped while more than one `WeakSender` remains, we want to -// ensure that no messages are kept in the channel, which were sent after -// the receiver was dropped. -#[tokio::test] -async fn test_msgs_dropped_on_rx_drop() { - let (tx, mut rx) = mpsc::channel(3); - - tx.send(Msg {}).await.unwrap(); - tx.send(Msg {}).await.unwrap(); - - // This msg will be pending and should be dropped when `rx` is dropped - let sent_fut = tx.send(Msg {}); - - let _ = rx.recv().await.unwrap(); - let _ = rx.recv().await.unwrap(); - - sent_fut.await.unwrap(); - - drop(rx); - - assert_eq!(NUM_DROPPED.load(Acquire), 3); - - // This msg will not be put onto `Tx` list anymore, since `Rx` is closed. - assert!(tx.send(Msg {}).await.is_err()); - - assert_eq!(NUM_DROPPED.load(Acquire), 4); -} - -// Tests that a `WeakSender` is upgradeable when other `Sender`s exist. -#[tokio::test] -async fn downgrade_upgrade_sender_success() { - let (tx, _rx) = mpsc::channel::(1); - let weak_tx = tx.downgrade(); - assert!(weak_tx.upgrade().is_some()); -} - -// Tests that a `WeakSender` fails to upgrade when no other `Sender` exists. -#[tokio::test] -async fn downgrade_upgrade_sender_failure() { - let (tx, _rx) = mpsc::channel::(1); - let weak_tx = tx.downgrade(); - drop(tx); - assert!(weak_tx.upgrade().is_none()); -} - -// Tests that a `WeakSender` cannot be upgraded after a `Sender` was dropped, -// which existed at the time of the `downgrade` call. -#[tokio::test] -async fn downgrade_drop_upgrade() { - let (tx, _rx) = mpsc::channel::(1); - - // the cloned `Tx` is dropped right away - let weak_tx = tx.clone().downgrade(); - drop(tx); - assert!(weak_tx.upgrade().is_none()); -} - -// Tests that we can upgrade a weak sender with an outstanding permit -// but no other strong senders. -#[tokio::test] -async fn downgrade_get_permit_upgrade_no_senders() { - let (tx, _rx) = mpsc::channel::(1); - let weak_tx = tx.downgrade(); - let _permit = tx.reserve_owned().await.unwrap(); - assert!(weak_tx.upgrade().is_some()); -} - -// Tests that you can downgrade and upgrade a sender with an outstanding permit -// but no other senders left. -#[tokio::test] -async fn downgrade_upgrade_get_permit_no_senders() { - let (tx, _rx) = mpsc::channel::(1); - let tx2 = tx.clone(); - let _permit = tx.reserve_owned().await.unwrap(); - let weak_tx = tx2.downgrade(); - drop(tx2); - assert!(weak_tx.upgrade().is_some()); -} - -// Tests that `downgrade` does not change the `tx_count` of the channel. -#[tokio::test] -async fn test_tx_count_weak_sender() { - let (tx, _rx) = mpsc::channel::(1); - let tx_weak = tx.downgrade(); - let tx_weak2 = tx.downgrade(); - drop(tx); - - assert!(tx_weak.upgrade().is_none() && tx_weak2.upgrade().is_none()); -} - // Tests that channel `capacity` changes and `max_capacity` stays the same #[tokio::test] async fn test_tx_capacity() { diff --git a/tokio/tests/sync_mpsc_weak.rs b/tokio/tests/sync_mpsc_weak.rs new file mode 100644 index 00000000000..ad8f7ce7d3e --- /dev/null +++ b/tokio/tests/sync_mpsc_weak.rs @@ -0,0 +1,513 @@ +#![allow(clippy::redundant_clone)] +#![warn(rust_2018_idioms)] +#![cfg(feature = "sync")] + +#[cfg(tokio_wasm_not_wasi)] +use wasm_bindgen_test::wasm_bindgen_test as test; + +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::{Acquire, Release}; +use tokio::sync::mpsc::{self, channel, unbounded_channel}; +use tokio::sync::oneshot; + +#[tokio::test] +async fn weak_sender() { + let (tx, mut rx) = channel(11); + + let tx_weak = tokio::spawn(async move { + let tx_weak = tx.clone().downgrade(); + + for i in 0..10 { + if tx.send(i).await.is_err() { + return None; + } + } + + let tx2 = tx_weak + .upgrade() + .expect("expected to be able to upgrade tx_weak"); + let _ = tx2.send(20).await; + let tx_weak = tx2.downgrade(); + + Some(tx_weak) + }) + .await + .unwrap(); + + for i in 0..12 { + let recvd = rx.recv().await; + + match recvd { + Some(msg) => { + if i == 10 { + assert_eq!(msg, 20); + } + } + None => { + assert_eq!(i, 11); + break; + } + } + } + + let tx_weak = tx_weak.unwrap(); + let upgraded = tx_weak.upgrade(); + assert!(upgraded.is_none()); +} + +#[tokio::test] +async fn actor_weak_sender() { + pub struct MyActor { + receiver: mpsc::Receiver, + sender: mpsc::WeakSender, + next_id: u32, + pub received_self_msg: bool, + } + + enum ActorMessage { + GetUniqueId { respond_to: oneshot::Sender }, + SelfMessage {}, + } + + impl MyActor { + fn new( + receiver: mpsc::Receiver, + sender: mpsc::WeakSender, + ) -> Self { + MyActor { + receiver, + sender, + next_id: 0, + received_self_msg: false, + } + } + + fn handle_message(&mut self, msg: ActorMessage) { + match msg { + ActorMessage::GetUniqueId { respond_to } => { + self.next_id += 1; + + // The `let _ =` ignores any errors when sending. + // + // This can happen if the `select!` macro is used + // to cancel waiting for the response. + let _ = respond_to.send(self.next_id); + } + ActorMessage::SelfMessage { .. } => { + self.received_self_msg = true; + } + } + } + + async fn send_message_to_self(&mut self) { + let msg = ActorMessage::SelfMessage {}; + + let sender = self.sender.clone(); + + // cannot move self.sender here + if let Some(sender) = sender.upgrade() { + let _ = sender.send(msg).await; + self.sender = sender.downgrade(); + } + } + + async fn run(&mut self) { + let mut i = 0; + while let Some(msg) = self.receiver.recv().await { + self.handle_message(msg); + + if i == 0 { + self.send_message_to_self().await; + } + + i += 1 + } + + assert!(self.received_self_msg); + } + } + + #[derive(Clone)] + pub struct MyActorHandle { + sender: mpsc::Sender, + } + + impl MyActorHandle { + pub fn new() -> (Self, MyActor) { + let (sender, receiver) = mpsc::channel(8); + let actor = MyActor::new(receiver, sender.clone().downgrade()); + + (Self { sender }, actor) + } + + pub async fn get_unique_id(&self) -> u32 { + let (send, recv) = oneshot::channel(); + let msg = ActorMessage::GetUniqueId { respond_to: send }; + + // Ignore send errors. If this send fails, so does the + // recv.await below. There's no reason to check the + // failure twice. + let _ = self.sender.send(msg).await; + recv.await.expect("Actor task has been killed") + } + } + + let (handle, mut actor) = MyActorHandle::new(); + + let actor_handle = tokio::spawn(async move { actor.run().await }); + + let _ = tokio::spawn(async move { + let _ = handle.get_unique_id().await; + drop(handle); + }) + .await; + + let _ = actor_handle.await; +} + +static NUM_DROPPED: AtomicUsize = AtomicUsize::new(0); + +#[derive(Debug)] +struct Msg; + +impl Drop for Msg { + fn drop(&mut self) { + NUM_DROPPED.fetch_add(1, Release); + } +} + +// Tests that no pending messages are put onto the channel after `Rx` was +// dropped. +// +// Note: After the introduction of `WeakSender`, which internally +// used `Arc` and doesn't call a drop of the channel after the last strong +// `Sender` was dropped while more than one `WeakSender` remains, we want to +// ensure that no messages are kept in the channel, which were sent after +// the receiver was dropped. +#[tokio::test] +async fn test_msgs_dropped_on_rx_drop() { + let (tx, mut rx) = mpsc::channel(3); + + tx.send(Msg {}).await.unwrap(); + tx.send(Msg {}).await.unwrap(); + + // This msg will be pending and should be dropped when `rx` is dropped + let sent_fut = tx.send(Msg {}); + + let _ = rx.recv().await.unwrap(); + let _ = rx.recv().await.unwrap(); + + sent_fut.await.unwrap(); + + drop(rx); + + assert_eq!(NUM_DROPPED.load(Acquire), 3); + + // This msg will not be put onto `Tx` list anymore, since `Rx` is closed. + assert!(tx.send(Msg {}).await.is_err()); + + assert_eq!(NUM_DROPPED.load(Acquire), 4); +} + +// Tests that a `WeakSender` is upgradeable when other `Sender`s exist. +#[tokio::test] +async fn downgrade_upgrade_sender_success() { + let (tx, _rx) = mpsc::channel::(1); + let weak_tx = tx.downgrade(); + assert!(weak_tx.upgrade().is_some()); +} + +// Tests that a `WeakSender` fails to upgrade when no other `Sender` exists. +#[tokio::test] +async fn downgrade_upgrade_sender_failure() { + let (tx, _rx) = mpsc::channel::(1); + let weak_tx = tx.downgrade(); + drop(tx); + assert!(weak_tx.upgrade().is_none()); +} + +// Tests that a `WeakSender` cannot be upgraded after a `Sender` was dropped, +// which existed at the time of the `downgrade` call. +#[tokio::test] +async fn downgrade_drop_upgrade() { + let (tx, _rx) = mpsc::channel::(1); + + // the cloned `Tx` is dropped right away + let weak_tx = tx.clone().downgrade(); + drop(tx); + assert!(weak_tx.upgrade().is_none()); +} + +// Tests that we can upgrade a weak sender with an outstanding permit +// but no other strong senders. +#[tokio::test] +async fn downgrade_get_permit_upgrade_no_senders() { + let (tx, _rx) = mpsc::channel::(1); + let weak_tx = tx.downgrade(); + let _permit = tx.reserve_owned().await.unwrap(); + assert!(weak_tx.upgrade().is_some()); +} + +// Tests that you can downgrade and upgrade a sender with an outstanding permit +// but no other senders left. +#[tokio::test] +async fn downgrade_upgrade_get_permit_no_senders() { + let (tx, _rx) = mpsc::channel::(1); + let tx2 = tx.clone(); + let _permit = tx.reserve_owned().await.unwrap(); + let weak_tx = tx2.downgrade(); + drop(tx2); + assert!(weak_tx.upgrade().is_some()); +} + +// Tests that `downgrade` does not change the `tx_count` of the channel. +#[tokio::test] +async fn test_tx_count_weak_sender() { + let (tx, _rx) = mpsc::channel::(1); + let tx_weak = tx.downgrade(); + let tx_weak2 = tx.downgrade(); + drop(tx); + + assert!(tx_weak.upgrade().is_none() && tx_weak2.upgrade().is_none()); +} + +#[tokio::test] +async fn weak_unbounded_sender() { + let (tx, mut rx) = unbounded_channel(); + + let tx_weak = tokio::spawn(async move { + let tx_weak = tx.clone().downgrade(); + + for i in 0..10 { + if tx.send(i).is_err() { + return None; + } + } + + let tx2 = tx_weak + .upgrade() + .expect("expected to be able to upgrade tx_weak"); + let _ = tx2.send(20); + let tx_weak = tx2.downgrade(); + + Some(tx_weak) + }) + .await + .unwrap(); + + for i in 0..12 { + let recvd = rx.recv().await; + + match recvd { + Some(msg) => { + if i == 10 { + assert_eq!(msg, 20); + } + } + None => { + assert_eq!(i, 11); + break; + } + } + } + + let tx_weak = tx_weak.unwrap(); + let upgraded = tx_weak.upgrade(); + assert!(upgraded.is_none()); +} + +#[tokio::test] +async fn actor_weak_unbounded_sender() { + pub struct MyActor { + receiver: mpsc::UnboundedReceiver, + sender: mpsc::WeakUnboundedSender, + next_id: u32, + pub received_self_msg: bool, + } + + enum ActorMessage { + GetUniqueId { respond_to: oneshot::Sender }, + SelfMessage {}, + } + + impl MyActor { + fn new( + receiver: mpsc::UnboundedReceiver, + sender: mpsc::WeakUnboundedSender, + ) -> Self { + MyActor { + receiver, + sender, + next_id: 0, + received_self_msg: false, + } + } + + fn handle_message(&mut self, msg: ActorMessage) { + match msg { + ActorMessage::GetUniqueId { respond_to } => { + self.next_id += 1; + + // The `let _ =` ignores any errors when sending. + // + // This can happen if the `select!` macro is used + // to cancel waiting for the response. + let _ = respond_to.send(self.next_id); + } + ActorMessage::SelfMessage { .. } => { + self.received_self_msg = true; + } + } + } + + async fn send_message_to_self(&mut self) { + let msg = ActorMessage::SelfMessage {}; + + let sender = self.sender.clone(); + + // cannot move self.sender here + if let Some(sender) = sender.upgrade() { + let _ = sender.send(msg); + self.sender = sender.downgrade(); + } + } + + async fn run(&mut self) { + let mut i = 0; + while let Some(msg) = self.receiver.recv().await { + self.handle_message(msg); + + if i == 0 { + self.send_message_to_self().await; + } + + i += 1 + } + + assert!(self.received_self_msg); + } + } + + #[derive(Clone)] + pub struct MyActorHandle { + sender: mpsc::UnboundedSender, + } + + impl MyActorHandle { + pub fn new() -> (Self, MyActor) { + let (sender, receiver) = mpsc::unbounded_channel(); + let actor = MyActor::new(receiver, sender.clone().downgrade()); + + (Self { sender }, actor) + } + + pub async fn get_unique_id(&self) -> u32 { + let (send, recv) = oneshot::channel(); + let msg = ActorMessage::GetUniqueId { respond_to: send }; + + // Ignore send errors. If this send fails, so does the + // recv.await below. There's no reason to check the + // failure twice. + let _ = self.sender.send(msg); + recv.await.expect("Actor task has been killed") + } + } + + let (handle, mut actor) = MyActorHandle::new(); + + let actor_handle = tokio::spawn(async move { actor.run().await }); + + let _ = tokio::spawn(async move { + let _ = handle.get_unique_id().await; + drop(handle); + }) + .await; + + let _ = actor_handle.await; +} + +static NUM_DROPPED_UNBOUNDED: AtomicUsize = AtomicUsize::new(0); + +#[derive(Debug)] +struct MsgUnbounded; + +impl Drop for MsgUnbounded { + fn drop(&mut self) { + NUM_DROPPED_UNBOUNDED.fetch_add(1, Release); + } +} + +// Tests that no pending messages are put onto the channel after `Rx` was +// dropped. +// +// Note: After the introduction of `UnboundedWeakSender`, which internally +// used `Arc` and doesn't call a drop of the channel after the last strong +// `UnboundedSender` was dropped while more than one `UnboundedWeakSender` +// remains, we want to ensure that no messages are kept in the channel, which +// were sent after the receiver was dropped. +#[tokio::test] +async fn test_msgs_dropped_on_unbounded_rx_drop() { + let (tx, mut rx) = mpsc::unbounded_channel(); + + tx.send(MsgUnbounded {}).unwrap(); + tx.send(MsgUnbounded {}).unwrap(); + + // This msg will be pending and should be dropped when `rx` is dropped + let sent = tx.send(MsgUnbounded {}); + + let _ = rx.recv().await.unwrap(); + let _ = rx.recv().await.unwrap(); + + sent.unwrap(); + + drop(rx); + + assert_eq!(NUM_DROPPED_UNBOUNDED.load(Acquire), 3); + + // This msg will not be put onto `Tx` list anymore, since `Rx` is closed. + assert!(tx.send(MsgUnbounded {}).is_err()); + + assert_eq!(NUM_DROPPED_UNBOUNDED.load(Acquire), 4); +} + +// Tests that an `WeakUnboundedSender` is upgradeable when other +// `UnboundedSender`s exist. +#[tokio::test] +async fn downgrade_upgrade_unbounded_sender_success() { + let (tx, _rx) = mpsc::unbounded_channel::(); + let weak_tx = tx.downgrade(); + assert!(weak_tx.upgrade().is_some()); +} + +// Tests that a `WeakUnboundedSender` fails to upgrade when no other +// `UnboundedSender` exists. +#[tokio::test] +async fn downgrade_upgrade_unbounded_sender_failure() { + let (tx, _rx) = mpsc::unbounded_channel::(); + let weak_tx = tx.downgrade(); + drop(tx); + assert!(weak_tx.upgrade().is_none()); +} + +// Tests that an `WeakUnboundedSender` cannot be upgraded after an +// `UnboundedSender` was dropped, which existed at the time of the `downgrade` call. +#[tokio::test] +async fn downgrade_drop_upgrade_unbounded() { + let (tx, _rx) = mpsc::unbounded_channel::(); + + // the cloned `Tx` is dropped right away + let weak_tx = tx.clone().downgrade(); + drop(tx); + assert!(weak_tx.upgrade().is_none()); +} + +// Tests that `downgrade` does not change the `tx_count` of the channel. +#[tokio::test] +async fn test_tx_count_weak_unbounded_sender() { + let (tx, _rx) = mpsc::unbounded_channel::(); + let tx_weak = tx.downgrade(); + let tx_weak2 = tx.downgrade(); + drop(tx); + + assert!(tx_weak.upgrade().is_none() && tx_weak2.upgrade().is_none()); +}