diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index fa439fba500..c74633cded7 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -694,7 +694,13 @@ impl Receiver { /// Returns the number of messages that were sent into the channel and that /// this [`Receiver`] has yet to receive. /// + /// If the returned value from `len` is larger or equal to the capacity of + /// the channel any call to [`recv`] will return an `Err(RecvError::Lagged)` + /// and any call to [`try_recv`] will return an `Err(TryRecvError::Lagged)`. + /// /// [`Receiver`]: crate::sync::broadcast::Receiver + /// [`recv`]: crate::sync::broadcast::Receiver::recv + /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv /// /// # Examples /// @@ -708,16 +714,45 @@ impl Receiver { /// tx.send(10).unwrap(); /// tx.send(20).unwrap(); /// - /// assert_eq!(rx1.num_msgs(), 2); + /// assert_eq!(rx1.len(), 2); /// assert_eq!(rx1.recv().await.unwrap(), 10); - /// assert_eq!(rx1.num_msgs(), 1); + /// assert_eq!(rx1.len(), 1); /// assert_eq!(rx1.recv().await.unwrap(), 20); - /// assert_eq!(rx1.num_msgs(), 0); + /// assert_eq!(rx1.len(), 0); /// } /// ``` - pub fn num_msgs(&self) -> u64 { + pub fn len(&self) -> usize { let next_send_pos = self.shared.tail.lock().pos; - next_send_pos - self.next + (next_send_pos - self.next) as usize + } + + /// Returns true if there aren't any messages in the channel that the [`Receiver`] + /// has yet to receive. + /// + /// [`Receiver]: create::sync::broadcast::Receiver + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::broadcast; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx1) = broadcast::channel(16); + /// + /// assert!(rx1.is_empty()); + /// + /// tx.send(10).unwrap(); + /// tx.send(20).unwrap(); + /// + /// assert!(!rx1.is_empty()); + /// assert_eq!(rx1.recv().await.unwrap(), 10); + /// assert_eq!(rx1.recv().await.unwrap(), 20); + /// assert!(rx.is_empty()); + /// } + /// ``` + pub fn is_empty(&self) -> bool { + self.len() == 0 } /// Locks the next value if there is one.