From 7aae066d033a3358c1bd7a8c5dc65d961cedb249 Mon Sep 17 00:00:00 2001 From: b-naber Date: Fri, 25 Feb 2022 18:36:19 +0100 Subject: [PATCH] add len method to Receiver --- tokio/src/sync/broadcast.rs | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 0d9cd3bc176..3caded3f0ee 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -691,6 +691,35 @@ impl Drop for Sender { } impl Receiver { + /// Returns the number of messages that were sent into the channel and that + /// this [`Receiver`] has yet to receive. + /// + /// [`Receiver`]: crate::sync::broadcast::Receiver + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::broadcast; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx1) = broadcast::channel(16); + /// + /// tx.send(10).unwrap(); + /// tx.send(20).unwrap(); + /// + /// assert_eq!(rx1.len(), 2); + /// assert_eq!(rx1.recv().await.unwrap(), 10); + /// assert_eq!(rx1.len(), 1); + /// assert_eq!(rx1.recv().await.unwrap(), 20); + /// assert_eq!(rx1.len(), 0); + /// } + /// ``` + pub fn len(&self) -> u64 { + let next_send_pos = self.shared.tail.lock().pos; + next_send_pos - self.next + } + /// Locks the next value if there is one. fn recv_ref( &mut self,