Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid pinning notified future #3098

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 2 additions & 17 deletions tokio/src/sync/mpsc/chan.rs
Expand Up @@ -148,25 +148,10 @@ impl<T, S: Semaphore> Tx<T, S> {
}

pub(crate) async fn closed(&self) {
use std::future::Future;
use std::pin::Pin;
use std::task::Poll;

// In order to avoid a race condition, we first request a notification,
// **then** check the current value's version. If a new version exists,
// the notification request is dropped. Requesting the notification
// requires polling the future once.
// **then** check whether the semaphore is closed. If the semaphore is
// closed the notification request is dropped.
let notified = self.inner.notify_rx_closed.notified();
pin!(notified);

// Polling the future once is guaranteed to return `Pending` as `watch`
// only notifies using `notify_waiters`.
crate::future::poll_fn(|cx| {
let res = Pin::new(&mut notified).poll(cx);
assert!(!res.is_ready());
Poll::Ready(())
})
.await;

if self.inner.semaphore.is_closed() {
return;
Expand Down
157 changes: 120 additions & 37 deletions tokio/src/sync/notify.rs
Expand Up @@ -5,7 +5,7 @@
// triggers this warning but it is safe to ignore in this case.
#![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]

use crate::loom::sync::atomic::AtomicU8;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::Mutex;
use crate::util::linked_list::{self, LinkedList};

Expand Down Expand Up @@ -109,7 +109,11 @@ type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
/// [`Semaphore`]: crate::sync::Semaphore
#[derive(Debug)]
pub struct Notify {
state: AtomicU8,
// this uses 2 bits to store one of `EMPTY`,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very, very minor nit: other sentences in this comment start with capital letters; let's be consistent:

Suggested change
// this uses 2 bits to store one of `EMPTY`,
// This uses 2 bits to store one of `EMPTY`,

// `WAITING` or "NOTIFIED". The rest of the bits
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
// `WAITING` or "NOTIFIED". The rest of the bits
// `WAITING` or `NOTIFIED`. The rest of the bits

// are used to store the number of times `notify_waiters`
// was called.
state: AtomicUsize,
waiters: Mutex<WaitList>,
}

Expand Down Expand Up @@ -154,19 +158,40 @@ unsafe impl<'a> Sync for Notified<'a> {}

#[derive(Debug)]
enum State {
Init,
Init(usize),
Waiting,
Done,
}

const NOTIFY_WAITERS_SHIFT: usize = 2;
const STATE_MASK: usize = (1 << NOTIFY_WAITERS_SHIFT) - 1;
const NOTIFY_WAITERS_CALLS_MASK: usize = !STATE_MASK;
hawkw marked this conversation as resolved.
Show resolved Hide resolved

/// Initial "idle" state
const EMPTY: u8 = 0;
const EMPTY: usize = 0;

/// One or more threads are currently waiting to be notified.
const WAITING: u8 = 1;
const WAITING: usize = 1;

/// Pending notification
const NOTIFIED: u8 = 2;
const NOTIFIED: usize = 2;

fn set_state(data: usize, state: usize) -> usize {
(data & NOTIFY_WAITERS_CALLS_MASK) | (state & STATE_MASK)
}

fn get_state(data: usize) -> usize {
data & STATE_MASK
}

fn get_num_notify_waiters_calls(data: usize) -> usize {
(data & NOTIFY_WAITERS_CALLS_MASK) >> NOTIFY_WAITERS_SHIFT
}

fn inc_num_notify_waiters_calls(data: usize) -> usize {
let new = get_num_notify_waiters_calls(data) + 1;
(data & STATE_MASK) | (new << NOTIFY_WAITERS_SHIFT)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems like this can just be data + (1 << NOTIFY_WAITERS_SHIFT), right?


impl Notify {
/// Create a new `Notify`, initialized without a permit.
Expand All @@ -180,7 +205,7 @@ impl Notify {
/// ```
pub fn new() -> Notify {
Notify {
state: AtomicU8::new(0),
state: AtomicUsize::new(0),
waiters: Mutex::new(LinkedList::new()),
}
}
Expand All @@ -198,7 +223,7 @@ impl Notify {
#[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))]
pub const fn const_new() -> Notify {
Notify {
state: AtomicU8::new(0),
state: AtomicUsize::new(0),
waiters: Mutex::const_new(LinkedList::new()),
}
}
Expand Down Expand Up @@ -239,9 +264,12 @@ impl Notify {
/// }
/// ```
pub fn notified(&self) -> Notified<'_> {
// we load the number of times notify_waiters
// was called and store that in our initial state
let state = self.state.load(SeqCst);
Notified {
notify: self,
state: State::Init,
state: State::Init(state >> NOTIFY_WAITERS_SHIFT),
waiter: UnsafeCell::new(Waiter {
pointers: linked_list::Pointers::new(),
waker: None,
Expand Down Expand Up @@ -290,11 +318,12 @@ impl Notify {
let mut curr = self.state.load(SeqCst);

// If the state is `EMPTY`, transition to `NOTIFIED` and return.
while let EMPTY | NOTIFIED = curr {
while let EMPTY | NOTIFIED = get_state(curr) {
// The compare-exchange from `NOTIFIED` -> `NOTIFIED` is intended. A
// happens-before synchronization must happen between this atomic
// operation and a task calling `notified().await`.
let res = self.state.compare_exchange(curr, NOTIFIED, SeqCst, SeqCst);
let new = set_state(curr, NOTIFIED);
let res = self.state.compare_exchange(curr, new, SeqCst, SeqCst);

match res {
// No waiters, no further work to do
Expand All @@ -319,17 +348,51 @@ impl Notify {
}

/// Notifies all waiting tasks
pub(crate) fn notify_waiters(&self) {
///
/// If a task is currently waiting, that task is notified. Unlike with
/// `notify()`, no permit is stored to be used by the next call to
/// [`notified().await`]. The purpose of this method is to notify all
/// already registered waiters. Registering for notification is done by
/// acquiring an instance of the `Notified` future via calling `notified()`.
///
/// # Examples
///
/// ```
/// use tokio::sync::Notify;
/// use std::sync::Arc;
///
/// #[tokio::main]
/// async fn main() {
/// let notify = Arc::new(Notify::new());
/// let notify2 = notify.clone();
///
/// let notified1 = notify.notified();
/// let notified2 = notify.notified();
///
/// let handle = tokio::spawn(async move {
/// println!("sending notifications");
/// notify2.notify_waiters();
/// });
///
/// notified1.await;
/// notified2.await;
/// println!("received notifications");
/// }
/// ```
pub fn notify_waiters(&self) {
carllerche marked this conversation as resolved.
Show resolved Hide resolved
// There are waiters, the lock must be acquired to notify.
let mut waiters = self.waiters.lock();

// The state must be reloaded while the lock is held. The state may only
// transition out of WAITING while the lock is held.
let curr = self.state.load(SeqCst);

if let EMPTY | NOTIFIED = curr {
if let EMPTY | NOTIFIED = get_state(curr) {
// There are no waiting tasks. In this case, no synchronization is
// established between `notify` and `notified().await`.
// All we need to do is increment the number of times this
// method was called.
self.state.store(inc_num_notify_waiters_calls(curr), SeqCst);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this seems like it could just be a fetch_add?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"just" would actually be more expensive :) A store is usually cheaper than RMW. SeqCst changes that a bit, but the SeqCst isn't strictly necessary here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@carllerche would Release suffice here ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@carllerche sure, i was thinking about the SeqCst load-store pair with instructions in between, which is two synchronization points rather than one, right? but if SeqCst is not load bearing here that may not be the case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My guess is AcqRel is sufficient for Notify, but I have not looked into it much.

return;
}

Expand All @@ -354,7 +417,8 @@ impl Notify {
// All waiters have been notified, the state must be transitioned to
// `EMPTY`. As transitioning **from** `WAITING` requires the lock to be
// held, a `store` is sufficient.
self.state.store(EMPTY, SeqCst);
let new = set_state(inc_num_notify_waiters_calls(curr), EMPTY);
self.state.store(new, SeqCst);
}
}

Expand All @@ -364,17 +428,18 @@ impl Default for Notify {
}
}

fn notify_locked(waiters: &mut WaitList, state: &AtomicU8, curr: u8) -> Option<Waker> {
fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Option<Waker> {
loop {
match curr {
match get_state(curr) {
EMPTY | NOTIFIED => {
let res = state.compare_exchange(curr, NOTIFIED, SeqCst, SeqCst);
let res = state.compare_exchange(curr, set_state(curr, NOTIFIED), SeqCst, SeqCst);

match res {
Ok(_) => return None,
Err(actual) => {
assert!(actual == EMPTY || actual == NOTIFIED);
state.store(NOTIFIED, SeqCst);
let actual_state = get_state(actual);
assert!(actual_state == EMPTY || actual_state == NOTIFIED);
state.store(set_state(actual, NOTIFIED), SeqCst);
return None;
}
}
Expand All @@ -400,7 +465,7 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicU8, curr: u8) -> Option<W
// must be transitioned to `EMPTY`. As transitioning
// **from** `WAITING` requires the lock to be held, a
// `store` is sufficient.
state.store(EMPTY, SeqCst);
state.store(set_state(curr, EMPTY), SeqCst);
}

return waker;
Expand All @@ -420,7 +485,7 @@ impl Notified<'_> {
// Safety: both `notify` and `state` are `Unpin`.

is_unpin::<&Notify>();
is_unpin::<AtomicU8>();
is_unpin::<AtomicUsize>();

let me = self.get_unchecked_mut();
(&me.notify, &mut me.state, &me.waiter)
Expand All @@ -438,11 +503,16 @@ impl Future for Notified<'_> {

loop {
match *state {
Init => {
Init(initial_notify_waiters_calls) => {
let curr = notify.state.load(SeqCst);

// Optimistically try acquiring a pending notification
let res = notify
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst);
let res = notify.state.compare_exchange(
set_state(curr, NOTIFIED),
set_state(curr, EMPTY),
SeqCst,
SeqCst,
);

if res.is_ok() {
// Acquired the notification
Expand All @@ -457,17 +527,27 @@ impl Future for Notified<'_> {
// Reload the state with the lock held
let mut curr = notify.state.load(SeqCst);

// if notify_waiters has been called after the future
// was created, then we are done
if get_num_notify_waiters_calls(curr) != initial_notify_waiters_calls {
*state = Done;
return Poll::Ready(());
}

// Transition the state to WAITING.
loop {
match curr {
match get_state(curr) {
EMPTY => {
// Transition to WAITING
let res = notify
.state
.compare_exchange(EMPTY, WAITING, SeqCst, SeqCst);
let res = notify.state.compare_exchange(
set_state(curr, EMPTY),
set_state(curr, WAITING),
SeqCst,
SeqCst,
);

if let Err(actual) = res {
assert_eq!(actual, NOTIFIED);
assert_eq!(get_state(actual), NOTIFIED);
curr = actual;
} else {
break;
Expand All @@ -476,9 +556,12 @@ impl Future for Notified<'_> {
WAITING => break,
NOTIFIED => {
// Try consuming the notification
let res = notify
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst);
let res = notify.state.compare_exchange(
set_state(curr, NOTIFIED),
set_state(curr, EMPTY),
SeqCst,
SeqCst,
);

match res {
Ok(_) => {
Expand All @@ -487,7 +570,7 @@ impl Future for Notified<'_> {
return Poll::Ready(());
}
Err(actual) => {
assert_eq!(actual, EMPTY);
assert_eq!(get_state(actual), EMPTY);
curr = actual;
}
}
Expand Down Expand Up @@ -563,8 +646,8 @@ impl Drop for Notified<'_> {
// dropped, which means we must ensure that the waiter entry is no
// longer stored in the linked list.
if let Waiting = *state {
let mut notify_state = WAITING;
let mut waiters = notify.waiters.lock();
let mut notify_state = notify.state.load(SeqCst);

// `Notify.state` may be in any of the three states (Empty, Waiting,
// Notified). It doesn't actually matter what the atomic is set to
Expand All @@ -587,14 +670,14 @@ impl Drop for Notified<'_> {
unsafe { waiters.remove(NonNull::new_unchecked(waiter.get())) };

if waiters.is_empty() {
notify_state = EMPTY;
notify_state = set_state(notify_state, EMPTY);
// If the state *should* be `NOTIFIED`, the call to
// `notify_locked` below will end up doing the
// `store(NOTIFIED)`. If a concurrent receiver races and
// observes the incorrect `EMPTY` state, it will then obtain the
// lock and block until `notify.state` is in the correct final
// state.
notify.state.store(EMPTY, SeqCst);
notify.state.store(notify_state, SeqCst);
}

// See if the node was notified but not received. In this case, if
Expand Down
21 changes: 21 additions & 0 deletions tokio/src/sync/tests/loom_notify.rs
Expand Up @@ -21,6 +21,27 @@ fn notify_one() {
});
}

#[test]
fn notify_waiters() {
loom::model(|| {
let notify = Arc::new(Notify::new());
let tx = notify.clone();
let notified1 = notify.notified();
let notified2 = notify.notified();

let th = thread::spawn(move || {
tx.notify_waiters();
});

th.join().unwrap();

block_on(async {
notified1.await;
notified2.await;
});
});
}

#[test]
fn notify_multi() {
loom::model(|| {
Expand Down