sync: tweak watch API #2814

Merged
merged 5 commits on Sep 11, 2020
4 changes: 3 additions & 1 deletion tokio/src/sync/barrier.rs
@@ -110,9 +110,11 @@ impl Barrier {
         let mut wait = self.wait.clone();

         loop {
+            let _ = wait.changed().await;
+
             // note that the first time through the loop, this _will_ yield a generation
             // immediately, since we cloned a receiver that has never seen any values.
-            if wait.recv().await >= generation {
+            if *wait.borrow() >= generation {
                 break;
             }
         }
10 changes: 4 additions & 6 deletions tokio/src/sync/mod.rs
@@ -355,10 +355,8 @@
 //!     let op = my_async_operation();
 //!     tokio::pin!(op);
 //!
-//!     // Receive the **initial** configuration value. As this is the
-//!     // first time the config is received from the watch, it will
-//!     // always complete immediatedly.
-//!     let mut conf = rx.recv().await;
+//!     // Get the initial config value
+//!     let mut conf = rx.borrow().clone();
 //!
 //!     let mut op_start = Instant::now();
 //!     let mut delay = time::delay_until(op_start + conf.timeout);
@@ -375,8 +373,8 @@
 //!                 // Restart the timeout
 //!                 delay = time::delay_until(op_start + conf.timeout);
 //!             }
-//!             new_conf = rx.recv() => {
-//!                 conf = new_conf;
+//!             _ = rx.changed() => {
+//!                 conf = rx.borrow().clone();
 //!
 //!                 // The configuration has been updated. Update the
 //!                 // `delay` using the new `timeout` value.
52 changes: 49 additions & 3 deletions tokio/src/sync/notify.rs
@@ -121,7 +121,7 @@ struct Waiter {

/// Future returned from `notified()`
#[derive(Debug)]
-struct Notified<'a> {
+pub struct Notified<'a> {
/// The `Notify` being received on.
notify: &'a Notify,

@@ -170,6 +170,12 @@ impl Notify {

/// Wait for a notification.
///
+    /// Equivalent to:
+    ///
+    /// ```ignore
+    /// async fn notified(&self);
+    /// ```
+    ///
/// Each `Notify` value holds a single permit. If a permit is available from
/// an earlier call to [`notify()`], then `notified().await` will complete
/// immediately, consuming that permit. Otherwise, `notified().await` waits
@@ -197,7 +203,7 @@ impl Notify {
/// notify.notify();
/// }
/// ```
-    pub async fn notified(&self) {
+    pub fn notified(&self) -> Notified<'_> {
Notified {
notify: self,
state: State::Init,
@@ -208,7 +214,6 @@
_p: PhantomPinned,
}),
}
-        .await
}

/// Notifies a waiting task
@@ -277,6 +282,45 @@ impl Notify {
waker.wake();
}
}

+    /// Notifies all waiting tasks
+    pub(crate) fn notify_waiters(&self) {
+        // There are waiters, the lock must be acquired to notify.
+        let mut waiters = self.waiters.lock().unwrap();
Member: It looks like we call notify_waiters in drop impls in a couple of places. Should this not panic if the lock is poisoned? I think we still want to remove the waiters regardless of whether the lock is poisoned, so this should probably be:

Suggested change:
-        let mut waiters = self.waiters.lock().unwrap();
+        let mut waiters = self.waiters.lock().unwrap_or_else(PoisonError::into_inner);

Since parking_lot doesn't have poisoning, the current code is already equivalent to this for a lot of production users.

Member Author: I'm not really worried about lock poisoning here. In general, I'd rather pretend lock poisoning isn't a thing for internal usage; this is generally how I think about Mutex across the code base. I guess we should update our internal Mutex shim to abort the process instead of panicking, then? That would be a separate PR.

Member: I think it's better to update the internal mutex shim to make locking infallible by ignoring poisoned-mutex errors via into_inner. This would result in essentially the same behavior as we get with parking_lot, regardless of whether or not the parking_lot feature is enabled. Aborting the process any time a mutex is poisoned could make a lot of errors, including in user code, much harder to debug.

+        // The state must be reloaded while the lock is held. The state may only
+        // transition out of WAITING while the lock is held.
+        let curr = self.state.load(SeqCst);
+
+        if let EMPTY | NOTIFIED = curr {
+            // There are no waiting tasks. In this case, no synchronization is
+            // established between `notify` and `notified().await`.
+            return;
+        }
+
+        // At this point, it is guaranteed that the state will not
+        // concurrently change, as holding the lock is required to
+        // transition **out** of `WAITING`.
+        //
+        // Get pending waiters
+        while let Some(mut waiter) = waiters.pop_back() {
+            // Safety: `waiters` lock is still held.
+            let waiter = unsafe { waiter.as_mut() };
+
+            assert!(!waiter.notified);
+
+            waiter.notified = true;
+
+            if let Some(waker) = waiter.waker.take() {
+                waker.wake();
+            }
+        }
+
+        // All waiters have been notified, the state must be transitioned to
+        // `EMPTY`. As transitioning **from** `WAITING` requires the lock to be
+        // held, a `store` is sufficient.
+        self.state.store(EMPTY, SeqCst);
+    }
 }

impl Default for Notify {
@@ -428,6 +472,8 @@ impl Future for Notified<'_> {
waiters.push_front(unsafe { NonNull::new_unchecked(waiter.get()) });

             *state = Waiting;
+
+            return Poll::Pending;
}
Waiting => {
// Currently in the "Waiting" state, implying the caller has
20 changes: 20 additions & 0 deletions tokio/src/sync/tests/loom_watch.rs
@@ -0,0 +1,20 @@
use crate::sync::watch;

use loom::future::block_on;
use loom::thread;

#[test]
fn smoke() {
Member Author: This test catches the issues that were mentioned by @hawkw.

    loom::model(|| {
        let (tx, mut rx) = watch::channel(1);

        let th = thread::spawn(move || {
            tx.send(2).unwrap();
        });

        block_on(rx.changed()).unwrap();
        assert_eq!(*rx.borrow(), 2);

        th.join().unwrap();
    })
}
1 change: 1 addition & 0 deletions tokio/src/sync/tests/mod.rs
@@ -13,4 +13,5 @@ cfg_loom! {
mod loom_oneshot;
mod loom_semaphore_batch;
mod loom_semaphore_ll;
+mod loom_watch;
}