Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a method on Receiver that gives information on queue status in broadcast channel #4542

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 29 additions & 0 deletions tokio/src/sync/broadcast.rs
Expand Up @@ -691,6 +691,35 @@ impl<T> Drop for Sender<T> {
}

impl<T> Receiver<T> {
/// Returns the number of messages that were sent into the channel and that
/// this [`Receiver`] has yet to receive.
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
///
/// [`Receiver`]: crate::sync::broadcast::Receiver
///
/// # Examples
///
/// ```
/// use tokio::sync::broadcast;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx1) = broadcast::channel(16);
///
/// tx.send(10).unwrap();
/// tx.send(20).unwrap();
///
/// assert_eq!(rx1.num_msgs(), 2);
/// assert_eq!(rx1.recv().await.unwrap(), 10);
/// assert_eq!(rx1.num_msgs(), 1);
/// assert_eq!(rx1.recv().await.unwrap(), 20);
/// assert_eq!(rx1.num_msgs(), 0);
/// }
/// ```
pub fn num_msgs(&self) -> u64 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is num_msgs the right name for this function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure, but I think it's clearer than len (which is mostly used for container like data structures). Plus clippy complained that this missed an empty method given that it had a len method, where again I'm not sure it makes sense to speak of a Receiver as being empty, though some people might like that interface. num_msgs is probably not great either, I was thinking of something like num_queued_messages, but thought it might have been too verbose. I'm open to choose whatever you think is the most plausible choice.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I posted this question for discussion on #tokio-dev on discord.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would be inclined to just call this len --- crossbeam::channel has a len method, and I think the meaning of referring to a channel's "length" is relatively unambiguous. Alternatively, if we don't want to call it a "length", i would suggest num_queued --- I don't think adding "messages" to the method name conveys as much information...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is u64 the right return type here? The usize type seems more reasonable to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I chose that because tail.pos has that type.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the other hand, the capacity argument to the constructor for the channel takes an usize.

(The reason that tail.pos has the type u64 is that it needs to fit the total number of messages that will be sent on the channel, not just the total number of messages in-flight at any one time.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be usize.

let next_send_pos = self.shared.tail.lock().pos;
next_send_pos - self.next
}

/// Locks the next value if there is one.
fn recv_ref(
&mut self,
Expand Down