From 22a73aba1b4fff9ae0e3ac052b231e6aa7d77aef Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Thu, 17 Sep 2020 12:01:27 +0300 Subject: [PATCH 1/3] sync: Remove readiness assertion in `watch::Receiver::changed() In `watch::Receiver::changed` `Notified` was polled for the first time to ensure the waiter is registered while assuming that the first poll will always return `Pending`. It is the case however that another instance of `Notified` is dropped without receiving its notification, this "orphaned" notification can be used to satisfy another waiter without even registering it. This commit accounts for that scenario. Signed-off-by: Zahari Dichev --- tokio/src/sync/tests/loom_watch.rs | 22 +++++++++++++++++++--- tokio/src/sync/watch.rs | 24 ++++++++++++++++-------- 2 files changed, 35 insertions(+), 11 deletions(-) diff --git a/tokio/src/sync/tests/loom_watch.rs b/tokio/src/sync/tests/loom_watch.rs index 7944cab8d24..c575b5b66c5 100644 --- a/tokio/src/sync/tests/loom_watch.rs +++ b/tokio/src/sync/tests/loom_watch.rs @@ -6,14 +6,30 @@ use loom::thread; #[test] fn smoke() { loom::model(|| { - let (tx, mut rx) = watch::channel(1); + let (tx, mut rx1) = watch::channel(1); + let mut rx2 = rx1.clone(); + let mut rx3 = rx1.clone(); + let mut rx4 = rx1.clone(); + let mut rx5 = rx1.clone(); let th = thread::spawn(move || { tx.send(2).unwrap(); }); - block_on(rx.changed()).unwrap(); - assert_eq!(*rx.borrow(), 2); + block_on(rx1.changed()).unwrap(); + assert_eq!(*rx1.borrow(), 2); + + block_on(rx2.changed()).unwrap(); + assert_eq!(*rx2.borrow(), 2); + + block_on(rx3.changed()).unwrap(); + assert_eq!(*rx3.borrow(), 2); + + block_on(rx4.changed()).unwrap(); + assert_eq!(*rx4.borrow(), 2); + + block_on(rx5.changed()).unwrap(); + assert_eq!(*rx5.borrow(), 2); th.join().unwrap(); }) diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 7d1ac9e8fdc..995b4e842bc 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -251,14 +251,22 @@ impl Receiver { let notified = self.shared.notify_rx.notified(); pin!(notified); - // Polling the future once is guaranteed to return `Pending` as `watch` - // only notifies using `notify_waiters`. - crate::future::poll_fn(|cx| { - let res = Pin::new(&mut notified).poll(cx); - assert!(!res.is_ready()); - Poll::Ready(()) - }) - .await; + // Polling the future here has dual purpose. The first one is to register + // the waiter so when `notify_waiters` is called it is notified. The second + // is to cover the case where another instance of `Notiified` has been dropped + // without receiving its notification. If that was the case polling the + // future for the first time will use this "lost" notification and return + // `Ready` immediatelly without registering any waiter + let aquired_lost_notification = + crate::future::poll_fn(|cx| match Pin::new(&mut notified).poll(cx) { + Poll::Ready(()) => Poll::Ready(true), + Poll::Pending => Poll::Ready(false), + }) + .await; + + if aquired_lost_notification { + return Ok(()); + } if let Some(ret) = maybe_changed(&self.shared, &mut self.version) { return ret; From 2d0e3f6e78181754c00035f0a28c5e859ac9fdc5 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Fri, 18 Sep 2020 10:33:10 +0300 Subject: [PATCH 2/3] Do not transfer notifications when trigerred by notify_waiters Signed-off-by: Zahari Dichev --- tokio/src/sync/notify.rs | 30 ++++++++++++++++++------------ tokio/src/sync/watch.rs | 24 ++++++++---------------- 2 files changed, 26 insertions(+), 28 deletions(-) diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index c69e2b07dbf..331247b0385 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -105,6 +105,13 @@ pub struct Notify { state: AtomicU8, waiters: Mutex, } +#[derive(Debug, Clone, Copy)] +enum NotificationType { + // Notification trigerred by calling `notify_waiters` + AllWaiters, + // Notification trigerred by calling `notify_one` + OneWaiter, +} #[derive(Debug)] struct Waiter { @@ -115,7 +122,7 @@ struct Waiter { waker: Option, /// `true` if the notification has been assigned to this waiter. - notified: bool, + notified: Option, /// Should not be `Unpin`. _p: PhantomPinned, @@ -230,7 +237,7 @@ impl Notify { waiter: UnsafeCell::new(Waiter { pointers: linked_list::Pointers::new(), waker: None, - notified: false, + notified: None, _p: PhantomPinned, }), } @@ -327,9 +334,9 @@ impl Notify { // Safety: `waiters` lock is still held. let waiter = unsafe { waiter.as_mut() }; - assert!(!waiter.notified); + assert!(waiter.notified.is_none()); - waiter.notified = true; + waiter.notified = Some(NotificationType::AllWaiters); if let Some(waker) = waiter.waker.take() { waker.wake(); @@ -375,9 +382,9 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicU8, curr: u8) -> Option { // Safety: called while locked let w = unsafe { &mut *waiter.get() }; - if w.notified { + if w.notified.is_some() { // Our waker has been notified. Reset the fields and // remove it from the list. w.waker = None; - w.notified = false; + w.notified = None; *state = Done; } else { @@ -583,13 +590,12 @@ impl Drop for Notified<'_> { } // See if the node was notified but not received. In this case, the - // notification must be sent to another waiter. + // notification must be sent to another waiter, only if it was + // triggered via `notify_one` // // Safety: with the entry removed from the linked list, there can be // no concurrent access to the entry - let notified = unsafe { (*waiter.get()).notified }; - - if notified { + if let Some(NotificationType::OneWaiter) = unsafe { (*waiter.get()).notified } { if let Some(waker) = notify_locked(&mut waiters, ¬ify.state, notify_state) { drop(waiters); waker.wake(); diff --git a/tokio/src/sync/watch.rs b/tokio/src/sync/watch.rs index 995b4e842bc..7d1ac9e8fdc 100644 --- a/tokio/src/sync/watch.rs +++ b/tokio/src/sync/watch.rs @@ -251,22 +251,14 @@ impl Receiver { let notified = self.shared.notify_rx.notified(); pin!(notified); - // Polling the future here has dual purpose. The first one is to register - // the waiter so when `notify_waiters` is called it is notified. The second - // is to cover the case where another instance of `Notiified` has been dropped - // without receiving its notification. If that was the case polling the - // future for the first time will use this "lost" notification and return - // `Ready` immediatelly without registering any waiter - let aquired_lost_notification = - crate::future::poll_fn(|cx| match Pin::new(&mut notified).poll(cx) { - Poll::Ready(()) => Poll::Ready(true), - Poll::Pending => Poll::Ready(false), - }) - .await; - - if aquired_lost_notification { - return Ok(()); - } + // Polling the future once is guaranteed to return `Pending` as `watch` + // only notifies using `notify_waiters`. + crate::future::poll_fn(|cx| { + let res = Pin::new(&mut notified).poll(cx); + assert!(!res.is_ready()); + Poll::Ready(()) + }) + .await; if let Some(ret) = maybe_changed(&self.shared, &mut self.version) { return ret; From 3cbc546464ad31e4d6a29f0e8367913fbf327288 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Tue, 22 Sep 2020 10:52:16 +0300 Subject: [PATCH 3/3] Address nits Signed-off-by: Zahari Dichev --- tokio/src/sync/notify.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index 331247b0385..56bbc51bf28 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -105,11 +105,12 @@ pub struct Notify { state: AtomicU8, waiters: Mutex, } + #[derive(Debug, Clone, Copy)] enum NotificationType { - // Notification trigerred by calling `notify_waiters` + // Notification triggered by calling `notify_waiters` AllWaiters, - // Notification trigerred by calling `notify_one` + // Notification triggered by calling `notify_one` OneWaiter, } @@ -589,9 +590,9 @@ impl Drop for Notified<'_> { notify.state.store(EMPTY, SeqCst); } - // See if the node was notified but not received. In this case, the - // notification must be sent to another waiter, only if it was - // triggered via `notify_one` + // See if the node was notified but not received. In this case, if + // the notification was triggered via `notify_one`, it must be sent + // to the next waiter. // // Safety: with the entry removed from the linked list, there can be // no concurrent access to the entry