Skip to content

Commit

Permalink
sync: add Notified::enable (#4705)
Browse files Browse the repository at this point in the history
  • Loading branch information
Darksonn committed May 19, 2022
1 parent a2112b4 commit 4675087
Showing 1 changed file with 235 additions and 23 deletions.
258 changes: 235 additions & 23 deletions tokio/src/sync/notify.rs
Expand Up @@ -26,22 +26,23 @@ type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
/// `Notify` itself does not carry any data. Instead, it is to be used to signal
/// another task to perform an operation.
///
/// `Notify` can be thought of as a [`Semaphore`] starting with 0 permits.
/// [`notified().await`] waits for a permit to become available, and [`notify_one()`]
/// sets a permit **if there currently are no available permits**.
/// A `Notify` can be thought of as a [`Semaphore`] starting with 0 permits. The
/// [`notified().await`] method waits for a permit to become available, and
/// [`notify_one()`] sets a permit **if there currently are no available
/// permits**.
///
/// The synchronization details of `Notify` are similar to
/// [`thread::park`][park] and [`Thread::unpark`][unpark] from std. A [`Notify`]
/// value contains a single permit. [`notified().await`] waits for the permit to
/// be made available, consumes the permit, and resumes. [`notify_one()`] sets the
/// permit, waking a pending task if there is one.
/// be made available, consumes the permit, and resumes. [`notify_one()`] sets
/// the permit, waking a pending task if there is one.
///
/// If `notify_one()` is called **before** `notified().await`, then the next call to
/// `notified().await` will complete immediately, consuming the permit. Any
/// subsequent calls to `notified().await` will wait for a new permit.
/// If `notify_one()` is called **before** `notified().await`, then the next
/// call to `notified().await` will complete immediately, consuming the permit.
/// Any subsequent calls to `notified().await` will wait for a new permit.
///
/// If `notify_one()` is called **multiple** times before `notified().await`, only a
/// **single** permit is stored. The next call to `notified().await` will
/// If `notify_one()` is called **multiple** times before `notified().await`,
/// only a **single** permit is stored. The next call to `notified().await` will
/// complete immediately, but the one after will wait for a new permit.
///
/// # Examples
Expand Down Expand Up @@ -70,7 +71,11 @@ type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
/// }
/// ```
///
/// Unbound mpsc channel.
/// Unbound multi-producer single-consumer (mpsc) channel.
///
/// No wakeups can be lost when using this channel because the call to
/// `notify_one()` will store a permit in the `Notify`, which the following call
/// to `notified()` will consume.
///
/// ```
/// use tokio::sync::Notify;
Expand All @@ -92,6 +97,8 @@ type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
/// self.notify.notify_one();
/// }
///
/// // This is a single-consumer channel, so several concurrent calls to
/// // `recv` are not allowed.
/// pub async fn recv(&self) -> T {
/// loop {
/// // Drain values
Expand All @@ -106,10 +113,87 @@ type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
/// }
/// ```
///
/// Unbound multi-producer multi-consumer (mpmc) channel.
///
/// The call to [`enable`] is important because otherwise if you have two
/// calls to `recv` and two calls to `send` in parallel, the following could
/// happen:
///
/// 1. Both calls to `try_recv` return `None`.
/// 2. Both new elements are added to the vector.
/// 3. The `notify_one` method is called twice, adding only a single
/// permit to the `Notify`.
/// 4. Both calls to `recv` reach the `Notified` future. One of them
/// consumes the permit, and the other sleeps forever.
///
/// By adding the `Notified` futures to the list by calling `enable` before
/// `try_recv`, the `notify_one` calls in step three would remove the
/// futures from the list and mark them notified instead of adding a permit
/// to the `Notify`. This ensures that both futures are woken.
///
/// Notice that this failure can only happen if there are two concurrent calls
/// to `recv`. This is why the mpsc example above does not require a call to
/// `enable`.
///
/// ```
/// use tokio::sync::Notify;
///
/// use std::collections::VecDeque;
/// use std::sync::Mutex;
///
/// struct Channel<T> {
/// messages: Mutex<VecDeque<T>>,
/// notify_on_sent: Notify,
/// }
///
/// impl<T> Channel<T> {
/// pub fn send(&self, msg: T) {
/// let mut locked_queue = self.messages.lock().unwrap();
/// locked_queue.push_back(msg);
/// drop(locked_queue);
///
/// // Send a notification to one of the calls currently
/// // waiting in a call to `recv`.
/// self.notify_on_sent.notify_one();
/// }
///
/// pub fn try_recv(&self) -> Option<T> {
/// let mut locked_queue = self.messages.lock().unwrap();
/// locked_queue.pop_front()
/// }
///
/// pub async fn recv(&self) -> T {
/// let future = self.notify_on_sent.notified();
/// tokio::pin!(future);
///
/// loop {
/// // Make sure that no wakeup is lost if we get
/// // `None` from `try_recv`.
/// future.as_mut().enable();
///
/// if let Some(msg) = self.try_recv() {
/// return msg;
/// }
///
/// // Wait for a call to `notify_one`.
/// //
/// // This uses `.as_mut()` to avoid consuming the future,
/// // which lets us call `Pin::set` below.
/// future.as_mut().await;
///
/// // Reset the future in case another call to
/// // `try_recv` got the message before us.
/// future.set(self.notify_on_sent.notified());
/// }
/// }
/// }
/// ```
///
/// [park]: std::thread::park
/// [unpark]: std::thread::Thread::unpark
/// [`notified().await`]: Notify::notified()
/// [`notify_one()`]: Notify::notify_one()
/// [`enable`]: Notified::enable()
/// [`Semaphore`]: crate::sync::Semaphore
#[derive(Debug)]
pub struct Notify {
Expand Down Expand Up @@ -145,7 +229,10 @@ struct Waiter {
_p: PhantomPinned,
}

/// Future returned from [`Notify::notified()`]
/// Future returned from [`Notify::notified()`].
///
/// This future is fused, so once it has completed, any future calls to poll
/// will immediately return `Poll::Ready`.
#[derive(Debug)]
pub struct Notified<'a> {
/// The `Notify` being received on.
Expand Down Expand Up @@ -249,7 +336,16 @@ impl Notify {
/// immediately, consuming that permit. Otherwise, `notified().await` waits
/// for a permit to be made available by the next call to `notify_one()`.
///
/// The `Notified` future is not guaranteed to receive wakeups from calls to
/// `notify_one()` if it has not yet been polled. See the documentation for
/// [`Notified::enable()`] for more details.
///
/// The `Notified` future is guaranteed to receive wakeups from
/// `notify_waiters()` as soon as it has been created, even if it has not
/// yet been polled.
///
/// [`notify_one()`]: Notify::notify_one
/// [`Notified::enable()`]: Notified::enable
///
/// # Cancel safety
///
Expand Down Expand Up @@ -513,6 +609,114 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Op
// ===== impl Notified =====

impl Notified<'_> {
/// Adds this future to the list of futures that are ready to receive
/// wakeups from calls to [`notify_one`].
///
/// Polling the future also adds it to the list, so this method should only
/// be used if you want to add the future to the list before the first call
/// to `poll`. (In fact, this method is equivalent to calling `poll` except
/// that no `Waker` is registered.)
///
/// This has no effect on notifications sent using [`notify_waiters`], which
/// are received as long as they happen after the creation of the `Notified`
/// regardless of whether `enable` or `poll` has been called.
///
/// This method returns true if the `Notified` is ready. This happens in the
/// following situations:
///
/// 1. The `notify_waiters` method was called between the creation of the
/// `Notified` and the call to this method.
/// 2. This is the first call to `enable` or `poll` on this future, and the
/// `Notify` was holding a permit from a previous call to `notify_one`.
/// The call consumes the permit in that case.
/// 3. The future has previously been enabled or polled, and it has since
/// then been marked ready by either consuming a permit from the
/// `Notify`, or by a call to `notify_one` or `notify_waiters` that
/// removed it from the list of futures ready to receive wakeups.
///
/// If this method returns true, any future calls to poll on the same future
/// will immediately return `Poll::Ready`.
///
/// # Examples
///
/// Unbound multi-producer multi-consumer (mpmc) channel.
///
/// The call to `enable` is important because otherwise if you have two
/// calls to `recv` and two calls to `send` in parallel, the following could
/// happen:
///
/// 1. Both calls to `try_recv` return `None`.
/// 2. Both new elements are added to the vector.
/// 3. The `notify_one` method is called twice, adding only a single
/// permit to the `Notify`.
/// 4. Both calls to `recv` reach the `Notified` future. One of them
/// consumes the permit, and the other sleeps forever.
///
/// By adding the `Notified` futures to the list by calling `enable` before
/// `try_recv`, the `notify_one` calls in step three would remove the
/// futures from the list and mark them notified instead of adding a permit
/// to the `Notify`. This ensures that both futures are woken.
///
/// ```
/// use tokio::sync::Notify;
///
/// use std::collections::VecDeque;
/// use std::sync::Mutex;
///
/// struct Channel<T> {
/// messages: Mutex<VecDeque<T>>,
/// notify_on_sent: Notify,
/// }
///
/// impl<T> Channel<T> {
/// pub fn send(&self, msg: T) {
/// let mut locked_queue = self.messages.lock().unwrap();
/// locked_queue.push_back(msg);
/// drop(locked_queue);
///
/// // Send a notification to one of the calls currently
/// // waiting in a call to `recv`.
/// self.notify_on_sent.notify_one();
/// }
///
/// pub fn try_recv(&self) -> Option<T> {
/// let mut locked_queue = self.messages.lock().unwrap();
/// locked_queue.pop_front()
/// }
///
/// pub async fn recv(&self) -> T {
/// let future = self.notify_on_sent.notified();
/// tokio::pin!(future);
///
/// loop {
/// // Make sure that no wakeup is lost if we get
/// // `None` from `try_recv`.
/// future.as_mut().enable();
///
/// if let Some(msg) = self.try_recv() {
/// return msg;
/// }
///
/// // Wait for a call to `notify_one`.
/// //
/// // This uses `.as_mut()` to avoid consuming the future,
/// // which lets us call `Pin::set` below.
/// future.as_mut().await;
///
/// // Reset the future in case another call to
/// // `try_recv` got the message before us.
/// future.set(self.notify_on_sent.notified());
/// }
/// }
/// }
/// ```
///
/// [`notify_one`]: Notify::notify_one()
/// [`notify_waiters`]: Notify::notify_waiters()
pub fn enable(self: Pin<&mut Self>) -> bool {
self.poll_notified(None).is_ready()
}

/// A custom `project` implementation is used in place of `pin-project-lite`
/// as a custom drop implementation is needed.
fn project(self: Pin<&mut Self>) -> (&Notify, &mut State, &UnsafeCell<Waiter>) {
Expand All @@ -526,12 +730,8 @@ impl Notified<'_> {
(me.notify, &mut me.state, &me.waiter)
}
}
}

impl Future for Notified<'_> {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
fn poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()> {
use State::*;

let (notify, state, waiter) = self.project();
Expand All @@ -557,7 +757,7 @@ impl Future for Notified<'_> {

// Clone the waker before locking, a waker clone can be
// triggering arbitrary code.
let waker = cx.waker().clone();
let waker = waker.cloned();

// Acquire the lock and attempt to transition to the waiting
// state.
Expand Down Expand Up @@ -618,9 +818,11 @@ impl Future for Notified<'_> {
}
}

// Safety: called while locked.
unsafe {
(*waiter.get()).waker = Some(waker);
if waker.is_some() {
// Safety: called while locked.
unsafe {
(*waiter.get()).waker = waker;
}
}

// Insert the waiter into the linked list
Expand Down Expand Up @@ -652,8 +854,10 @@ impl Future for Notified<'_> {
*state = Done;
} else {
// Update the waker, if necessary.
if !w.waker.as_ref().unwrap().will_wake(cx.waker()) {
w.waker = Some(cx.waker().clone());
if let Some(waker) = waker {
if !w.waker.as_ref().unwrap().will_wake(waker) {
w.waker = Some(waker.clone());
}
}

return Poll::Pending;
Expand All @@ -674,6 +878,14 @@ impl Future for Notified<'_> {
}
}

impl Future for Notified<'_> {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
self.poll_notified(Some(cx.waker()))
}
}

impl Drop for Notified<'_> {
fn drop(&mut self) {
use State::*;
Expand Down

0 comments on commit 4675087

Please sign in to comment.