From 97de03edffe6337399bd4172072e4932b70b91d1 Mon Sep 17 00:00:00 2001 From: Evan Simmons Date: Fri, 8 Apr 2022 11:51:57 -0700 Subject: [PATCH] Add subscribe method to broadcast::Receiver --- tokio/src/sync/broadcast.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index e54fe5c8151..e34b64a906d 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -552,7 +552,7 @@ impl Sender { /// ``` pub fn subscribe(&self) -> Receiver { let shared = self.shared.clone(); - new_receiver(shared) + new_receiver(shared, None) } /// Returns the number of active receivers @@ -642,7 +642,8 @@ impl Sender { } } -fn new_receiver(shared: Arc>) -> Receiver { +/// Create a new `Receiver` which reads starting from the tail if `next_pos` is not specified. +fn new_receiver(shared: Arc>, next_pos: Option) -> Receiver { let mut tail = shared.tail.lock(); if tail.rx_cnt == MAX_RECEIVERS { @@ -651,10 +652,9 @@ fn new_receiver(shared: Arc>) -> Receiver { tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow"); - let next = tail.pos; + let next = next_pos.unwrap_or(tail.pos); drop(tail); - Receiver { shared, next } } @@ -1022,6 +1022,13 @@ impl Drop for Receiver { } } +impl Clone for Receiver { + fn clone(&self) -> Self { + let shared = self.shared.clone(); + new_receiver(shared, Some(self.next)) + } +} + impl<'a, T> Recv<'a, T> { fn new(receiver: &'a mut Receiver) -> Recv<'a, T> { Recv {