Skip to content

Commit

Permalink
sync: Remove readiness assertion in `watch::Receiver::changed() (#2839)
Browse files Browse the repository at this point in the history
*In `watch::Receiver::changed` `Notified` was polled
for the first time to ensure the waiter is registered while
assuming that the first poll will always return `Pending`.
It is the case however that another instance of `Notified`
is dropped without receiving its notification, this "orphaned"
notification can be used to satisfy another waiter without
even registering it. This commit accounts for that scenario.
  • Loading branch information
zaharidichev committed Sep 22, 2020
1 parent 2348f67 commit e7091fd
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 16 deletions.
33 changes: 20 additions & 13 deletions tokio/src/sync/notify.rs
Expand Up @@ -106,6 +106,14 @@ pub struct Notify {
waiters: Mutex<WaitList>,
}

#[derive(Debug, Clone, Copy)]
enum NotificationType {
// Notification triggered by calling `notify_waiters`
AllWaiters,
// Notification triggered by calling `notify_one`
OneWaiter,
}

#[derive(Debug)]
struct Waiter {
/// Intrusive linked-list pointers
Expand All @@ -115,7 +123,7 @@ struct Waiter {
waker: Option<Waker>,

/// `true` if the notification has been assigned to this waiter.
notified: bool,
notified: Option<NotificationType>,

/// Should not be `Unpin`.
_p: PhantomPinned,
Expand Down Expand Up @@ -230,7 +238,7 @@ impl Notify {
waiter: UnsafeCell::new(Waiter {
pointers: linked_list::Pointers::new(),
waker: None,
notified: false,
notified: None,
_p: PhantomPinned,
}),
}
Expand Down Expand Up @@ -327,9 +335,9 @@ impl Notify {
// Safety: `waiters` lock is still held.
let waiter = unsafe { waiter.as_mut() };

assert!(!waiter.notified);
assert!(waiter.notified.is_none());

waiter.notified = true;
waiter.notified = Some(NotificationType::AllWaiters);

if let Some(waker) = waiter.waker.take() {
waker.wake();
Expand Down Expand Up @@ -375,9 +383,9 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicU8, curr: u8) -> Option<W
// Safety: `waiters` lock is still held.
let waiter = unsafe { waiter.as_mut() };

assert!(!waiter.notified);
assert!(waiter.notified.is_none());

waiter.notified = true;
waiter.notified = Some(NotificationType::OneWaiter);
let waker = waiter.waker.take();

if waiters.is_empty() {
Expand Down Expand Up @@ -506,11 +514,11 @@ impl Future for Notified<'_> {
// Safety: called while locked
let w = unsafe { &mut *waiter.get() };

if w.notified {
if w.notified.is_some() {
// Our waker has been notified. Reset the fields and
// remove it from the list.
w.waker = None;
w.notified = false;
w.notified = None;

*state = Done;
} else {
Expand Down Expand Up @@ -582,14 +590,13 @@ impl Drop for Notified<'_> {
notify.state.store(EMPTY, SeqCst);
}

// See if the node was notified but not received. In this case, the
// notification must be sent to another waiter.
// See if the node was notified but not received. In this case, if
// the notification was triggered via `notify_one`, it must be sent
// to the next waiter.
//
// Safety: with the entry removed from the linked list, there can be
// no concurrent access to the entry
let notified = unsafe { (*waiter.get()).notified };

if notified {
if let Some(NotificationType::OneWaiter) = unsafe { (*waiter.get()).notified } {
if let Some(waker) = notify_locked(&mut waiters, &notify.state, notify_state) {
drop(waiters);
waker.wake();
Expand Down
22 changes: 19 additions & 3 deletions tokio/src/sync/tests/loom_watch.rs
Expand Up @@ -6,14 +6,30 @@ use loom::thread;
#[test]
fn smoke() {
loom::model(|| {
let (tx, mut rx) = watch::channel(1);
let (tx, mut rx1) = watch::channel(1);
let mut rx2 = rx1.clone();
let mut rx3 = rx1.clone();
let mut rx4 = rx1.clone();
let mut rx5 = rx1.clone();

let th = thread::spawn(move || {
tx.send(2).unwrap();
});

block_on(rx.changed()).unwrap();
assert_eq!(*rx.borrow(), 2);
block_on(rx1.changed()).unwrap();
assert_eq!(*rx1.borrow(), 2);

block_on(rx2.changed()).unwrap();
assert_eq!(*rx2.borrow(), 2);

block_on(rx3.changed()).unwrap();
assert_eq!(*rx3.borrow(), 2);

block_on(rx4.changed()).unwrap();
assert_eq!(*rx4.borrow(), 2);

block_on(rx5.changed()).unwrap();
assert_eq!(*rx5.borrow(), 2);

th.join().unwrap();
})
Expand Down

0 comments on commit e7091fd

Please sign in to comment.