Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add resubscribe functionality to broadcast::Receiver #4607

Merged
merged 13 commits into from May 28, 2022
28 changes: 28 additions & 0 deletions tokio/src/sync/broadcast.rs
Expand Up @@ -647,6 +647,7 @@ impl<T> Sender<T> {
}
}

/// Create a new `Receiver` which reads starting from the tail.
fn new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T> {
let mut tail = shared.tail.lock();

Expand Down Expand Up @@ -881,6 +882,33 @@ impl<T> Receiver<T> {
}

impl<T: Clone> Receiver<T> {
/// Re-subscribes to the channel starting from the current tail element.
///
/// This [`Receiver`] handle will receive a clone of all values sent
/// **after** it has resubscribed. This will not include elements that are
/// in the queue of the current receiver. Consider the following example.
///
/// # Examples
///
/// ```
/// use tokio::sync::broadcast;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = broadcast::channel(2);
///
/// tx.send(1).unwrap();
/// let mut rx2 = rx.resubscribe();
/// tx.send(2).unwrap();
///
/// assert_eq!(rx2.recv().await.unwrap(), 2);
/// assert_eq!(rx.recv().await.unwrap(), 1);
/// }
/// ```
pub fn resubscribe(&self) -> Self {
let shared = self.shared.clone();
new_receiver(shared)
}
/// Receives the next value for this receiver.
///
/// Each [`Receiver`] handle will receive a clone of all values sent
Expand Down
38 changes: 38 additions & 0 deletions tokio/tests/sync_broadcast.rs
Expand Up @@ -479,3 +479,41 @@ fn receiver_len_with_lagged() {
fn is_closed(err: broadcast::error::RecvError) -> bool {
matches!(err, broadcast::error::RecvError::Closed)
}

#[test]
fn resubscribe_points_to_tail() {
let (tx, mut rx) = broadcast::channel(3);
tx.send(1).unwrap();

let mut rx_resub = rx.resubscribe();

// verify we're one behind at the start
assert_empty!(rx_resub);
assert_eq!(assert_recv!(rx), 1);

// verify we do not affect rx
tx.send(2).unwrap();
assert_eq!(assert_recv!(rx_resub), 2);
tx.send(3).unwrap();
assert_eq!(assert_recv!(rx), 2);
assert_eq!(assert_recv!(rx), 3);
assert_empty!(rx);

assert_eq!(assert_recv!(rx_resub), 3);
assert_empty!(rx_resub);
}

#[test]
fn resubscribe_lagged() {
let (tx, mut rx) = broadcast::channel(1);
tx.send(1).unwrap();
tx.send(2).unwrap();

let mut rx_resub = rx.resubscribe();
assert_lagged!(rx.try_recv(), 1);
assert_empty!(rx_resub);

assert_eq!(assert_recv!(rx), 2);
assert_empty!(rx);
assert_empty!(rx_resub);
}