Skip to content

Commit

Permalink
sync: tweak watch API (#2814)
Browse files Browse the repository at this point in the history
Decouples getting the latest `watch` value from receiving the change
notification. The `Receiver` async method becomes
`Receiver::changed()`. The latest value is obtained from
`Receiver::borrow()`.

The implementation is updated to use `Notify`. This requires adding
`Notify::notify_waiters`. This method is generally useful but is kept
private for now.
  • Loading branch information
carllerche committed Sep 11, 2020
1 parent c5a9ede commit 2bc9a48
Show file tree
Hide file tree
Showing 8 changed files with 281 additions and 319 deletions.
4 changes: 3 additions & 1 deletion tokio/src/sync/barrier.rs
Expand Up @@ -110,9 +110,11 @@ impl Barrier {
let mut wait = self.wait.clone();

loop {
let _ = wait.changed().await;

// note that the first time through the loop, this _will_ yield a generation
// immediately, since we cloned a receiver that has never seen any values.
if wait.recv().await >= generation {
if *wait.borrow() >= generation {
break;
}
}
Expand Down
10 changes: 4 additions & 6 deletions tokio/src/sync/mod.rs
Expand Up @@ -355,10 +355,8 @@
//! let op = my_async_operation();
//! tokio::pin!(op);
//!
//! // Receive the **initial** configuration value. As this is the
//! // first time the config is received from the watch, it will
//! // always complete immediatedly.
//! let mut conf = rx.recv().await;
//! // Get the initial config value
//! let mut conf = rx.borrow().clone();
//!
//! let mut op_start = Instant::now();
//! let mut delay = time::delay_until(op_start + conf.timeout);
Expand All @@ -375,8 +373,8 @@
//! // Restart the timeout
//! delay = time::delay_until(op_start + conf.timeout);
//! }
//! new_conf = rx.recv() => {
//! conf = new_conf;
//! _ = rx.changed() => {
//! conf = rx.borrow().clone();
//!
//! // The configuration has been updated. Update the
//! // `delay` using the new `timeout` value.
Expand Down
52 changes: 49 additions & 3 deletions tokio/src/sync/notify.rs
Expand Up @@ -123,7 +123,7 @@ struct Waiter {

/// Future returned from `notified()`
#[derive(Debug)]
struct Notified<'a> {
pub struct Notified<'a> {
/// The `Notify` being received on.
notify: &'a Notify,

Expand Down Expand Up @@ -172,6 +172,12 @@ impl Notify {

/// Wait for a notification.
///
/// Equivalent to:
///
/// ```ignore
/// async fn notified(&self);
/// ```
///
/// Each `Notify` value holds a single permit. If a permit is available from
/// an earlier call to [`notify_one()`], then `notified().await` will complete
/// immediately, consuming that permit. Otherwise, `notified().await` waits
Expand Down Expand Up @@ -199,7 +205,7 @@ impl Notify {
/// notify.notify_one();
/// }
/// ```
pub async fn notified(&self) {
pub fn notified(&self) -> Notified<'_> {
Notified {
notify: self,
state: State::Init,
Expand All @@ -210,7 +216,6 @@ impl Notify {
_p: PhantomPinned,
}),
}
.await
}

/// Notifies a waiting task
Expand Down Expand Up @@ -279,6 +284,45 @@ impl Notify {
waker.wake();
}
}

/// Notifies all waiting tasks
pub(crate) fn notify_waiters(&self) {
// There are waiters, the lock must be acquired to notify.
let mut waiters = self.waiters.lock().unwrap();

// The state must be reloaded while the lock is held. The state may only
// transition out of WAITING while the lock is held.
let curr = self.state.load(SeqCst);

if let EMPTY | NOTIFIED = curr {
// There are no waiting tasks. In this case, no synchronization is
// established between `notify` and `notified().await`.
return;
}

// At this point, it is guaranteed that the state will not
// concurrently change, as holding the lock is required to
// transition **out** of `WAITING`.
//
// Get pending waiters
while let Some(mut waiter) = waiters.pop_back() {
// Safety: `waiters` lock is still held.
let waiter = unsafe { waiter.as_mut() };

assert!(!waiter.notified);

waiter.notified = true;

if let Some(waker) = waiter.waker.take() {
waker.wake();
}
}

// All waiters have been notified, the state must be transitioned to
// `EMPTY`. As transitioning **from** `WAITING` requires the lock to be
// held, a `store` is sufficient.
self.state.store(EMPTY, SeqCst);
}
}

impl Default for Notify {
Expand Down Expand Up @@ -430,6 +474,8 @@ impl Future for Notified<'_> {
waiters.push_front(unsafe { NonNull::new_unchecked(waiter.get()) });

*state = Waiting;

return Poll::Pending;
}
Waiting => {
// Currently in the "Waiting" state, implying the caller has
Expand Down
20 changes: 20 additions & 0 deletions tokio/src/sync/tests/loom_watch.rs
@@ -0,0 +1,20 @@
use crate::sync::watch;

use loom::future::block_on;
use loom::thread;

#[test]
fn smoke() {
loom::model(|| {
let (tx, mut rx) = watch::channel(1);

let th = thread::spawn(move || {
tx.send(2).unwrap();
});

block_on(rx.changed()).unwrap();
assert_eq!(*rx.borrow(), 2);

th.join().unwrap();
})
}
1 change: 1 addition & 0 deletions tokio/src/sync/tests/mod.rs
Expand Up @@ -13,4 +13,5 @@ cfg_loom! {
mod loom_oneshot;
mod loom_semaphore_batch;
mod loom_semaphore_ll;
mod loom_watch;
}

0 comments on commit 2bc9a48

Please sign in to comment.