Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid pinning notified future #3098

Merged

Conversation

zaharidichev
Copy link
Contributor

Motivation

Previously in order for the consumer to
register interest, in a notification triggered by
notify_waiters, the Notified future had to be
polled. This introduced friction when using the api
as the future had to be pinned before polled.

Solution

This change introduces a counter that tracks how many
times notified_waiters has been called. Upon creation of
the future the number of times is loaded. When first
polled the future compares this number with the count
state of the Notify type. This avoids the need for
registering the waiter upfront.

Fixes: #3066

Signed-off-by: Zahari Dichev zaharidichev@gmail.com

This PR changes the way `notify_waiters`,
is used. Previously in order for the consumer to
register interest, in a notification triggered by
`notify_waiters`, the `Notified` future had to be
polled. This introduced friction when using the api
as the future had to be pinned before polled.

This change introduces a counter that tracks how many
times `notified_waiters` has been called. Upon creation of
the future the number of times is loaded. When first
polled the future compares this number with the count
state of the `Notify` type. This avoids the need for
registering the waiter upfront.

Fixes: tokio-rs#3066

Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
@Darksonn Darksonn added A-tokio Area: The main tokio crate C-enhancement Category: A PR with an enhancement or bugfix. M-sync Module: tokio/sync labels Nov 5, 2020
Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
Copy link
Member

@carllerche carllerche left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me 👍 I have an inline comment. We could punt this to another PR.

I would like another review before merging. Maybe @hawkw ?

tokio/src/sync/notify.rs Show resolved Hide resolved
Copy link
Member

@hawkw hawkw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, this seems pretty straightforward. The potential deadlock @carllerche pointed out in #3098 (comment) is the only blocking issue I could find.

Other than that, you might be able to simplify some of the bit-packing code a little bit using the existing utils, but that's more of a style thing.

tokio/src/sync/notify.rs Show resolved Hide resolved
Comment on lines 191 to 194
fn inc_num_notify_waiters_calls(data: usize) -> usize {
let new = get_num_notify_waiters_calls(data) + 1;
(data & STATE_MASK) | (new << NOTIFY_WAITERS_SHIFT)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems like this can just be data + (1 << NOTIFY_WAITERS_SHIFT), right?

// There are no waiting tasks. In this case, no synchronization is
// established between `notify` and `notified().await`.
// All we need to do is increment the number of times this
// method was called.
self.state.store(inc_num_notify_waiters_calls(curr), SeqCst);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this seems like it could just be a fetch_add?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"just" would actually be more expensive :) A store is usually cheaper than RMW. SeqCst changes that a bit, but the SeqCst isn't strictly necessary here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@carllerche would Release suffice here ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@carllerche sure, i was thinking about the SeqCst load-store pair with instructions in between, which is two synchronization points rather than one, right? but if SeqCst is not load bearing here that may not be the case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My guess is AcqRel is sufficient for Notify, but I have not looked into it much.

Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
Copy link
Member

@hawkw hawkw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me overall! I had some minor suggestions.

@@ -109,7 +109,11 @@ type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
/// [`Semaphore`]: crate::sync::Semaphore
#[derive(Debug)]
pub struct Notify {
state: AtomicU8,
// this uses 2 bits to store one of `EMPTY`,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very, very minor nit: other sentences in this comment start with capital letters; let's be consistent:

Suggested change
// this uses 2 bits to store one of `EMPTY`,
// This uses 2 bits to store one of `EMPTY`,

@@ -109,7 +109,11 @@ type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
/// [`Semaphore`]: crate::sync::Semaphore
#[derive(Debug)]
pub struct Notify {
state: AtomicU8,
// this uses 2 bits to store one of `EMPTY`,
// `WAITING` or "NOTIFIED". The rest of the bits
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
// `WAITING` or "NOTIFIED". The rest of the bits
// `WAITING` or `NOTIFIED`. The rest of the bits

Comment on lines +382 to +385
const NUM_WAKERS: usize = 32;

let mut wakers: [Option<Waker>; NUM_WAKERS] = Default::default();
let mut curr_waker = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: i might move this until after the if let on line 394, since we might return early there. we could skip initializing the stack array in that case, since it won't be used. this probably doesn't have a huge performance impact but it might be worth doing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would avoid complicating code for micro-optimizations unless it is measurable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i thought it also reads nicer if these variables are declared closer to where they're actually used, but 🤷

Comment on lines +407 to +420
while curr_waker < NUM_WAKERS {
match waiters.pop_back() {
Some(mut waiter) => {
// Safety: `waiters` lock is still held.
let waiter = unsafe { waiter.as_mut() };

assert!(waiter.notified.is_none());
assert!(waiter.notified.is_none());

waiter.notified = Some(NotificationType::AllWaiters);
waiter.notified = Some(NotificationType::AllWaiters);

if let Some(waker) = waiter.waker.take() {
waker.wake();
if let Some(waker) = waiter.waker.take() {
wakers[curr_waker] = Some(waker);
curr_waker += 1;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIOLI: this could be a for loop:

for slot in &mut wakers {
    match waiters.pop_back() {
        Some(mut waiter) => {
            // Safety: `waiters` lock is still held.
            let waiter = unsafe { waiter.as_mut() };

            assert!(waiter.notified.is_none());

            waiter.notified = Some(NotificationType::AllWaiters);

            if let Some(waker) = waiter.waker.take() {
                *slot = Some(waker);
            }
        }
        None => break,
    }
}

@carllerche
Copy link
Member

I added a test to cover the potential deadlock when waking in the mutex. I verified it locked in the earlier version and passes now.

Copy link
Member

@carllerche carllerche left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Feel free to merge once @hawkw is happy 👍 thanks.

Copy link
Member

@hawkw hawkw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all my remaining feedback is just not-picking, feel free to ship whenever 👍

Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-tokio Area: The main tokio crate C-enhancement Category: A PR with an enhancement or bugfix. M-sync Module: tokio/sync
Projects
None yet
Development

Successfully merging this pull request may close these issues.

sync: provide API for notifying multiple waiters.
4 participants