From e7091fde786722a5301270e6281fc3c449dcfc14 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Tue, 22 Sep 2020 18:12:57 +0300 Subject: [PATCH] sync: Remove readiness assertion in `watch::Receiver::changed() (#2839) *In `watch::Receiver::changed` `Notified` was polled for the first time to ensure the waiter is registered while assuming that the first poll will always return `Pending`. It is the case however that another instance of `Notified` is dropped without receiving its notification, this "orphaned" notification can be used to satisfy another waiter without even registering it. This commit accounts for that scenario. --- tokio/src/sync/notify.rs | 33 ++++++++++++++++++------------ tokio/src/sync/tests/loom_watch.rs | 22 +++++++++++++++++--- 2 files changed, 39 insertions(+), 16 deletions(-) diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index c69e2b07dbf..56bbc51bf28 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -106,6 +106,14 @@ pub struct Notify { waiters: Mutex, } +#[derive(Debug, Clone, Copy)] +enum NotificationType { + // Notification triggered by calling `notify_waiters` + AllWaiters, + // Notification triggered by calling `notify_one` + OneWaiter, +} + #[derive(Debug)] struct Waiter { /// Intrusive linked-list pointers @@ -115,7 +123,7 @@ struct Waiter { waker: Option, /// `true` if the notification has been assigned to this waiter. - notified: bool, + notified: Option, /// Should not be `Unpin`. _p: PhantomPinned, @@ -230,7 +238,7 @@ impl Notify { waiter: UnsafeCell::new(Waiter { pointers: linked_list::Pointers::new(), waker: None, - notified: false, + notified: None, _p: PhantomPinned, }), } @@ -327,9 +335,9 @@ impl Notify { // Safety: `waiters` lock is still held. let waiter = unsafe { waiter.as_mut() }; - assert!(!waiter.notified); + assert!(waiter.notified.is_none()); - waiter.notified = true; + waiter.notified = Some(NotificationType::AllWaiters); if let Some(waker) = waiter.waker.take() { waker.wake(); @@ -375,9 +383,9 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicU8, curr: u8) -> Option { // Safety: called while locked let w = unsafe { &mut *waiter.get() }; - if w.notified { + if w.notified.is_some() { // Our waker has been notified. Reset the fields and // remove it from the list. w.waker = None; - w.notified = false; + w.notified = None; *state = Done; } else { @@ -582,14 +590,13 @@ impl Drop for Notified<'_> { notify.state.store(EMPTY, SeqCst); } - // See if the node was notified but not received. In this case, the - // notification must be sent to another waiter. + // See if the node was notified but not received. In this case, if + // the notification was triggered via `notify_one`, it must be sent + // to the next waiter. // // Safety: with the entry removed from the linked list, there can be // no concurrent access to the entry - let notified = unsafe { (*waiter.get()).notified }; - - if notified { + if let Some(NotificationType::OneWaiter) = unsafe { (*waiter.get()).notified } { if let Some(waker) = notify_locked(&mut waiters, ¬ify.state, notify_state) { drop(waiters); waker.wake(); diff --git a/tokio/src/sync/tests/loom_watch.rs b/tokio/src/sync/tests/loom_watch.rs index 7944cab8d24..c575b5b66c5 100644 --- a/tokio/src/sync/tests/loom_watch.rs +++ b/tokio/src/sync/tests/loom_watch.rs @@ -6,14 +6,30 @@ use loom::thread; #[test] fn smoke() { loom::model(|| { - let (tx, mut rx) = watch::channel(1); + let (tx, mut rx1) = watch::channel(1); + let mut rx2 = rx1.clone(); + let mut rx3 = rx1.clone(); + let mut rx4 = rx1.clone(); + let mut rx5 = rx1.clone(); let th = thread::spawn(move || { tx.send(2).unwrap(); }); - block_on(rx.changed()).unwrap(); - assert_eq!(*rx.borrow(), 2); + block_on(rx1.changed()).unwrap(); + assert_eq!(*rx1.borrow(), 2); + + block_on(rx2.changed()).unwrap(); + assert_eq!(*rx2.borrow(), 2); + + block_on(rx3.changed()).unwrap(); + assert_eq!(*rx3.borrow(), 2); + + block_on(rx4.changed()).unwrap(); + assert_eq!(*rx4.borrow(), 2); + + block_on(rx5.changed()).unwrap(); + assert_eq!(*rx5.borrow(), 2); th.join().unwrap(); })