From 50b1bbdf1ff1dcb17b97f9770cf52b237a075d55 Mon Sep 17 00:00:00 2001 From: Evan Simmons Date: Fri, 8 Apr 2022 11:51:57 -0700 Subject: [PATCH 01/13] Add subscribe method to broadcast::Receiver --- tokio/src/sync/broadcast.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 846d6c027a6..1efa08b0d15 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -557,7 +557,7 @@ impl Sender { /// ``` pub fn subscribe(&self) -> Receiver { let shared = self.shared.clone(); - new_receiver(shared) + new_receiver(shared, None) } /// Returns the number of active receivers @@ -647,7 +647,8 @@ impl Sender { } } -fn new_receiver(shared: Arc>) -> Receiver { +/// Create a new `Receiver` which reads starting from the tail if `next_pos` is not specified. +fn new_receiver(shared: Arc>, next_pos: Option) -> Receiver { let mut tail = shared.tail.lock(); if tail.rx_cnt == MAX_RECEIVERS { @@ -656,10 +657,9 @@ fn new_receiver(shared: Arc>) -> Receiver { tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow"); - let next = tail.pos; + let next = next_pos.unwrap_or(tail.pos); drop(tail); - Receiver { shared, next } } @@ -1027,6 +1027,13 @@ impl Drop for Receiver { } } +impl Clone for Receiver { + fn clone(&self) -> Self { + let shared = self.shared.clone(); + new_receiver(shared, Some(self.next)) + } +} + impl<'a, T> Recv<'a, T> { fn new(receiver: &'a mut Receiver) -> Recv<'a, T> { Recv { From 1b9cc181c697189a2f24271bae5509b43006e3f0 Mon Sep 17 00:00:00 2001 From: Evan Simmons Date: Sun, 10 Apr 2022 12:36:55 -0700 Subject: [PATCH 02/13] add a test for broadcast::Receiver Clone impl --- tokio/tests/sync_broadcast.rs | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/tokio/tests/sync_broadcast.rs b/tokio/tests/sync_broadcast.rs index ca8b4d7f4ce..31617564946 100644 --- a/tokio/tests/sync_broadcast.rs +++ b/tokio/tests/sync_broadcast.rs @@ -479,3 +479,28 @@ fn receiver_len_with_lagged() { fn is_closed(err: broadcast::error::RecvError) -> bool { matches!(err, broadcast::error::RecvError::Closed) } + +#[test] +fn receiver_clone_same_position_as_cloned() { + let (tx, mut rx) = broadcast::channel(2); + let mut other_rx = tx.subscribe(); + + assert_ok!(tx.send(1)); + assert_ok!(tx.send(2)); + + assert_eq!(assert_recv!(rx), 1); + + assert_eq!(tx.receiver_count(), 2); + let mut rx_clone = rx.clone(); + + // verify rx count is incremented + assert_eq!(tx.receiver_count(), 3); + + // rx and rx_clone should have the same next element at clone-time. 
+ assert_eq!(assert_recv!(rx), 2); + assert_eq!(assert_recv!(rx_clone), 2); + assert_empty!(rx); + assert_empty!(rx_clone); + + assert_eq!(assert_recv!(other_rx), 1); +} From 0b0dc16bcb608072a9e71871073f638844185ad8 Mon Sep 17 00:00:00 2001 From: Evan Simmons Date: Mon, 18 Apr 2022 09:40:56 -0700 Subject: [PATCH 03/13] register interest in slots, add tests that failed before doing so --- tokio/src/sync/broadcast.rs | 26 +++++++++- tokio/src/sync/tests/loom_broadcast.rs | 67 ++++++++++++++++++++++++++ tokio/tests/sync_broadcast.rs | 45 ++++++++++++----- 3 files changed, 126 insertions(+), 12 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 1efa08b0d15..f1ba66a2f69 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -1029,8 +1029,32 @@ impl Drop for Receiver { impl Clone for Receiver { fn clone(&self) -> Self { + let next = self.next; + // register interest in the slot that next points to + { + for n in next..=self.shared.tail.lock().pos { + let idx = (n & self.shared.mask as u64) as usize; + let slot = self.shared.buffer[idx].read().unwrap(); + + // a race with RecvGuard::drop would be bad, but is impossible since `self.next` + // is already incremented to the slot after the one that the `RecvGuard` points to. Additionally + // all methods that drop a `RecvGuard` require a &mut `Receiver` which ensures this method is not + // called concurrently. + slot.rem.fetch_add(1, SeqCst); + } + } let shared = self.shared.clone(); - new_receiver(shared, Some(self.next)) + // register the new receiver with `Tail` + { + let mut tail = shared.tail.lock(); + + if tail.rx_cnt == MAX_RECEIVERS { + panic!("max receivers"); + } + tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow"); + } + + Receiver { shared, next } } } diff --git a/tokio/src/sync/tests/loom_broadcast.rs b/tokio/src/sync/tests/loom_broadcast.rs index 039b01bf436..3e875667416 100644 --- a/tokio/src/sync/tests/loom_broadcast.rs +++ b/tokio/src/sync/tests/loom_broadcast.rs @@ -178,6 +178,46 @@ fn drop_rx() { assert_ok!(th2.join()); }); } +#[test] +fn drop_cloned_rx() { + loom::model(|| { + let (tx, mut rx1) = broadcast::channel(16); + let rx2 = rx1.clone(); + + let th1 = thread::spawn(move || { + block_on(async { + let v = assert_ok!(rx1.recv().await); + assert_eq!(v, "one"); + + let v = assert_ok!(rx1.recv().await); + assert_eq!(v, "two"); + + let v = assert_ok!(rx1.recv().await); + assert_eq!(v, "three"); + + let v = assert_ok!(rx1.recv().await); + + match assert_err!(rx1.recv().await) { + Closed => {} + _ => panic!(), + } + }); + }); + + let th2 = thread::spawn(move || { + drop(rx2); + }); + + assert_ok!(tx.send("one")); + assert_ok!(tx.send("two")); + assert_ok!(tx.send("three")); + drop(tx); + + assert_ok!(th1.join()); + assert_ok!(th2.join()); + }); +} + #[test] fn drop_multiple_rx_with_overflow() { @@ -205,3 +245,30 @@ fn drop_multiple_rx_with_overflow() { assert_ok!(th2.join()); }); } + +#[test] +fn drop_multiple_cloned_rx_with_overflow() { + loom::model(move || { + // It is essential to have multiple senders and receivers in this test case. 
+ let (tx, mut rx) = broadcast::channel(1); + let _rx2 = rx.clone(); + + let _ = tx.send(()); + let tx2 = tx.clone(); + let th1 = thread::spawn(move || { + block_on(async { + for _ in 0..100 { + let _ = tx2.send(()); + } + }); + }); + let _ = tx.send(()); + + let th2 = thread::spawn(move || { + block_on(async { while let Ok(_) = rx.recv().await {} }); + }); + + assert_ok!(th1.join()); + assert_ok!(th2.join()); + }); +} diff --git a/tokio/tests/sync_broadcast.rs b/tokio/tests/sync_broadcast.rs index 31617564946..b4728e421cc 100644 --- a/tokio/tests/sync_broadcast.rs +++ b/tokio/tests/sync_broadcast.rs @@ -481,26 +481,49 @@ fn is_closed(err: broadcast::error::RecvError) -> bool { } #[test] -fn receiver_clone_same_position_as_cloned() { - let (tx, mut rx) = broadcast::channel(2); - let mut other_rx = tx.subscribe(); +fn receiver_same_position_as_cloned() { + let (tx, mut rx) = broadcast::channel(3); + + let mut rx_clone = rx.clone(); + // verify rx count is incremented + assert_eq!(tx.receiver_count(), 2); + // verify ok to start recv on rx_clone before rx assert_ok!(tx.send(1)); + assert_eq!(assert_recv!(rx_clone), 1); + + drop(rx_clone); assert_ok!(tx.send(2)); + assert_ok!(tx.send(3)); + // rx: [1,2,3] + // verify ok to start recv on rx before rx_clone + let mut rx_clone = rx.clone(); assert_eq!(assert_recv!(rx), 1); + assert_eq!(assert_recv!(rx_clone), 1); + // as we drop the rx_clone we should have registered interest in all positions between Receiver::next and tail.pos so each are decremented when we recv them. + drop(rx_clone); - assert_eq!(tx.receiver_count(), 2); + // rx: [2, 3, _] + assert_ok!(tx.send(4)); + // rx: [2, 3, 4] let mut rx_clone = rx.clone(); - // verify rx count is incremented - assert_eq!(tx.receiver_count(), 3); - - // rx and rx_clone should have the same next element at clone-time. + // verify interest registered in slot, if not 3 and 4 is dropped and will rx_clone will not recv them. assert_eq!(assert_recv!(rx), 2); + assert_eq!(assert_recv!(rx), 3); + + // rx: [4, _, _] + // rx_clone: [2, 3, 4] assert_eq!(assert_recv!(rx_clone), 2); - assert_empty!(rx); - assert_empty!(rx_clone); + assert_eq!(assert_recv!(rx_clone), 3); + assert_eq!(assert_recv!(rx_clone), 4); + assert_eq!(assert_recv!(rx), 4); + assert_ok!(tx.send(5)); + drop(tx); + assert_eq!(assert_recv!(rx), 5); + assert_eq!(assert_recv!(rx_clone), 5); - assert_eq!(assert_recv!(other_rx), 1); + assert_closed!(rx.try_recv()); + assert_closed!(rx_clone.try_recv()); } From a740fa7c0bd1210eced82a4c9470a20ff8f6c3cf Mon Sep 17 00:00:00 2001 From: Evan Simmons Date: Mon, 18 Apr 2022 11:01:03 -0700 Subject: [PATCH 04/13] mutex for send2 --- tokio/src/sync/broadcast.rs | 47 +++++++++++++++----------- tokio/src/sync/tests/loom_broadcast.rs | 26 +++++++++++++- 2 files changed, 53 insertions(+), 20 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index f1ba66a2f69..3df46fbd9e7 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -1030,29 +1030,38 @@ impl Drop for Receiver { impl Clone for Receiver { fn clone(&self) -> Self { let next = self.next; + let shared = self.shared.clone(); + // register interest in the slot that next points to - { - for n in next..=self.shared.tail.lock().pos { - let idx = (n & self.shared.mask as u64) as usize; - let slot = self.shared.buffer[idx].read().unwrap(); - - // a race with RecvGuard::drop would be bad, but is impossible since `self.next` - // is already incremented to the slot after the one that the `RecvGuard` points to. 
Additionally - // all methods that drop a `RecvGuard` require a &mut `Receiver` which ensures this method is not - // called concurrently. - slot.rem.fetch_add(1, SeqCst); - } + // let this be lock-free since we're not yet operating on the tail. + let tail_pos = shared.tail.lock().pos; + for n in next..tail_pos { + let idx = (n & shared.mask as u64) as usize; + let slot = shared.buffer[idx].read().unwrap(); + + // a race with RecvGuard::drop would be bad, but is impossible since `self.next` + // is already incremented to the slot after the one that the `RecvGuard` points to. Additionally + // all methods that drop a `RecvGuard` require a &mut `Receiver` which ensures this method is not + // called concurrently. + slot.rem.fetch_add(1, SeqCst); + } + // tail pos may have changed, we need a locked section here to prevent a race with `Sender::send2` + let mut n = tail_pos.wrapping_sub(1); + let mut tail = shared.tail.lock(); + while n <= tail.pos { + let idx = (n & shared.mask as u64) as usize; + let slot = self.shared.buffer[idx].read().unwrap(); + slot.rem.fetch_add(1, SeqCst); + n = n.wrapping_add(1); } - let shared = self.shared.clone(); - // register the new receiver with `Tail` - { - let mut tail = shared.tail.lock(); - if tail.rx_cnt == MAX_RECEIVERS { - panic!("max receivers"); - } - tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow"); + // register the new receiver with `Tail` + if tail.rx_cnt == MAX_RECEIVERS { + panic!("max receivers"); } + tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow"); + + drop(tail); Receiver { shared, next } } diff --git a/tokio/src/sync/tests/loom_broadcast.rs b/tokio/src/sync/tests/loom_broadcast.rs index 3e875667416..088b151c6ce 100644 --- a/tokio/src/sync/tests/loom_broadcast.rs +++ b/tokio/src/sync/tests/loom_broadcast.rs @@ -195,7 +195,6 @@ fn drop_cloned_rx() { let v = assert_ok!(rx1.recv().await); assert_eq!(v, "three"); - let v = assert_ok!(rx1.recv().await); match assert_err!(rx1.recv().await) { Closed => {} @@ -272,3 +271,28 @@ fn drop_multiple_cloned_rx_with_overflow() { assert_ok!(th2.join()); }); } + +#[test] +fn send_and_rx_clone() { + // test the interraction of Sender::send and Rx::clone + loom::model(move || { + let (tx, mut rx) = broadcast::channel(2); + + let th1 = thread::spawn(move || { + block_on(async { + let mut rx2 = rx.clone(); + let v = assert_ok!(rx.recv().await); + assert_eq!(v, 1); + + // this would return closed if rem was incr'd in clone between + // read and write of rem for new tail entry. + let v2 = assert_ok!(rx2.recv().await); + assert_eq!(v2, 1); + }); + }); + assert_ok!(tx.send(1)); + + assert_ok!(th1.join()); + }); +} + From 74d872684e2f8049ae527c00d2bde4da64ff18fd Mon Sep 17 00:00:00 2001 From: Evan Simmons Date: Tue, 19 Apr 2022 09:41:57 -0700 Subject: [PATCH 05/13] fix a race with send2 --- tokio/src/sync/broadcast.rs | 31 +++++++---------- tokio/src/sync/tests/loom_broadcast.rs | 48 +++++++++++++++++++++++++- 2 files changed, 59 insertions(+), 20 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 3df46fbd9e7..ea179aada56 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -1031,11 +1031,19 @@ impl Clone for Receiver { fn clone(&self) -> Self { let next = self.next; let shared = self.shared.clone(); + let mut tail = shared.tail.lock(); - // register interest in the slot that next points to - // let this be lock-free since we're not yet operating on the tail. 
- let tail_pos = shared.tail.lock().pos; - for n in next..tail_pos { + // register the new receiver with `Tail` + if tail.rx_cnt == MAX_RECEIVERS { + panic!("max receivers"); + } + tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow"); + + // Register interest in the slots from next to tail.pos. + + // We need to hold the lock here to prevent a race with send2 where send2 overwrites + // next or moves past tail before we register interest in the slot. + for n in next..tail.pos { let idx = (n & shared.mask as u64) as usize; let slot = shared.buffer[idx].read().unwrap(); @@ -1045,21 +1053,6 @@ impl Clone for Receiver { // called concurrently. slot.rem.fetch_add(1, SeqCst); } - // tail pos may have changed, we need a locked section here to prevent a race with `Sender::send2` - let mut n = tail_pos.wrapping_sub(1); - let mut tail = shared.tail.lock(); - while n <= tail.pos { - let idx = (n & shared.mask as u64) as usize; - let slot = self.shared.buffer[idx].read().unwrap(); - slot.rem.fetch_add(1, SeqCst); - n = n.wrapping_add(1); - } - - // register the new receiver with `Tail` - if tail.rx_cnt == MAX_RECEIVERS { - panic!("max receivers"); - } - tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow"); drop(tail); diff --git a/tokio/src/sync/tests/loom_broadcast.rs b/tokio/src/sync/tests/loom_broadcast.rs index 088b151c6ce..0aa9db65fc0 100644 --- a/tokio/src/sync/tests/loom_broadcast.rs +++ b/tokio/src/sync/tests/loom_broadcast.rs @@ -92,6 +92,52 @@ fn broadcast_two() { }); } +// An `Arc` is used as the value in order to detect memory leaks. +#[test] +fn broadcast_two_cloned() { + loom::model(|| { + let (tx, mut rx1) = broadcast::channel::>(16); + let mut rx2 = rx1.clone(); + + let th1 = thread::spawn(move || { + block_on(async { + let v = assert_ok!(rx1.recv().await); + assert_eq!(*v, "hello"); + + let v = assert_ok!(rx1.recv().await); + assert_eq!(*v, "world"); + + match assert_err!(rx1.recv().await) { + Closed => {} + _ => panic!(), + } + }); + }); + + let th2 = thread::spawn(move || { + block_on(async { + let v = assert_ok!(rx2.recv().await); + assert_eq!(*v, "hello"); + + let v = assert_ok!(rx2.recv().await); + assert_eq!(*v, "world"); + + match assert_err!(rx2.recv().await) { + Closed => {} + _ => panic!(), + } + }); + }); + + assert_ok!(tx.send(Arc::new("hello"))); + assert_ok!(tx.send(Arc::new("world"))); + drop(tx); + + assert_ok!(th1.join()); + assert_ok!(th2.join()); + }); +} + #[test] fn broadcast_wrap() { loom::model(|| { @@ -274,7 +320,7 @@ fn drop_multiple_cloned_rx_with_overflow() { #[test] fn send_and_rx_clone() { - // test the interraction of Sender::send and Rx::clone + // test the interaction of Sender::send and Rx::clone loom::model(move || { let (tx, mut rx) = broadcast::channel(2); From 72f604bc41bf8af869eb26235a25525ee209ffe7 Mon Sep 17 00:00:00 2001 From: Evan Simmons Date: Tue, 19 Apr 2022 12:41:41 -0700 Subject: [PATCH 06/13] fmt --- tokio/src/sync/tests/loom_broadcast.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/tokio/src/sync/tests/loom_broadcast.rs b/tokio/src/sync/tests/loom_broadcast.rs index 0aa9db65fc0..e1ebba043eb 100644 --- a/tokio/src/sync/tests/loom_broadcast.rs +++ b/tokio/src/sync/tests/loom_broadcast.rs @@ -241,7 +241,6 @@ fn drop_cloned_rx() { let v = assert_ok!(rx1.recv().await); assert_eq!(v, "three"); - match assert_err!(rx1.recv().await) { Closed => {} _ => panic!(), @@ -263,7 +262,6 @@ fn drop_cloned_rx() { }); } - #[test] fn drop_multiple_rx_with_overflow() { loom::model(move || { @@ -341,4 +339,3 @@ fn send_and_rx_clone() { 
assert_ok!(th1.join()); }); } - From 3f95cc8be2ab40b8b9cbe211adf926defe8e86e9 Mon Sep 17 00:00:00 2001 From: Evan Simmons Date: Sat, 30 Apr 2022 08:09:15 -0700 Subject: [PATCH 07/13] rm old diff --- tokio/src/sync/broadcast.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index ea179aada56..2eab192b9af 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -648,7 +648,7 @@ impl Sender { } /// Create a new `Receiver` which reads starting from the tail if `next_pos` is not specified. -fn new_receiver(shared: Arc>, next_pos: Option) -> Receiver { +fn new_receiver(shared: Arc>) -> Receiver { let mut tail = shared.tail.lock(); if tail.rx_cnt == MAX_RECEIVERS { @@ -657,9 +657,10 @@ fn new_receiver(shared: Arc>, next_pos: Option) -> Receiver tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow"); - let next = next_pos.unwrap_or(tail.pos); + let next = tail.pos; drop(tail); + Receiver { shared, next } } From ce17a1fb7abc72bbbff6239f8a4b68012ee659ba Mon Sep 17 00:00:00 2001 From: Evan Simmons Date: Sat, 30 Apr 2022 08:11:01 -0700 Subject: [PATCH 08/13] fix docs and calls --- tokio/src/sync/broadcast.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 2eab192b9af..f39e58af71c 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -557,7 +557,7 @@ impl Sender { /// ``` pub fn subscribe(&self) -> Receiver { let shared = self.shared.clone(); - new_receiver(shared, None) + new_receiver(shared) } /// Returns the number of active receivers @@ -647,7 +647,7 @@ impl Sender { } } -/// Create a new `Receiver` which reads starting from the tail if `next_pos` is not specified. +/// Create a new `Receiver` which reads starting from the tail. fn new_receiver(shared: Arc>) -> Receiver { let mut tail = shared.tail.lock(); From 312fd3df809840dccb342136991dee9113214d91 Mon Sep 17 00:00:00 2001 From: Evan Simmons Date: Tue, 3 May 2022 08:53:12 -0700 Subject: [PATCH 09/13] add resubscribe method, move clone impl to clone_at_position method --- tokio/src/sync/broadcast.rs | 71 ++++++++++++++------------ tokio/src/sync/tests/loom_broadcast.rs | 12 ++--- tokio/tests/sync_broadcast.rs | 43 +++++++++++++++- 3 files changed, 85 insertions(+), 41 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index f39e58af71c..4909448cdeb 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -882,6 +882,44 @@ impl Receiver { } impl Receiver { + /// Re-subscribes to the channel starting from the current tail element (the last element passed to `Sender::send`.) + pub fn resubscribe(&self) -> Self { + let shared = self.shared.clone(); + new_receiver(shared) + } + + /// Clones the receiver maintaining the current position of the queue. This operation is `O(n)`, + /// if you do not need to maintain the current position please use `Receiver::resubscribe`. + pub fn clone_at_position(&self) -> Self { + let next = self.next; + let shared = self.shared.clone(); + let mut tail = shared.tail.lock(); + + // register the new receiver with `Tail` + if tail.rx_cnt == MAX_RECEIVERS { + panic!("max receivers"); + } + tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow"); + + // Register interest in the slots from next to tail.pos. 
+ + // We need to hold the lock here to prevent a race with send2 where send2 overwrites + // next or moves past tail before we register interest in the slot. + for n in next..tail.pos { + let idx = (n & shared.mask as u64) as usize; + let slot = shared.buffer[idx].read().unwrap(); + + // a race with RecvGuard::drop would be bad, but is impossible since `self.next` + // is already incremented to the slot after the one that the `RecvGuard` points to. Additionally + // all methods that drop a `RecvGuard` require a &mut `Receiver` which ensures this method is not + // called concurrently. + slot.rem.fetch_add(1, SeqCst); + } + + drop(tail); + + Receiver { shared, next } + } /// Receives the next value for this receiver. /// /// Each [`Receiver`] handle will receive a clone of all values sent @@ -1028,39 +1066,6 @@ impl Drop for Receiver { } } -impl Clone for Receiver { - fn clone(&self) -> Self { - let next = self.next; - let shared = self.shared.clone(); - let mut tail = shared.tail.lock(); - - // register the new receiver with `Tail` - if tail.rx_cnt == MAX_RECEIVERS { - panic!("max receivers"); - } - tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow"); - - // Register interest in the slots from next to tail.pos. - - // We need to hold the lock here to prevent a race with send2 where send2 overwrites - // next or moves past tail before we register interest in the slot. - for n in next..tail.pos { - let idx = (n & shared.mask as u64) as usize; - let slot = shared.buffer[idx].read().unwrap(); - - // a race with RecvGuard::drop would be bad, but is impossible since `self.next` - // is already incremented to the slot after the one that the `RecvGuard` points to. Additionally - // all methods that drop a `RecvGuard` require a &mut `Receiver` which ensures this method is not - // called concurrently. - slot.rem.fetch_add(1, SeqCst); - } - - drop(tail); - - Receiver { shared, next } - } -} - impl<'a, T> Recv<'a, T> { fn new(receiver: &'a mut Receiver) -> Recv<'a, T> { Recv { diff --git a/tokio/src/sync/tests/loom_broadcast.rs b/tokio/src/sync/tests/loom_broadcast.rs index e1ebba043eb..6f5c50648e6 100644 --- a/tokio/src/sync/tests/loom_broadcast.rs +++ b/tokio/src/sync/tests/loom_broadcast.rs @@ -97,7 +97,7 @@ fn broadcast_two() { fn broadcast_two_cloned() { loom::model(|| { let (tx, mut rx1) = broadcast::channel::>(16); - let mut rx2 = rx1.clone(); + let mut rx2 = rx1.clone_at_position(); let th1 = thread::spawn(move || { block_on(async { @@ -228,7 +228,7 @@ fn drop_rx() { fn drop_cloned_rx() { loom::model(|| { let (tx, mut rx1) = broadcast::channel(16); - let rx2 = rx1.clone(); + let rx2 = rx1.clone_at_position(); let th1 = thread::spawn(move || { block_on(async { @@ -294,7 +294,7 @@ fn drop_multiple_cloned_rx_with_overflow() { loom::model(move || { // It is essential to have multiple senders and receivers in this test case. 
let (tx, mut rx) = broadcast::channel(1); - let _rx2 = rx.clone(); + let _rx2 = rx.clone_at_position(); let _ = tx.send(()); let tx2 = tx.clone(); @@ -318,17 +318,17 @@ fn drop_multiple_cloned_rx_with_overflow() { #[test] fn send_and_rx_clone() { - // test the interaction of Sender::send and Rx::clone + // test the interaction of Sender::send and Rx::clone_at_position loom::model(move || { let (tx, mut rx) = broadcast::channel(2); let th1 = thread::spawn(move || { block_on(async { - let mut rx2 = rx.clone(); + let mut rx2 = rx.clone_at_position(); let v = assert_ok!(rx.recv().await); assert_eq!(v, 1); - // this would return closed if rem was incr'd in clone between + // this would return closed if rem was incr'd in clone_at_position between // read and write of rem for new tail entry. let v2 = assert_ok!(rx2.recv().await); assert_eq!(v2, 1); diff --git a/tokio/tests/sync_broadcast.rs b/tokio/tests/sync_broadcast.rs index b4728e421cc..6f137356e67 100644 --- a/tokio/tests/sync_broadcast.rs +++ b/tokio/tests/sync_broadcast.rs @@ -484,7 +484,7 @@ fn is_closed(err: broadcast::error::RecvError) -> bool { fn receiver_same_position_as_cloned() { let (tx, mut rx) = broadcast::channel(3); - let mut rx_clone = rx.clone(); + let mut rx_clone = rx.clone_at_position(); // verify rx count is incremented assert_eq!(tx.receiver_count(), 2); @@ -507,7 +507,7 @@ fn receiver_same_position_as_cloned() { // rx: [2, 3, _] assert_ok!(tx.send(4)); // rx: [2, 3, 4] - let mut rx_clone = rx.clone(); + let mut rx_clone = rx.clone_at_position(); // verify interest registered in slot, if not 3 and 4 is dropped and will rx_clone will not recv them. assert_eq!(assert_recv!(rx), 2); @@ -527,3 +527,42 @@ fn receiver_same_position_as_cloned() { assert_closed!(rx.try_recv()); assert_closed!(rx_clone.try_recv()); } + +#[test] +fn resubscribe_points_to_tail() { + let (tx, mut rx) = broadcast::channel(3); + tx.send(1); + + let mut rx_resub = rx.resubscribe(); + + // verify we're one behind at the start + assert_empty!(rx_resub), + assert_eq!(assert_recv!(rx), 1) + + // verify we do not affect rx + tx.send(2); + assert_eq!(assert_recv!(rx_resub), 2); + tx.send(3); + assert_eq!(assert_recv!(rx), 2); + assert_eq!(assert_recv!(rx), 3); + assert_empty!(rx); + + assert_eq!(assert_recv!(rx_resub), 3); + assert_empty!(rx_resub); +} + +#[test] +fn resubscribe_lagged() { + let (tx, mut rx) = broadcast::channel(1); + tx.send(1); + tx.send(2); + + let rx_resub = rx.resubscribe(); + assert_lagged!(rx); + assert_lagged!(rx_resub); + assert_eq!(assert_recv!(rx), 2); + assert_empty!(rx); + assert_eq!(assert_recv!(rx_resub), 2); + assert_empty!(rx_resub); + +} From 71c8a465a2c8b8491f6b781d530868360accebc9 Mon Sep 17 00:00:00 2001 From: Evan Simmons Date: Tue, 3 May 2022 09:44:09 -0700 Subject: [PATCH 10/13] fix compile error --- tokio/tests/sync_broadcast.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tokio/tests/sync_broadcast.rs b/tokio/tests/sync_broadcast.rs index 6f137356e67..f4b220011d8 100644 --- a/tokio/tests/sync_broadcast.rs +++ b/tokio/tests/sync_broadcast.rs @@ -534,10 +534,10 @@ fn resubscribe_points_to_tail() { tx.send(1); let mut rx_resub = rx.resubscribe(); - + // verify we're one behind at the start - assert_empty!(rx_resub), - assert_eq!(assert_recv!(rx), 1) + assert_empty!(rx_resub); + assert_eq!(assert_recv!(rx), 1); // verify we do not affect rx tx.send(2); @@ -564,5 +564,4 @@ fn resubscribe_lagged() { assert_empty!(rx); assert_eq!(assert_recv!(rx_resub), 2); assert_empty!(rx_resub); - } From 
cb29a035acf68291ea84498e76190dd84331a175 Mon Sep 17 00:00:00 2001 From: Evan Simmons Date: Mon, 23 May 2022 11:24:13 -0700 Subject: [PATCH 11/13] rm clone, better docs, remove unnecessary tests --- tokio/src/sync/broadcast.rs | 56 ++++------- tokio/src/sync/tests/loom_broadcast.rs | 134 ------------------------- tokio/tests/sync_broadcast.rs | 66 ++---------- 3 files changed, 31 insertions(+), 225 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 4909448cdeb..993e048f13d 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -882,44 +882,32 @@ impl Receiver { } impl Receiver { - /// Re-subscribes to the channel starting from the current tail element (the last element passed to `Sender::send`.) + /// Re-subscribes to the channel starting from the current tail element. + /// + /// This [`Receiver`] handle will receive a clone of all values sent + /// **after** it has resubscribed. This will not include elements that are + /// in the queue of the current receiver. Consider the following example. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::broadcast; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = broadcast::channel(2); + /// + /// tx.send(1).unwrap(); + /// let mut rx2 = rx.resubscribe(); + /// tx.send(2).unwrap(); + /// + /// assert_neq!(rx.recv().await.unwrap(), rx2.recv().await.unwrap()); // 1 != 2 + /// } + /// ``` pub fn resubscribe(&self) -> Self { let shared = self.shared.clone(); new_receiver(shared) } - - /// Clones the receiver maintaining the current position of the queue. This operation is `O(n)`, - /// if you do not need to maintain the current position please use `Receiver::resubscribe`. - pub fn clone_at_position(&self) -> Self { - let next = self.next; - let shared = self.shared.clone(); - let mut tail = shared.tail.lock(); - - // register the new receiver with `Tail` - if tail.rx_cnt == MAX_RECEIVERS { - panic!("max receivers"); - } - tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow"); - - // Register interest in the slots from next to tail.pos. - - // We need to hold the lock here to prevent a race with send2 where send2 overwrites - // next or moves past tail before we register interest in the slot. - for n in next..tail.pos { - let idx = (n & shared.mask as u64) as usize; - let slot = shared.buffer[idx].read().unwrap(); - - // a race with RecvGuard::drop would be bad, but is impossible since `self.next` - // is already incremented to the slot after the one that the `RecvGuard` points to. Additionally - // all methods that drop a `RecvGuard` require a &mut `Receiver` which ensures this method is not - // called concurrently. - slot.rem.fetch_add(1, SeqCst); - } - - drop(tail); - - Receiver { shared, next } - } /// Receives the next value for this receiver. /// /// Each [`Receiver`] handle will receive a clone of all values sent diff --git a/tokio/src/sync/tests/loom_broadcast.rs b/tokio/src/sync/tests/loom_broadcast.rs index 6f5c50648e6..039b01bf436 100644 --- a/tokio/src/sync/tests/loom_broadcast.rs +++ b/tokio/src/sync/tests/loom_broadcast.rs @@ -92,52 +92,6 @@ fn broadcast_two() { }); } -// An `Arc` is used as the value in order to detect memory leaks. 
-#[test] -fn broadcast_two_cloned() { - loom::model(|| { - let (tx, mut rx1) = broadcast::channel::>(16); - let mut rx2 = rx1.clone_at_position(); - - let th1 = thread::spawn(move || { - block_on(async { - let v = assert_ok!(rx1.recv().await); - assert_eq!(*v, "hello"); - - let v = assert_ok!(rx1.recv().await); - assert_eq!(*v, "world"); - - match assert_err!(rx1.recv().await) { - Closed => {} - _ => panic!(), - } - }); - }); - - let th2 = thread::spawn(move || { - block_on(async { - let v = assert_ok!(rx2.recv().await); - assert_eq!(*v, "hello"); - - let v = assert_ok!(rx2.recv().await); - assert_eq!(*v, "world"); - - match assert_err!(rx2.recv().await) { - Closed => {} - _ => panic!(), - } - }); - }); - - assert_ok!(tx.send(Arc::new("hello"))); - assert_ok!(tx.send(Arc::new("world"))); - drop(tx); - - assert_ok!(th1.join()); - assert_ok!(th2.join()); - }); -} - #[test] fn broadcast_wrap() { loom::model(|| { @@ -224,43 +178,6 @@ fn drop_rx() { assert_ok!(th2.join()); }); } -#[test] -fn drop_cloned_rx() { - loom::model(|| { - let (tx, mut rx1) = broadcast::channel(16); - let rx2 = rx1.clone_at_position(); - - let th1 = thread::spawn(move || { - block_on(async { - let v = assert_ok!(rx1.recv().await); - assert_eq!(v, "one"); - - let v = assert_ok!(rx1.recv().await); - assert_eq!(v, "two"); - - let v = assert_ok!(rx1.recv().await); - assert_eq!(v, "three"); - - match assert_err!(rx1.recv().await) { - Closed => {} - _ => panic!(), - } - }); - }); - - let th2 = thread::spawn(move || { - drop(rx2); - }); - - assert_ok!(tx.send("one")); - assert_ok!(tx.send("two")); - assert_ok!(tx.send("three")); - drop(tx); - - assert_ok!(th1.join()); - assert_ok!(th2.join()); - }); -} #[test] fn drop_multiple_rx_with_overflow() { @@ -288,54 +205,3 @@ fn drop_multiple_rx_with_overflow() { assert_ok!(th2.join()); }); } - -#[test] -fn drop_multiple_cloned_rx_with_overflow() { - loom::model(move || { - // It is essential to have multiple senders and receivers in this test case. - let (tx, mut rx) = broadcast::channel(1); - let _rx2 = rx.clone_at_position(); - - let _ = tx.send(()); - let tx2 = tx.clone(); - let th1 = thread::spawn(move || { - block_on(async { - for _ in 0..100 { - let _ = tx2.send(()); - } - }); - }); - let _ = tx.send(()); - - let th2 = thread::spawn(move || { - block_on(async { while let Ok(_) = rx.recv().await {} }); - }); - - assert_ok!(th1.join()); - assert_ok!(th2.join()); - }); -} - -#[test] -fn send_and_rx_clone() { - // test the interaction of Sender::send and Rx::clone_at_position - loom::model(move || { - let (tx, mut rx) = broadcast::channel(2); - - let th1 = thread::spawn(move || { - block_on(async { - let mut rx2 = rx.clone_at_position(); - let v = assert_ok!(rx.recv().await); - assert_eq!(v, 1); - - // this would return closed if rem was incr'd in clone_at_position between - // read and write of rem for new tail entry. 
- let v2 = assert_ok!(rx2.recv().await); - assert_eq!(v2, 1); - }); - }); - assert_ok!(tx.send(1)); - - assert_ok!(th1.join()); - }); -} diff --git a/tokio/tests/sync_broadcast.rs b/tokio/tests/sync_broadcast.rs index f4b220011d8..b38b6380023 100644 --- a/tokio/tests/sync_broadcast.rs +++ b/tokio/tests/sync_broadcast.rs @@ -480,58 +480,10 @@ fn is_closed(err: broadcast::error::RecvError) -> bool { matches!(err, broadcast::error::RecvError::Closed) } -#[test] -fn receiver_same_position_as_cloned() { - let (tx, mut rx) = broadcast::channel(3); - - let mut rx_clone = rx.clone_at_position(); - // verify rx count is incremented - assert_eq!(tx.receiver_count(), 2); - - // verify ok to start recv on rx_clone before rx - assert_ok!(tx.send(1)); - assert_eq!(assert_recv!(rx_clone), 1); - - drop(rx_clone); - assert_ok!(tx.send(2)); - assert_ok!(tx.send(3)); - // rx: [1,2,3] - - // verify ok to start recv on rx before rx_clone - let mut rx_clone = rx.clone(); - assert_eq!(assert_recv!(rx), 1); - assert_eq!(assert_recv!(rx_clone), 1); - // as we drop the rx_clone we should have registered interest in all positions between Receiver::next and tail.pos so each are decremented when we recv them. - drop(rx_clone); - - // rx: [2, 3, _] - assert_ok!(tx.send(4)); - // rx: [2, 3, 4] - let mut rx_clone = rx.clone_at_position(); - - // verify interest registered in slot, if not 3 and 4 is dropped and will rx_clone will not recv them. - assert_eq!(assert_recv!(rx), 2); - assert_eq!(assert_recv!(rx), 3); - - // rx: [4, _, _] - // rx_clone: [2, 3, 4] - assert_eq!(assert_recv!(rx_clone), 2); - assert_eq!(assert_recv!(rx_clone), 3); - assert_eq!(assert_recv!(rx_clone), 4); - assert_eq!(assert_recv!(rx), 4); - assert_ok!(tx.send(5)); - drop(tx); - assert_eq!(assert_recv!(rx), 5); - assert_eq!(assert_recv!(rx_clone), 5); - - assert_closed!(rx.try_recv()); - assert_closed!(rx_clone.try_recv()); -} - #[test] fn resubscribe_points_to_tail() { let (tx, mut rx) = broadcast::channel(3); - tx.send(1); + tx.send(1).unwrap(); let mut rx_resub = rx.resubscribe(); @@ -540,9 +492,9 @@ fn resubscribe_points_to_tail() { assert_eq!(assert_recv!(rx), 1); // verify we do not affect rx - tx.send(2); + tx.send(2).unwrap(); assert_eq!(assert_recv!(rx_resub), 2); - tx.send(3); + tx.send(3).unwrap(); assert_eq!(assert_recv!(rx), 2); assert_eq!(assert_recv!(rx), 3); assert_empty!(rx); @@ -554,14 +506,14 @@ fn resubscribe_points_to_tail() { #[test] fn resubscribe_lagged() { let (tx, mut rx) = broadcast::channel(1); - tx.send(1); - tx.send(2); + tx.send(1).unwrap(); + tx.send(2).unwrap(); + + let mut rx_resub = rx.resubscribe(); + assert_lagged!(rx.try_recv(), 1); + assert_empty!(rx_resub); - let rx_resub = rx.resubscribe(); - assert_lagged!(rx); - assert_lagged!(rx_resub); assert_eq!(assert_recv!(rx), 2); assert_empty!(rx); - assert_eq!(assert_recv!(rx_resub), 2); assert_empty!(rx_resub); } From e06e9073db770a117feb275f7a87ebaefa283ae2 Mon Sep 17 00:00:00 2001 From: Evan Simmons Date: Mon, 23 May 2022 12:08:29 -0700 Subject: [PATCH 12/13] fix doctest --- tokio/src/sync/broadcast.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 993e048f13d..33a15805ad6 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -901,7 +901,7 @@ impl Receiver { /// let mut rx2 = rx.resubscribe(); /// tx.send(2).unwrap(); /// - /// assert_neq!(rx.recv().await.unwrap(), rx2.recv().await.unwrap()); // 1 != 2 + /// assert_eq!(rx.recv().await.unwrap(), 
rx2.recv().await.unwrap()); // 1 != 2
     /// }
     /// ```
     pub fn resubscribe(&self) -> Self {

From 695babed6293a63e34dafc5ff15f8ae292e308d5 Mon Sep 17 00:00:00 2001
From: Evan Simmons
Date: Mon, 23 May 2022 12:09:42 -0700
Subject: [PATCH 13/13] use suggestion

---
 tokio/src/sync/broadcast.rs | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs
index 33a15805ad6..c796d129920 100644
--- a/tokio/src/sync/broadcast.rs
+++ b/tokio/src/sync/broadcast.rs
@@ -901,7 +901,8 @@ impl<T: Clone> Receiver<T> {
     /// let mut rx2 = rx.resubscribe();
     /// tx.send(2).unwrap();
     ///
-    /// assert_eq!(rx.recv().await.unwrap(), rx2.recv().await.unwrap()); // 1 != 2
+    /// assert_eq!(rx2.recv().await.unwrap(), 2);
+    /// assert_eq!(rx.recv().await.unwrap(), 1);
     /// }
     /// ```
     pub fn resubscribe(&self) -> Self {
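
For reference, a minimal usage sketch of the `Receiver::resubscribe` method this series introduces; it mirrors the doc example from the final two patches and assumes a tokio build that already includes these changes (plus the `rt` and `macros` features for `#[tokio::main]`):

    use tokio::sync::broadcast;

    #[tokio::main]
    async fn main() {
        // Capacity 2 is enough here; nothing lags.
        let (tx, mut rx) = broadcast::channel(2);

        tx.send(1).unwrap();

        // `resubscribe` starts at the current tail: the new handle only sees
        // values sent after this point, not the `1` already queued for `rx`.
        let mut rx2 = rx.resubscribe();

        tx.send(2).unwrap();

        assert_eq!(rx.recv().await.unwrap(), 1);
        assert_eq!(rx.recv().await.unwrap(), 2);
        assert_eq!(rx2.recv().await.unwrap(), 2);
    }

Because `resubscribe` starts from the tail, it avoids the per-slot `rem` bookkeeping (and the `send2` race handling) that the `Clone`/`clone_at_position` approach removed in PATCH 11 required, making it O(1) rather than O(n).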