diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 0d9cd3bc176..3b10cf08ec9 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -691,6 +691,73 @@ impl Drop for Sender { } impl Receiver { + /// Returns the number of messages that were sent into the channel and that + /// this [`Receiver`] has yet to receive. + /// + /// If the returned value from `len` is larger than the next largest power of 2 + /// of the capacity of the channel any call to [`recv`] will return an + /// `Err(RecvError::Lagged)` and any call to [`try_recv`] will return an + /// `Err(TryRecvError::Lagged)`, e.g. if the capacity of the channel is 10, + /// [`recv`] will start to return `Err(RecvError::Lagged)` once `len` returns + /// values larger than 16. + /// + /// [`Receiver`]: crate::sync::broadcast::Receiver + /// [`recv`]: crate::sync::broadcast::Receiver::recv + /// [`try_recv`]: crate::sync::broadcast::Receiver::try_recv + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::broadcast; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx1) = broadcast::channel(16); + /// + /// tx.send(10).unwrap(); + /// tx.send(20).unwrap(); + /// + /// assert_eq!(rx1.len(), 2); + /// assert_eq!(rx1.recv().await.unwrap(), 10); + /// assert_eq!(rx1.len(), 1); + /// assert_eq!(rx1.recv().await.unwrap(), 20); + /// assert_eq!(rx1.len(), 0); + /// } + /// ``` + pub fn len(&self) -> usize { + let next_send_pos = self.shared.tail.lock().pos; + (next_send_pos - self.next) as usize + } + + /// Returns true if there aren't any messages in the channel that the [`Receiver`] + /// has yet to receive. + /// + /// [`Receiver]: create::sync::broadcast::Receiver + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::broadcast; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx1) = broadcast::channel(16); + /// + /// assert!(rx1.is_empty()); + /// + /// tx.send(10).unwrap(); + /// tx.send(20).unwrap(); + /// + /// assert!(!rx1.is_empty()); + /// assert_eq!(rx1.recv().await.unwrap(), 10); + /// assert_eq!(rx1.recv().await.unwrap(), 20); + /// assert!(rx1.is_empty()); + /// } + /// ``` + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + /// Locks the next value if there is one. fn recv_ref( &mut self, diff --git a/tokio/tests/sync_broadcast.rs b/tokio/tests/sync_broadcast.rs index 1b68eb7edbd..ca8b4d7f4ce 100644 --- a/tokio/tests/sync_broadcast.rs +++ b/tokio/tests/sync_broadcast.rs @@ -457,6 +457,25 @@ fn lagging_receiver_recovers_after_wrap_open() { assert_empty!(rx); } +#[test] +fn receiver_len_with_lagged() { + let (tx, mut rx) = broadcast::channel(3); + + tx.send(10).unwrap(); + tx.send(20).unwrap(); + tx.send(30).unwrap(); + tx.send(40).unwrap(); + + assert_eq!(rx.len(), 4); + assert_eq!(assert_recv!(rx), 10); + + tx.send(50).unwrap(); + tx.send(60).unwrap(); + + assert_eq!(rx.len(), 5); + assert_lagged!(rx.try_recv(), 1); +} + fn is_closed(err: broadcast::error::RecvError) -> bool { matches!(err, broadcast::error::RecvError::Closed) }