Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: Remove readiness assertion in `watch::Receiver::changed() #2839

Merged
merged 3 commits into from Sep 22, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 18 additions & 12 deletions tokio/src/sync/notify.rs
Expand Up @@ -105,6 +105,13 @@ pub struct Notify {
state: AtomicU8,
waiters: Mutex<WaitList>,
}
#[derive(Debug, Clone, Copy)]
enum NotificationType {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: i'd add a newline here (surprised that rustfmt doesn't do that?)

Suggested change
}
#[derive(Debug, Clone, Copy)]
enum NotificationType {
}
#[derive(Debug, Clone, Copy)]
enum NotificationType {

// Notification trigerred by calling `notify_waiters`
AllWaiters,
// Notification trigerred by calling `notify_one`
OneWaiter,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typos:

Suggested change
// Notification trigerred by calling `notify_waiters`
AllWaiters,
// Notification trigerred by calling `notify_one`
OneWaiter,
// Notification triggered by calling `notify_waiters`
AllWaiters,
// Notification triggered by calling `notify_one`
OneWaiter,

}

#[derive(Debug)]
struct Waiter {
Expand All @@ -115,7 +122,7 @@ struct Waiter {
waker: Option<Waker>,

/// `true` if the notification has been assigned to this waiter.
notified: bool,
notified: Option<NotificationType>,

/// Should not be `Unpin`.
_p: PhantomPinned,
Expand Down Expand Up @@ -230,7 +237,7 @@ impl Notify {
waiter: UnsafeCell::new(Waiter {
pointers: linked_list::Pointers::new(),
waker: None,
notified: false,
notified: None,
_p: PhantomPinned,
}),
}
Expand Down Expand Up @@ -327,9 +334,9 @@ impl Notify {
// Safety: `waiters` lock is still held.
let waiter = unsafe { waiter.as_mut() };

assert!(!waiter.notified);
assert!(waiter.notified.is_none());

waiter.notified = true;
waiter.notified = Some(NotificationType::AllWaiters);

if let Some(waker) = waiter.waker.take() {
waker.wake();
Expand Down Expand Up @@ -375,9 +382,9 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicU8, curr: u8) -> Option<W
// Safety: `waiters` lock is still held.
let waiter = unsafe { waiter.as_mut() };

assert!(!waiter.notified);
assert!(waiter.notified.is_none());

waiter.notified = true;
waiter.notified = Some(NotificationType::OneWaiter);
let waker = waiter.waker.take();

if waiters.is_empty() {
Expand Down Expand Up @@ -506,11 +513,11 @@ impl Future for Notified<'_> {
// Safety: called while locked
let w = unsafe { &mut *waiter.get() };

if w.notified {
if w.notified.is_some() {
// Our waker has been notified. Reset the fields and
// remove it from the list.
w.waker = None;
w.notified = false;
w.notified = None;

*state = Done;
} else {
Expand Down Expand Up @@ -583,13 +590,12 @@ impl Drop for Notified<'_> {
}

// See if the node was notified but not received. In this case, the
// notification must be sent to another waiter.
// notification must be sent to another waiter, only if it was
// triggered via `notify_one`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might rephrase this to something like

            // See if the node was notified but not received. In this case, if
            // the notification was triggered via `notify_one`, it must be sent
            // to the next waiter.

//
// Safety: with the entry removed from the linked list, there can be
// no concurrent access to the entry
let notified = unsafe { (*waiter.get()).notified };

if notified {
if let Some(NotificationType::OneWaiter) = unsafe { (*waiter.get()).notified } {
if let Some(waker) = notify_locked(&mut waiters, &notify.state, notify_state) {
drop(waiters);
waker.wake();
Expand Down
22 changes: 19 additions & 3 deletions tokio/src/sync/tests/loom_watch.rs
Expand Up @@ -6,14 +6,30 @@ use loom::thread;
#[test]
fn smoke() {
loom::model(|| {
let (tx, mut rx) = watch::channel(1);
let (tx, mut rx1) = watch::channel(1);
let mut rx2 = rx1.clone();
let mut rx3 = rx1.clone();
let mut rx4 = rx1.clone();
let mut rx5 = rx1.clone();

let th = thread::spawn(move || {
tx.send(2).unwrap();
});

block_on(rx.changed()).unwrap();
assert_eq!(*rx.borrow(), 2);
block_on(rx1.changed()).unwrap();
assert_eq!(*rx1.borrow(), 2);

block_on(rx2.changed()).unwrap();
assert_eq!(*rx2.borrow(), 2);

block_on(rx3.changed()).unwrap();
assert_eq!(*rx3.borrow(), 2);

block_on(rx4.changed()).unwrap();
assert_eq!(*rx4.borrow(), 2);

block_on(rx5.changed()).unwrap();
assert_eq!(*rx5.borrow(), 2);

th.join().unwrap();
})
Expand Down