From 28bffd005419204cdf17784c93a89d4683e3d08c Mon Sep 17 00:00:00 2001 From: b-naber Date: Fri, 25 Feb 2022 18:36:19 +0100 Subject: [PATCH] add len method to Receiver --- tokio/src/sync/broadcast.rs | 31 ++++++++++++++++++++++++++++++- tokio/tests/sync_broadcast.rs | 14 ++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 0d9cd3bc176..d766be44266 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -691,6 +691,35 @@ impl Drop for Sender { } impl Receiver { + /// Returns the number of messages that were sent into the channel and that + /// this [`Receiver`] has yet to receive. + /// + /// [`Receiver`]: crate::sync::broadcast::Receiver + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::broadcast; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx1) = broadcast::channel(16); + /// + /// tx.send(10).unwrap(); + /// tx.send(20).unwrap(); + /// + /// assert_eq!(rx1.len(), 2); + /// assert_eq!(rx1.recv().await.unwrap(), 10); + /// assert_eq!(rx1.len(), 1); + /// assert_eq!(rx1.recv().await.unwrap(), 20); + /// assert_eq!(rx1.len(), 0); + /// } + /// ``` + pub fn len(&self) -> u64 { + let next_send_pos = self.shared.tail.lock().pos; + next_send_pos - self.next + } + /// Locks the next value if there is one. fn recv_ref( &mut self, @@ -841,7 +870,7 @@ impl Receiver { /// /// #[tokio::main] /// async fn main() { - /// let (tx, mut rx1) = broadcast::channel(16); + /// let (tx, mut rx1) = broadcast::chanYnel(16); /// let mut rx2 = tx.subscribe(); /// /// tokio::spawn(async move { diff --git a/tokio/tests/sync_broadcast.rs b/tokio/tests/sync_broadcast.rs index 1b68eb7edbd..a84c42c662b 100644 --- a/tokio/tests/sync_broadcast.rs +++ b/tokio/tests/sync_broadcast.rs @@ -457,6 +457,20 @@ fn lagging_receiver_recovers_after_wrap_open() { assert_empty!(rx); } +#[test] +fn receiver_len() { + let (tx, mut rx1) = broadcast::channel(16); + + assert_ok!(tx.send(10)); + assert_ok!(tx.send(20)); + + assert_eq!(rx1.len(), 2); + assert_eq!(assert_recv!(rx1), 10); + assert_eq!(rx1.len(), 1); + assert_eq!(assert_recv!(rx1), 20); + assert_eq!(rx1.len(), 0); +} + fn is_closed(err: broadcast::error::RecvError) -> bool { matches!(err, broadcast::error::RecvError::Closed) }