Skip to content

Commit

Permalink
Add subscribe method to broadcast::Receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
Evan Simmons authored and estk committed Apr 10, 2022
1 parent 83477c7 commit 97de03e
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions tokio/src/sync/broadcast.rs
Expand Up @@ -552,7 +552,7 @@ impl<T> Sender<T> {
/// ```
pub fn subscribe(&self) -> Receiver<T> {
let shared = self.shared.clone();
new_receiver(shared)
new_receiver(shared, None)
}

/// Returns the number of active receivers
Expand Down Expand Up @@ -642,7 +642,8 @@ impl<T> Sender<T> {
}
}

fn new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T> {
/// Create a new `Receiver` which reads starting from the tail if `next_pos` is not specified.
fn new_receiver<T>(shared: Arc<Shared<T>>, next_pos: Option<u64>) -> Receiver<T> {
let mut tail = shared.tail.lock();

if tail.rx_cnt == MAX_RECEIVERS {
Expand All @@ -651,10 +652,9 @@ fn new_receiver<T>(shared: Arc<Shared<T>>) -> Receiver<T> {

tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");

let next = tail.pos;
let next = next_pos.unwrap_or(tail.pos);

drop(tail);

Receiver { shared, next }
}

Expand Down Expand Up @@ -1022,6 +1022,13 @@ impl<T> Drop for Receiver<T> {
}
}

impl<T> Clone for Receiver<T> {
fn clone(&self) -> Self {
let shared = self.shared.clone();
new_receiver(shared, Some(self.next))
}
}

impl<'a, T> Recv<'a, T> {
fn new(receiver: &'a mut Receiver<T>) -> Recv<'a, T> {
Recv {
Expand Down

0 comments on commit 97de03e

Please sign in to comment.