Skip to content

Commit

Permalink
sync: add Notify::notify_waiters (#3098)
Browse files Browse the repository at this point in the history
This PR makes `Notify::notify_waiters` public. The method
already exists, but it changes the way `notify_waiters`,
is used. Previously in order for the consumer to
register interest, in a notification triggered by
`notify_waiters`, the `Notified` future had to be
polled. This introduced friction when using the api
as the future had to be pinned before polled.

This change introduces a counter that tracks how many
times `notified_waiters` has been called. Upon creation of
the future the number of times is loaded. When first
polled the future compares this number with the count
state of the `Notify` type. This avoids the need for
registering the waiter upfront.

Fixes: #3066
  • Loading branch information
zaharidichev committed Nov 16, 2020
1 parent f5cb4c2 commit d0ebb41
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 80 deletions.
19 changes: 2 additions & 17 deletions tokio/src/sync/mpsc/chan.rs
Expand Up @@ -148,25 +148,10 @@ impl<T, S: Semaphore> Tx<T, S> {
}

pub(crate) async fn closed(&self) {
use std::future::Future;
use std::pin::Pin;
use std::task::Poll;

// In order to avoid a race condition, we first request a notification,
// **then** check the current value's version. If a new version exists,
// the notification request is dropped. Requesting the notification
// requires polling the future once.
// **then** check whether the semaphore is closed. If the semaphore is
// closed the notification request is dropped.
let notified = self.inner.notify_rx_closed.notified();
pin!(notified);

// Polling the future once is guaranteed to return `Pending` as `watch`
// only notifies using `notify_waiters`.
crate::future::poll_fn(|cx| {
let res = Pin::new(&mut notified).poll(cx);
assert!(!res.is_ready());
Poll::Ready(())
})
.await;

if self.inner.semaphore.is_closed() {
return;
Expand Down

0 comments on commit d0ebb41

Please sign in to comment.