Skip to content

Commit

Permalink
rm clone, better docs, remove unnecessary tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Evan Simmons committed May 23, 2022
1 parent c2eecf7 commit 8b274de
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 225 deletions.
56 changes: 22 additions & 34 deletions tokio/src/sync/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -882,44 +882,32 @@ impl<T> Receiver<T> {
}

impl<T: Clone> Receiver<T> {
/// Re-subscribes to the channel starting from the current tail element (the last element passed to `Sender::send`.)
/// Re-subscribes to the channel starting from the current tail element.
///
/// This [`Receiver`] handle will receive a clone of all values sent
/// **after** it has resubscribed. This will not include elements that are
/// in the queue of the current receiver. Consider the following example.
///
/// # Examples
///
/// ```
/// use tokio::sync::broadcast;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = broadcast::channel(2);
///
/// tx.send(1).unwrap();
/// let mut rx2 = rx.resubscribe();
/// tx.send(2).unwrap();
///
/// assert_neq!(rx.recv().await.unwrap(), rx2.recv().await.unwrap()); // 1 != 2
/// }
/// ```
pub fn resubscribe(&self) -> Self {
let shared = self.shared.clone();
new_receiver(shared)
}

/// Clones the receiver maintaining the current position of the queue. This operation is `O(n)`,
/// if you do not need to maintain the current position please use `Receiver::resubscribe`.
pub fn clone_at_position(&self) -> Self {
let next = self.next;
let shared = self.shared.clone();
let mut tail = shared.tail.lock();

// register the new receiver with `Tail`
if tail.rx_cnt == MAX_RECEIVERS {
panic!("max receivers");
}
tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow");

// Register interest in the slots from next to tail.pos.

// We need to hold the lock here to prevent a race with send2 where send2 overwrites
// next or moves past tail before we register interest in the slot.
for n in next..tail.pos {
let idx = (n & shared.mask as u64) as usize;
let slot = shared.buffer[idx].read().unwrap();

// a race with RecvGuard::drop would be bad, but is impossible since `self.next`
// is already incremented to the slot after the one that the `RecvGuard` points to. Additionally
// all methods that drop a `RecvGuard` require a &mut `Receiver` which ensures this method is not
// called concurrently.
slot.rem.fetch_add(1, SeqCst);
}

drop(tail);

Receiver { shared, next }
}
/// Receives the next value for this receiver.
///
/// Each [`Receiver`] handle will receive a clone of all values sent
Expand Down
134 changes: 0 additions & 134 deletions tokio/src/sync/tests/loom_broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,52 +92,6 @@ fn broadcast_two() {
});
}

// An `Arc` is used as the value in order to detect memory leaks.
#[test]
fn broadcast_two_cloned() {
loom::model(|| {
let (tx, mut rx1) = broadcast::channel::<Arc<&'static str>>(16);
let mut rx2 = rx1.clone_at_position();

let th1 = thread::spawn(move || {
block_on(async {
let v = assert_ok!(rx1.recv().await);
assert_eq!(*v, "hello");

let v = assert_ok!(rx1.recv().await);
assert_eq!(*v, "world");

match assert_err!(rx1.recv().await) {
Closed => {}
_ => panic!(),
}
});
});

let th2 = thread::spawn(move || {
block_on(async {
let v = assert_ok!(rx2.recv().await);
assert_eq!(*v, "hello");

let v = assert_ok!(rx2.recv().await);
assert_eq!(*v, "world");

match assert_err!(rx2.recv().await) {
Closed => {}
_ => panic!(),
}
});
});

assert_ok!(tx.send(Arc::new("hello")));
assert_ok!(tx.send(Arc::new("world")));
drop(tx);

assert_ok!(th1.join());
assert_ok!(th2.join());
});
}

#[test]
fn broadcast_wrap() {
loom::model(|| {
Expand Down Expand Up @@ -224,43 +178,6 @@ fn drop_rx() {
assert_ok!(th2.join());
});
}
#[test]
fn drop_cloned_rx() {
loom::model(|| {
let (tx, mut rx1) = broadcast::channel(16);
let rx2 = rx1.clone_at_position();

let th1 = thread::spawn(move || {
block_on(async {
let v = assert_ok!(rx1.recv().await);
assert_eq!(v, "one");

let v = assert_ok!(rx1.recv().await);
assert_eq!(v, "two");

let v = assert_ok!(rx1.recv().await);
assert_eq!(v, "three");

match assert_err!(rx1.recv().await) {
Closed => {}
_ => panic!(),
}
});
});

let th2 = thread::spawn(move || {
drop(rx2);
});

assert_ok!(tx.send("one"));
assert_ok!(tx.send("two"));
assert_ok!(tx.send("three"));
drop(tx);

assert_ok!(th1.join());
assert_ok!(th2.join());
});
}

#[test]
fn drop_multiple_rx_with_overflow() {
Expand Down Expand Up @@ -288,54 +205,3 @@ fn drop_multiple_rx_with_overflow() {
assert_ok!(th2.join());
});
}

