From b36ce2544ba05b8f514ddd7c1ad4ae5030af5052 Mon Sep 17 00:00:00 2001 From: b-naber Date: Fri, 25 Feb 2022 18:36:19 +0100 Subject: [PATCH 1/3] add len method to Receiver --- tokio/src/sync/broadcast.rs | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 0d9cd3bc176..fa439fba500 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -691,6 +691,35 @@ impl Drop for Sender { } impl Receiver { + /// Returns the number of messages that were sent into the channel and that + /// this [`Receiver`] has yet to receive. + /// + /// [`Receiver`]: crate::sync::broadcast::Receiver + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::broadcast; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx1) = broadcast::channel(16); + /// + /// tx.send(10).unwrap(); + /// tx.send(20).unwrap(); + /// + /// assert_eq!(rx1.num_msgs(), 2); + /// assert_eq!(rx1.recv().await.unwrap(), 10); + /// assert_eq!(rx1.num_msgs(), 1); + /// assert_eq!(rx1.recv().await.unwrap(), 20); + /// assert_eq!(rx1.num_msgs(), 0); + /// } + /// ``` + pub fn num_msgs(&self) -> u64 { + let next_send_pos = self.shared.tail.lock().pos; + next_send_pos - self.next + } + /// Locks the next value if there is one. fn recv_ref( &mut self, From 841963ffc213d45c8028e11ff67bd3464e069caa Mon Sep 17 00:00:00 2001 From: b-naber Date: Thu, 3 Mar 2022 16:34:21 +0100 Subject: [PATCH 2/3] change name to len, add empty method and update docs --- tokio/src/sync/broadcast.rs | 45 ++++++++++++++++++++++++++++++++----- 1 file changed, 40 insertions(+), 5 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index fa439fba500..88249826472 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -694,7 +694,13 @@ impl Receiver { /// Returns the number of messages that were sent into the channel and that /// this [`Receiver`] has yet to receive. /// + /// If the returned value from `len` is larger or equal to the capacity of + /// the channel any call to [`recv`] will return an `Err(RecvError::Lagged)` + /// and any call to [`try_recv`] will return an `Err(TryRecvError::Lagged)`. + /// /// [`Receiver`]: crate::sync::broadcast::Receiver + /// [`recv`]: crate::sync::broadcast::Receiver::recv + /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv /// /// # Examples /// @@ -708,16 +714,45 @@ impl Receiver { /// tx.send(10).unwrap(); /// tx.send(20).unwrap(); /// - /// assert_eq!(rx1.num_msgs(), 2); + /// assert_eq!(rx1.len(), 2); /// assert_eq!(rx1.recv().await.unwrap(), 10); - /// assert_eq!(rx1.num_msgs(), 1); + /// assert_eq!(rx1.len(), 1); /// assert_eq!(rx1.recv().await.unwrap(), 20); - /// assert_eq!(rx1.num_msgs(), 0); + /// assert_eq!(rx1.len(), 0); /// } /// ``` - pub fn num_msgs(&self) -> u64 { + pub fn len(&self) -> usize { let next_send_pos = self.shared.tail.lock().pos; - next_send_pos - self.next + (next_send_pos - self.next) as usize + } + + /// Returns true if there aren't any messages in the channel that the [`Receiver`] + /// has yet to receive. + /// + /// [`Receiver]: create::sync::broadcast::Receiver + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::broadcast; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx1) = broadcast::channel(16); + /// + /// assert!(rx1.is_empty()); + /// + /// tx.send(10).unwrap(); + /// tx.send(20).unwrap(); + /// + /// assert!(!rx1.is_empty()); + /// assert_eq!(rx1.recv().await.unwrap(), 10); + /// assert_eq!(rx1.recv().await.unwrap(), 20); + /// assert!(rx1.is_empty()); + /// } + /// ``` + pub fn is_empty(&self) -> bool { + self.len() == 0 } /// Locks the next value if there is one. From bb10d771faf8c44d613eabc2f0a234cc3026f451 Mon Sep 17 00:00:00 2001 From: b-naber Date: Thu, 3 Mar 2022 19:11:17 +0100 Subject: [PATCH 3/3] update docs, add test --- tokio/src/sync/broadcast.rs | 9 ++++++--- tokio/tests/sync_broadcast.rs | 19 +++++++++++++++++++ 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 88249826472..3b10cf08ec9 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -694,9 +694,12 @@ impl Receiver { /// Returns the number of messages that were sent into the channel and that /// this [`Receiver`] has yet to receive. /// - /// If the returned value from `len` is larger or equal to the capacity of - /// the channel any call to [`recv`] will return an `Err(RecvError::Lagged)` - /// and any call to [`try_recv`] will return an `Err(TryRecvError::Lagged)`. + /// If the returned value from `len` is larger than the next largest power of 2 + /// of the capacity of the channel any call to [`recv`] will return an + /// `Err(RecvError::Lagged)` and any call to [`try_recv`] will return an + /// `Err(TryRecvError::Lagged)`, e.g. if the capacity of the channel is 10, + /// [`recv`] will start to return `Err(RecvError::Lagged)` once `len` returns + /// values larger than 16. /// /// [`Receiver`]: crate::sync::broadcast::Receiver /// [`recv`]: crate::sync::broadcast::Receiver::recv diff --git a/tokio/tests/sync_broadcast.rs b/tokio/tests/sync_broadcast.rs index 1b68eb7edbd..ca8b4d7f4ce 100644 --- a/tokio/tests/sync_broadcast.rs +++ b/tokio/tests/sync_broadcast.rs @@ -457,6 +457,25 @@ fn lagging_receiver_recovers_after_wrap_open() { assert_empty!(rx); } +#[test] +fn receiver_len_with_lagged() { + let (tx, mut rx) = broadcast::channel(3); + + tx.send(10).unwrap(); + tx.send(20).unwrap(); + tx.send(30).unwrap(); + tx.send(40).unwrap(); + + assert_eq!(rx.len(), 4); + assert_eq!(assert_recv!(rx), 10); + + tx.send(50).unwrap(); + tx.send(60).unwrap(); + + assert_eq!(rx.len(), 5); + assert_lagged!(rx.try_recv(), 1); +} + fn is_closed(err: broadcast::error::RecvError) -> bool { matches!(err, broadcast::error::RecvError::Closed) }