sync: fix notify_waiters notifying sequential awaits #5404

Closed
wants to merge 21 commits into from
218 changes: 154 additions & 64 deletions tokio/src/sync/notify.rs
@@ -198,10 +198,16 @@ type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
/// [`Semaphore`]: crate::sync::Semaphore
#[derive(Debug)]
pub struct Notify {
// This uses 2 bits to store one of `EMPTY`,
// `state` uses 2 bits to store one of `EMPTY`,
// `WAITING` or `NOTIFIED`. The rest of the bits
// are used to store the number of times `notify_waiters`
// was called.
//
// Throughout the code there are two assumptions:
// - state can be transitioned *from* `WAITING` only if
// `waiters` lock is held
// - number of times `notify_waiters` was called can
// be modified only if `waiters` lock is held
state: AtomicUsize,
waiters: Mutex<WaitList>,
}
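
The packing described in the `state` comment can be sketched as follows. This is a minimal illustration, not the code from the PR: the helper names appear in the diff, but their bodies, the shift of 2, and the numeric values of `EMPTY`, `WAITING` and `NOTIFIED` are assumptions made here.

```rust
// Assumed layout: the low 2 bits hold the state, the remaining bits hold
// the number of `notify_waiters` calls. All constant values are assumptions.
const NOTIFY_WAITERS_SHIFT: usize = 2;
const STATE_MASK: usize = (1 << NOTIFY_WAITERS_SHIFT) - 1;

const EMPTY: usize = 0;
const WAITING: usize = 1;
const NOTIFIED: usize = 2;

/// Extracts the 2-bit state.
fn get_state(data: usize) -> usize {
    data & STATE_MASK
}

/// Replaces the 2-bit state and leaves the counter bits untouched.
fn set_state(data: usize, state: usize) -> usize {
    (data & !STATE_MASK) | (state & STATE_MASK)
}

/// Extracts the `notify_waiters` call counter.
fn get_num_notify_waiters_calls(data: usize) -> usize {
    data >> NOTIFY_WAITERS_SHIFT
}

/// Adds one to the counter without touching the state bits.
fn inc_num_notify_waiters_calls(data: usize) -> usize {
    data.wrapping_add(1 << NOTIFY_WAITERS_SHIFT)
}

fn main() {
    // Mirror the update performed in `notify_waiters`: bump the counter
    // and transition to `EMPTY` in a single new value.
    let curr = set_state(0, WAITING);
    let next = set_state(inc_num_notify_waiters_calls(curr), EMPTY);

    assert_eq!(get_state(next), EMPTY);
    assert_eq!(get_num_notify_waiters_calls(next), 1);

    // Changing only the state keeps the counter intact.
    let notified = set_state(next, NOTIFIED);
    assert_eq!(get_state(notified), NOTIFIED);
    assert_eq!(get_num_notify_waiters_calls(notified), 1);
}
```

Keeping both values in one word lets a single atomic load observe the state and the call count together, which is what the `Init(usize)` snapshot in `Notified` relies on.
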
@@ -222,8 +228,13 @@ struct Waiter {
/// Waiting task's waker.
waker: Option<Waker>,

/// `true` if the notification has been assigned to this waiter.
notified: Option<NotificationType>,
Member Author

I renamed this field because its old name was easy to confuse with the similarly named future.

/// Pointer to a containing queue decoupled in `notify_waiters`.
/// This field is `None` if the waiter is stored in the waitlist
/// `Notify::waiters` list, and `Some` if the waiter is stored
/// in some other list owned by a `notify_waiters` call.
notify_waiters_queue: Option<NonNull<WaitList>>,
Contributor

As far as I can tell, the invariant of this field is that it is None when the waiter is in the Notify's own list, and Some when the waiter is in some other list. Please document that.

Member Author

True, I've added a note about this.


notification: Option<NotificationType>,

/// Should not be `Unpin`.
_p: PhantomPinned,
@@ -237,6 +248,9 @@ generate_addr_of_methods! {
}
}

unsafe impl Send for Waiter {}
unsafe impl Sync for Waiter {}

/// Future returned from [`Notify::notified()`].
///
/// This future is fused, so once it has completed, any future calls to poll
@@ -258,6 +272,8 @@ unsafe impl<'a> Sync for Notified<'a> {}

#[derive(Debug)]
enum State {
// Contains the lowest `notify_waiters` call number which should
// notify this waiter.
Init(usize),
Waiting,
Done,
@@ -387,11 +403,12 @@ impl Notify {
let state = self.state.load(SeqCst);
Notified {
notify: self,
state: State::Init(state >> NOTIFY_WAITERS_SHIFT),
state: State::Init(get_num_notify_waiters_calls(state)),
waiter: UnsafeCell::new(Waiter {
pointers: linked_list::Pointers::new(),
waker: None,
notified: None,
notify_waiters_queue: None,
notification: None,
_p: PhantomPinned,
}),
}
@@ -500,12 +517,9 @@ impl Notify {
/// }
/// ```
pub fn notify_waiters(&self) {
let mut wakers = WakeList::new();

// There are waiters, the lock must be acquired to notify.
let mut waiters = self.waiters.lock();

// The state must be reloaded while the lock is held. The state may only
// The state must be loaded while the lock is held. The state may only
// transition out of WAITING while the lock is held.
let curr = self.state.load(SeqCst);

@@ -516,30 +530,76 @@ impl Notify {
return;
}

// At this point, it is guaranteed that the state will not
// concurrently change, as holding the lock is required to
// transition **out** of `WAITING`.
'outer: loop {
while wakers.can_push() {
match waiters.pop_back() {
Some(mut waiter) => {
// Safety: `waiters` lock is still held.
let waiter = unsafe { waiter.as_mut() };

assert!(waiter.notified.is_none());
// Increment the number of times this method was called
// and transition to empty.
let new_state = set_state(inc_num_notify_waiters_calls(curr), EMPTY);
self.state.store(new_state, SeqCst);

waiter.notified = Some(NotificationType::AllWaiters);
// We may decouple the waiters list and store it on the stack to ensure
// atomicity. This variable binding is shadowed by a reference to it
// to prevent it from moving.
let mut decoupled_list: Option<UnsafeCell<WaitList>> = None;
let decoupled_list = &mut decoupled_list;

if let Some(waker) = waiter.waker.take() {
wakers.push(waker);
let mut wakers = WakeList::new();
'outer: loop {
{
let queue = if let Some(decoupled) = decoupled_list.as_ref() {
// Safety: we hold the `waiters` lock, so we can
// mutably borrow the decoupled queue.
unsafe { &mut *decoupled.get() }
} else {
&mut *waiters
};

while wakers.can_push() {
match queue.pop_back() {
Some(mut waiter) => {
// Safety: `waiters` lock is still held.
let waiter = unsafe { waiter.as_mut() };

assert!(waiter.notification.is_none());

waiter.notification = Some(NotificationType::AllWaiters);
waiter.notify_waiters_queue = None;

if let Some(waker) = waiter.waker.take() {
wakers.push(waker);
}
}
None => {
break 'outer;
}
}
None => {
break 'outer;
}
}
}

// If there are more batches than one, decouple the list before releasing
// the lock to provide atomicity. Decoupled list will still be protected
// by the `waiters` lock.
if decoupled_list.is_none() {
// Store the list directly in the stack variable.
*decoupled_list = Some(UnsafeCell::new(std::mem::take(&mut *waiters)));

// We inform every waiter that the list it is stored in has been moved by
// storing a raw pointer to the list. The list is not moved from the stack
// of this function call, so every pointer will remain valid.
// Safety: pointer to the `decoupled_list` is not null.
let list_ptr =
unsafe { NonNull::new_unchecked(decoupled_list.as_ref().unwrap().get()) };

// Safety: we hold the `waiters` lock, so we can borrow the decoupled list.
let list_ref = unsafe { list_ptr.as_ref() };

for mut waiter in list_ref {
// Safety: we hold the `waiters` lock.
let waiter = unsafe { waiter.as_mut() };
waiter.notify_waiters_queue = Some(list_ptr);
}
}

// Release the lock before notifying.
// There are no longer any borrows of the list inside `decoupled_list` cell.
drop(waiters);

wakers.wake_all();
@@ -548,12 +608,6 @@ impl Notify {
waiters = self.waiters.lock();
}

// All waiters will be notified, the state must be transitioned to
// `EMPTY`. As transitioning **from** `WAITING` requires the lock to be
// held, a `store` is sufficient.
let new = set_state(inc_num_notify_waiters_calls(curr), EMPTY);
self.state.store(new, SeqCst);

// Release the lock before notifying
drop(waiters);

@@ -597,9 +651,9 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Op
// Safety: `waiters` lock is still held.
let waiter = unsafe { waiter.as_mut() };

assert!(waiter.notified.is_none());
assert!(waiter.notification.is_none());

waiter.notified = Some(NotificationType::OneWaiter);
waiter.notification = Some(NotificationType::OneWaiter);
let waker = waiter.waker.take();

if waiters.is_empty() {
@@ -766,6 +820,13 @@ impl Notified<'_> {
return Poll::Ready(());
}

// Optimistically check if notify_waiters has been called
// after the future was created.
if get_num_notify_waiters_calls(curr) != initial_notify_waiters_calls {
*state = Done;
return Poll::Ready(());
}

// Clone the waker before locking, a waker clone can be
// triggering arbitrary code.
let waker = waker.cloned();
@@ -777,8 +838,7 @@ impl Notified<'_> {
// Reload the state with the lock held
let mut curr = notify.state.load(SeqCst);

// if notify_waiters has been called after the future
// was created, then we are done
// Check again if notify_waiters has been called in the meantime.
if get_num_notify_waiters_calls(curr) != initial_notify_waiters_calls {
*state = Done;
return Poll::Ready(());
@@ -856,11 +916,28 @@ impl Notified<'_> {
// Safety: called while locked
let w = unsafe { &mut *waiter.get() };

if w.notified.is_some() {
// Our waker has been notified. Reset the fields and
// remove it from the list.
w.waker = None;
w.notified = None;
if w.notification.is_some() {
// Our waker has been notified and our waiter is already removed from
// the list. Reset the notification and convert to `Done`.
w.notification = None;
*state = Done;
} else if w.notify_waiters_queue.is_some() {
// There is a call to `notify_waiters` in progress. Since we already
// have the lock, remove our entry from the waiter list.

w.waker.take();

let mut decoupled_list_ptr = w.notify_waiters_queue.take().unwrap();

// as_mut safety: we hold the `waiters` lock, so we can mutably
// borrow the decoupled list.
// remove safety: the waiter *MUST* be stored in the `decoupled_list`
// because it had a pointer to it.
unsafe {
decoupled_list_ptr
.as_mut()
.remove(NonNull::new_unchecked(w))
};

*state = Done;
} else {
@@ -913,32 +990,45 @@ impl Drop for Notified<'_> {
// longer stored in the linked list.
if matches!(*state, Waiting) {
let mut waiters = notify.waiters.lock();
let mut notify_state = notify.state.load(SeqCst);

// remove the entry from the list (if not already removed)
//
// safety: the waiter is only added to `waiters` by virtue of it
// being the only `LinkedList` available to the type.
unsafe { waiters.remove(NonNull::new_unchecked(waiter.get())) };
// Safety: called while locked
let w = unsafe { &mut *waiter.get() };

if waiters.is_empty() && get_state(notify_state) == WAITING {
notify_state = set_state(notify_state, EMPTY);
notify.state.store(notify_state, SeqCst);
}
if let Some(mut decoupled_list_ptr) = w.notify_waiters_queue.take() {
// We hold the `waiters` lock.
let decoupled_list = unsafe { decoupled_list_ptr.as_mut() };

// See if the node was notified but not received. In this case, if
// the notification was triggered via `notify_one`, it must be sent
// to the next waiter.
//
// Safety: with the entry removed from the linked list, there can be
// no concurrent access to the entry
if matches!(
unsafe { (*waiter.get()).notified },
Some(NotificationType::OneWaiter)
) {
if let Some(waker) = notify_locked(&mut waiters, &notify.state, notify_state) {
drop(waiters);
waker.wake();
// safety: the waiter *MUST* be stored in the `decoupled_list`
// because it had a pointer to it.
unsafe { decoupled_list.remove(NonNull::new_unchecked(w)) };
} else {
let mut notify_state = notify.state.load(SeqCst);

// remove the entry from the list (if not already removed)
//
// safety: the waiter must be stored in `waiters` because it does
// not have pointer to any other linked list.
unsafe { waiters.remove(NonNull::new_unchecked(w)) };

if waiters.is_empty() && get_state(notify_state) == WAITING {
notify_state = set_state(notify_state, EMPTY);
notify.state.store(notify_state, SeqCst);
}

// See if the node was notified but not received. In this case, if
// the notification was triggered via `notify_one`, it must be sent
// to the next waiter.
//
// Safety: with the entry removed from the linked list, there can be
// no concurrent access to the entry
if matches!(
unsafe { (*waiter.get()).notification },
Some(NotificationType::OneWaiter)
) {
if let Some(waker) = notify_locked(&mut waiters, &notify.state, notify_state) {
drop(waiters);
waker.wake();
}
}
Comment on lines +1018 to 1032
Contributor

This can also cause weird things ... I know this was already here, and maybe the answer is to just not change it.

Imagine this sequence of actions:

  1. Create two Notified futures and poll them.
  2. Call notify_one twice.
  3. Create two new Notified futures.
  4. Drop the original two Notified futures.
  5. Both of the futures from step 3 complete.

This is weird since the futures that completed were created after the notify_one calls, and the fact that there are two means that we can't just pretend that this is due to the permit being stored.
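
The sequence above can be replayed against a toy model. This is only a sketch under stated assumptions: `ToyNotify`, `FutureState`, `enable`, `drop_future` and `is_notified` are hypothetical names invented here, the model is single-threaded and uses integer ids instead of real futures, and it imitates only the permit-forwarding rule from the `Drop` impl, not the actual tokio implementation.

```rust
use std::collections::VecDeque;

#[derive(Clone, Copy, Debug, PartialEq)]
enum FutureState {
    Waiting,  // queued, no permit assigned yet
    Notified, // a notify_one permit was assigned but not yet observed
    Done,     // completed immediately from a stored permit
    Dropped,
}

/// Hypothetical single-threaded stand-in for `Notify`.
#[derive(Default)]
struct ToyNotify {
    stored_permit: bool,
    queue: VecDeque<usize>, // ids of waiting futures, oldest first
    futures: Vec<FutureState>,
}

impl ToyNotify {
    /// Creates a future and polls it once, returning its id.
    fn enable(&mut self) -> usize {
        let id = self.futures.len();
        if self.stored_permit {
            self.stored_permit = false;
            self.futures.push(FutureState::Done);
        } else {
            self.futures.push(FutureState::Waiting);
            self.queue.push_back(id);
        }
        id
    }

    /// Assigns a permit to the oldest waiter, or stores it if nobody waits.
    fn notify_one(&mut self) {
        match self.queue.pop_front() {
            Some(id) => self.futures[id] = FutureState::Notified,
            None => self.stored_permit = true,
        }
    }

    /// Dropping a future with an unobserved permit forwards the permit,
    /// as if `notify_one` were called at drop time.
    fn drop_future(&mut self, id: usize) {
        let previous = std::mem::replace(&mut self.futures[id], FutureState::Dropped);
        match previous {
            FutureState::Waiting => self.queue.retain(|&queued| queued != id),
            FutureState::Notified => self.notify_one(),
            FutureState::Done | FutureState::Dropped => {}
        }
    }

    fn is_notified(&self, id: usize) -> bool {
        self.futures[id] == FutureState::Notified
    }
}

fn main() {
    let mut notify = ToyNotify::default();

    // 1. Create two futures and poll them.
    let a = notify.enable();
    let b = notify.enable();

    // 2. Call notify_one twice.
    notify.notify_one();
    notify.notify_one();

    // 3. Create two new futures; neither has a permit yet.
    let c = notify.enable();
    let d = notify.enable();
    assert!(!notify.is_notified(c) && !notify.is_notified(d));

    // 4. Drop the original two futures without observing their permits.
    notify.drop_future(a);
    notify.drop_future(b);

    // 5. Both newer futures now hold a permit.
    assert!(notify.is_notified(c) && notify.is_notified(d));
}
```

In this model, a single stored permit could explain at most one of the two completions, which is the point made in the comment above.
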

Member Author

I thought about this for a while and I think it's a tough problem. The issue is that we would like to think about cancelling a Notified future as simply removing it from the queue, as the docs describe. In reality, the future is removed from the queue much earlier, and at drop we only simulate what would have happened if the future had been removed before receiving the permit. In this code, that is done by passing the permit to another waiting future.

However, we can pass the permit from a future A to a waiting future B if and only if the state of Notify never reached NOTIFIED at any point between A receiving the permit and B being pushed to the queue. We could check for that by numbering every event (each notify_one call and each enabling of a Notified future) and remembering the number of the last event at which the state was NOTIFIED. In Drop, it would then suffice to check that the future being dropped is not older than this number.

Unfortunately, the problem is deeper than that, and your example demonstrates this. We can drop multiple futures, and simulating that one of them was removed from the queue before receiving the permit will affect how we simulate the same for others.

I like to think about it the following way: let $e_t$ represent the $t$-th event, with $e_t = 1$ if the $t$-th event is enabling a Notified future and $e_t = -1$ if it is a notify_one call. Then we can define the number of waiters after the $t$-th event as $w_t = \max(-1, w_{t-1} + e_t)$. The special value $w_t = -1$ means that there are no waiters and the state is NOTIFIED. To check whether we can transfer the permit from a dropped future, we should track the last event number at which the state was NOTIFIED, which is the greatest $T$ satisfying $w_T = -1$.

The issue is that dropping a future enabled in the $t$-th event means we change $e_t$ from $1$ to $0$, which affects all newer values $w_t, w_{t+1}, \ldots$. In particular, it can change one of them to $-1$. Such a change can increase $T$, and we have to somehow account for that inside the Drop impl.
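
A small sketch of this recurrence, applied to the event sequence from the example above (two enables, two notify_one calls, two newer enables). The function name `last_notified_event` and the starting value $w_0 = 0$ are assumptions made for this illustration; event numbers are 1-based to match $t$.

```rust
/// Returns the greatest 1-based index T with w_T == -1, if any, where
/// w_0 = 0 (an assumption of this sketch) and w_t = max(-1, w_{t-1} + e_t).
fn last_notified_event(events: &[i32]) -> Option<usize> {
    let mut w = 0i32;
    let mut last = None;
    for (i, &e) in events.iter().enumerate() {
        w = (w + e).max(-1);
        if w == -1 {
            last = Some(i + 1);
        }
    }
    last
}

fn main() {
    // e_t = 1: a Notified future is enabled; e_t = -1: notify_one is called.
    let original = [1, 1, -1, -1, 1, 1];
    // w = 1, 2, 1, 0, 1, 2: the state never becomes NOTIFIED.
    assert_eq!(last_notified_event(&original), None);

    // Dropping the future enabled in event 1 changes e_1 from 1 to 0.
    let one_dropped = [0, 1, -1, -1, 1, 1];
    // w = 0, 1, 0, -1, 0, 1: now T = 4.
    assert_eq!(last_notified_event(&one_dropped), Some(4));

    // Dropping both original futures.
    let both_dropped = [0, 0, -1, -1, 1, 1];
    // w = 0, 0, -1, -1, 0, 1: T is still 4.
    assert_eq!(last_notified_event(&both_dropped), Some(4));
}
```

A single drop moves $T$ from "never" to event 4 here, so a value of $T$ recorded before the drop would already be stale by the time the second future is dropped.
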

To sum up, we would need a complicated data structure to effectively track whether we should transfer the permit from a dropped future, for example a deque or some sort of segment tree, which probably is not feasible here. I think the best we can do is the heuristic I've mentioned earlier, but it doesn't even fix your case, so in my opinion we should probably leave this code as it is.

Contributor

I believe that we ran into a similar problem during a discussion about adding a condition variable to Tokio. To me, it seems that the best option is to just document that dropping a future that has consumed a permit from a notify_one call will pass on its permit, as if notify_one were called in the destructor.

Member Author

Sure, I will be happy to open a PR improving the docs.

}
}