Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add broadcast::Sender::len #5343

Merged
merged 6 commits into from Jan 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
95 changes: 93 additions & 2 deletions tokio/src/sync/broadcast.rs
Expand Up @@ -603,6 +603,97 @@ impl<T> Sender<T> {
new_receiver(shared)
}

/// Returns the number of queued values.
sfackler marked this conversation as resolved.
Show resolved Hide resolved
///
/// A value is queued until it has either been seen by all receivers that were alive at the time
/// it was sent, or has been evicted from the queue by subsequent sends that exceeded the
/// queue's capacity.
///
/// # Note
///
/// In contrast to [`Receiver::len`], this method only reports queued values and not values that
/// have been evicted from the queue before being seen by all receivers.
///
/// # Examples
///
/// ```
/// use tokio::sync::broadcast;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx1) = broadcast::channel(16);
/// let mut rx2 = tx.subscribe();
///
/// tx.send(10).unwrap();
/// tx.send(20).unwrap();
/// tx.send(30).unwrap();
///
/// assert_eq!(tx.len(), 3);
///
/// rx1.recv().await.unwrap();
///
/// // The len is still 3 since rx2 hasn't seen the first value yet.
/// assert_eq!(tx.len(), 3);
///
/// rx2.recv().await.unwrap();
///
/// assert_eq!(tx.len(), 2);
/// }
/// ```
pub fn len(&self) -> usize {
let tail = self.shared.tail.lock();

let base_idx = (tail.pos & self.shared.mask as u64) as usize;
let mut low = 0;
let mut high = self.shared.buffer.len();
while low < high {
let mid = low + (high - low) / 2;
let idx = base_idx.wrapping_add(mid) & self.shared.mask;
if self.shared.buffer[idx].read().unwrap().rem.load(SeqCst) == 0 {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can probably be a relaxed load rather than sequentially consistent, but haven't thought too carefully about it.

low = mid + 1;
} else {
high = mid;
}
}

self.shared.buffer.len() - low
}

/// Returns true if there are no queued values.
///
/// # Examples
///
/// ```
/// use tokio::sync::broadcast;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx1) = broadcast::channel(16);
/// let mut rx2 = tx.subscribe();
///
/// assert!(tx.is_empty());
///
/// tx.send(10).unwrap();
///
/// assert!(!tx.is_empty());
///
/// rx1.recv().await.unwrap();
///
/// // The queue is still not empty since rx2 hasn't seen the value.
/// assert!(!tx.is_empty());
///
/// rx2.recv().await.unwrap();
///
/// assert!(tx.is_empty());
/// }
/// ```
pub fn is_empty(&self) -> bool {
let tail = self.shared.tail.lock();

let idx = (tail.pos.wrapping_sub(1) & self.shared.mask as u64) as usize;
self.shared.buffer[idx].read().unwrap().rem.load(SeqCst) == 0
}

/// Returns the number of active receivers
///
/// An active receiver is a [`Receiver`] handle returned from [`channel`] or
Expand Down Expand Up @@ -731,7 +822,7 @@ impl<T> Receiver<T> {
/// assert_eq!(rx1.len(), 2);
/// assert_eq!(rx1.recv().await.unwrap(), 10);
/// assert_eq!(rx1.len(), 1);
/// assert_eq!(rx1.recv().await.unwrap(), 20);
/// assert_eq!(rx1.recv().await.unwrap(), 20);
/// assert_eq!(rx1.len(), 0);
/// }
/// ```
Expand Down Expand Up @@ -761,7 +852,7 @@ impl<T> Receiver<T> {
///
/// assert!(!rx1.is_empty());
/// assert_eq!(rx1.recv().await.unwrap(), 10);
/// assert_eq!(rx1.recv().await.unwrap(), 20);
/// assert_eq!(rx1.recv().await.unwrap(), 20);
/// assert!(rx1.is_empty());
/// }
/// ```
Expand Down
60 changes: 60 additions & 0 deletions tokio/tests/sync_broadcast.rs
Expand Up @@ -526,3 +526,63 @@ fn resubscribe_to_closed_channel() {
let mut rx_resub = rx.resubscribe();
assert_closed!(rx_resub.try_recv());
}

#[test]
fn sender_len() {
let (tx, mut rx1) = broadcast::channel(4);
let mut rx2 = tx.subscribe();

assert_eq!(tx.len(), 0);
assert!(tx.is_empty());

tx.send(1).unwrap();
tx.send(2).unwrap();
tx.send(3).unwrap();

assert_eq!(tx.len(), 3);
assert!(!tx.is_empty());

assert_recv!(rx1);
assert_recv!(rx1);

assert_eq!(tx.len(), 3);
assert!(!tx.is_empty());

assert_recv!(rx2);

assert_eq!(tx.len(), 2);
assert!(!tx.is_empty());

tx.send(4).unwrap();
tx.send(5).unwrap();
tx.send(6).unwrap();

assert_eq!(tx.len(), 4);
assert!(!tx.is_empty());
}

#[test]
#[cfg(not(tokio_wasm_not_wasi))]
fn sender_len_random() {
use rand::Rng;

let (tx, mut rx1) = broadcast::channel(16);
let mut rx2 = tx.subscribe();

for _ in 0..1000 {
match rand::thread_rng().gen_range(0..4) {
0 => {
let _ = rx1.try_recv();
}
1 => {
let _ = rx2.try_recv();
}
_ => {
tx.send(0).unwrap();
}
}

let expected_len = usize::min(usize::max(rx1.len(), rx2.len()), 16);
assert_eq!(tx.len(), expected_len);
}
}