#[test]
fn drop_multiple_cloned_rx_with_overflow() {
loom::model(move || {
// It is essential to have multiple senders and receivers in this test case.
let (tx, mut rx) = broadcast::channel(1);
let _rx2 = rx.clone_at_position();

let _ = tx.send(());
let tx2 = tx.clone();
let th1 = thread::spawn(move || {
block_on(async {
for _ in 0..100 {
let _ = tx2.send(());
}
});
});
let _ = tx.send(());

let th2 = thread::spawn(move || {
block_on(async { while let Ok(_) = rx.recv().await {} });
});

assert_ok!(th1.join());
assert_ok!(th2.join());
});
}

#[test]
fn send_and_rx_clone() {
// test the interaction of Sender::send and Rx::clone_at_position
loom::model(move || {
let (tx, mut rx) = broadcast::channel(2);

let th1 = thread::spawn(move || {
block_on(async {
let mut rx2 = rx.clone_at_position();
let v = assert_ok!(rx.recv().await);
assert_eq!(v, 1);

// this would return closed if rem was incr'd in clone_at_position between
// read and write of rem for new tail entry.
let v2 = assert_ok!(rx2.recv().await);
assert_eq!(v2, 1);
});
});
assert_ok!(tx.send(1));

assert_ok!(th1.join());
});
}
66 changes: 9 additions & 57 deletions tokio/tests/sync_broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,58 +480,10 @@ fn is_closed(err: broadcast::error::RecvError) -> bool {
matches!(err, broadcast::error::RecvError::Closed)
}

#[test]
fn receiver_same_position_as_cloned() {
let (tx, mut rx) = broadcast::channel(3);

let mut rx_clone = rx.clone_at_position();
// verify rx count is incremented
assert_eq!(tx.receiver_count(), 2);

// verify ok to start recv on rx_clone before rx
assert_ok!(tx.send(1));
assert_eq!(assert_recv!(rx_clone), 1);

drop(rx_clone);
assert_ok!(tx.send(2));
assert_ok!(tx.send(3));
// rx: [1,2,3]

// verify ok to start recv on rx before rx_clone
let mut rx_clone = rx.clone();
assert_eq!(assert_recv!(rx), 1);
assert_eq!(assert_recv!(rx_clone), 1);
// as we drop the rx_clone we should have registered interest in all positions between Receiver::next and tail.pos so each are decremented when we recv them.
drop(rx_clone);

// rx: [2, 3, _]
assert_ok!(tx.send(4));
// rx: [2, 3, 4]
let mut rx_clone = rx.clone_at_position();

// verify interest registered in slot, if not 3 and 4 is dropped and will rx_clone will not recv them.
assert_eq!(assert_recv!(rx), 2);
assert_eq!(assert_recv!(rx), 3);

// rx: [4, _, _]
// rx_clone: [2, 3, 4]
assert_eq!(assert_recv!(rx_clone), 2);
assert_eq!(assert_recv!(rx_clone), 3);
assert_eq!(assert_recv!(rx_clone), 4);
assert_eq!(assert_recv!(rx), 4);
assert_ok!(tx.send(5));
drop(tx);
assert_eq!(assert_recv!(rx), 5);
assert_eq!(assert_recv!(rx_clone), 5);

assert_closed!(rx.try_recv());
assert_closed!(rx_clone.try_recv());
}

#[test]
fn resubscribe_points_to_tail() {
let (tx, mut rx) = broadcast::channel(3);
tx.send(1);
tx.send(1).unwrap();

let mut rx_resub = rx.resubscribe();

Expand All @@ -540,9 +492,9 @@ fn resubscribe_points_to_tail() {
assert_eq!(assert_recv!(rx), 1);

// verify we do not affect rx
tx.send(2);
tx.send(2).unwrap();
assert_eq!(assert_recv!(rx_resub), 2);
tx.send(3);
tx.send(3).unwrap();
assert_eq!(assert_recv!(rx), 2);
assert_eq!(assert_recv!(rx), 3);
assert_empty!(rx);
Expand All @@ -554,14 +506,14 @@ fn resubscribe_points_to_tail() {
#[test]
fn resubscribe_lagged() {
let (tx, mut rx) = broadcast::channel(1);
tx.send(1);
tx.send(2);
tx.send(1).unwrap();
tx.send(2).unwrap();

let mut rx_resub = rx.resubscribe();
assert_lagged!(rx.try_recv(), 1);
assert_empty!(rx_resub);

let rx_resub = rx.resubscribe();
assert_lagged!(rx);
assert_lagged!(rx_resub);
assert_eq!(assert_recv!(rx), 2);
assert_empty!(rx);
assert_eq!(assert_recv!(rx_resub), 2);
assert_empty!(rx_resub);
}

0 comments on commit 8b274de

Please sign in to comment.