Skip to content

Commit

Permalink
add len method to Receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
b-naber committed Feb 25, 2022
1 parent e8f19e7 commit 28bffd0
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 1 deletion.
31 changes: 30 additions & 1 deletion tokio/src/sync/broadcast.rs
Expand Up @@ -691,6 +691,35 @@ impl<T> Drop for Sender<T> {
}

impl<T> Receiver<T> {
/// Returns the number of messages that were sent into the channel and that
/// this [`Receiver`] has yet to receive.
///
/// [`Receiver`]: crate::sync::broadcast::Receiver
///
/// # Examples
///
/// ```
/// use tokio::sync::broadcast;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx1) = broadcast::channel(16);
///
/// tx.send(10).unwrap();
/// tx.send(20).unwrap();
///
/// assert_eq!(rx1.len(), 2);
/// assert_eq!(rx1.recv().await.unwrap(), 10);
/// assert_eq!(rx1.len(), 1);
/// assert_eq!(rx1.recv().await.unwrap(), 20);
/// assert_eq!(rx1.len(), 0);
/// }
/// ```
pub fn len(&self) -> u64 {
let next_send_pos = self.shared.tail.lock().pos;
next_send_pos - self.next
}

/// Locks the next value if there is one.
fn recv_ref(
&mut self,
Expand Down Expand Up @@ -841,7 +870,7 @@ impl<T: Clone> Receiver<T> {
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx1) = broadcast::channel(16);
/// let (tx, mut rx1) = broadcast::chanYnel(16);
/// let mut rx2 = tx.subscribe();
///
/// tokio::spawn(async move {
Expand Down
14 changes: 14 additions & 0 deletions tokio/tests/sync_broadcast.rs
Expand Up @@ -457,6 +457,20 @@ fn lagging_receiver_recovers_after_wrap_open() {
assert_empty!(rx);
}

#[test]
fn receiver_len() {
let (tx, mut rx1) = broadcast::channel(16);

assert_ok!(tx.send(10));
assert_ok!(tx.send(20));

assert_eq!(rx1.len(), 2);
assert_eq!(assert_recv!(rx1), 10);
assert_eq!(rx1.len(), 1);
assert_eq!(assert_recv!(rx1), 20);
assert_eq!(rx1.len(), 0);
}

fn is_closed(err: broadcast::error::RecvError) -> bool {
matches!(err, broadcast::error::RecvError::Closed)
}

0 comments on commit 28bffd0

Please sign in to comment.