Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: add Notified::enable #4705

Merged
merged 3 commits into from May 19, 2022
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
258 changes: 235 additions & 23 deletions tokio/src/sync/notify.rs
Expand Up @@ -26,22 +26,23 @@ type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
/// `Notify` itself does not carry any data. Instead, it is to be used to signal
/// another task to perform an operation.
///
/// `Notify` can be thought of as a [`Semaphore`] starting with 0 permits.
/// [`notified().await`] waits for a permit to become available, and [`notify_one()`]
/// sets a permit **if there currently are no available permits**.
/// A `Notify` can be thought of as a [`Semaphore`] starting with 0 permits. The
/// [`notified().await`] method waits for a permit to become available, and
/// [`notify_one()`] sets a permit **if there currently are no available
/// permits**.
///
/// The synchronization details of `Notify` are similar to
/// [`thread::park`][park] and [`Thread::unpark`][unpark] from std. A [`Notify`]
/// value contains a single permit. [`notified().await`] waits for the permit to
/// be made available, consumes the permit, and resumes. [`notify_one()`] sets the
/// permit, waking a pending task if there is one.
/// be made available, consumes the permit, and resumes. [`notify_one()`] sets
/// the permit, waking a pending task if there is one.
///
/// If `notify_one()` is called **before** `notified().await`, then the next call to
/// `notified().await` will complete immediately, consuming the permit. Any
/// subsequent calls to `notified().await` will wait for a new permit.
/// If `notify_one()` is called **before** `notified().await`, then the next
/// call to `notified().await` will complete immediately, consuming the permit.
/// Any subsequent calls to `notified().await` will wait for a new permit.
///
/// If `notify_one()` is called **multiple** times before `notified().await`, only a
/// **single** permit is stored. The next call to `notified().await` will
/// If `notify_one()` is called **multiple** times before `notified().await`,
/// only a **single** permit is stored. The next call to `notified().await` will
/// complete immediately, but the one after will wait for a new permit.
///
/// # Examples
Expand Down Expand Up @@ -70,7 +71,11 @@ type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
/// }
/// ```
///
/// Unbound mpsc channel.
/// Unbound multi-producer single-consumer (mpsc) channel.
///
/// No wakeups can be lost when using this channel because the call to
/// `notify_one()` will store a permit in the `Notify`, which the following call
/// to `notified()` will consume.
///
/// ```
/// use tokio::sync::Notify;
Expand All @@ -92,6 +97,8 @@ type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
/// self.notify.notify_one();
/// }
///
/// // This is a single-consumer channel, so several concurrent calls to
/// // `recv` are not allowed.
/// pub async fn recv(&self) -> T {
/// loop {
/// // Drain values
Expand All @@ -106,10 +113,87 @@ type WaitList = LinkedList<Waiter, <Waiter as linked_list::Link>::Target>;
/// }
/// ```
///
/// Unbound multi-producer multi-consumer (mpmc) channel.
///
/// The call to [`enable`] is important because otherwise if you have two
/// calls to `recv` and two calls to `send` in parallel, the following could
/// happen:
///
/// 1. Both calls to `try_recv` return `None`.
/// 2. Both new elements are added to the vector.
/// 3. The `notify_one` method is called twice, adding only a single
/// permit to the `Notify`.
/// 4. Both calls to `recv` reach the `Notified` future. One of them
/// consumes the permit, and the other sleeps forever.
///
/// By adding the `Notified` futures to the list by calling `enable` before
/// `try_recv`, the `notify_one` calls in step three would remove the
/// futures from the list and mark them notified instead of adding a permit
/// to the `Notify`. This ensures that both futures are woken.
///
/// Notice that this failure can only happen if there are two concurrent calls
/// to `recv`. This is why the mpsc example above does not require a call to
/// `enable`.
///
/// ```
/// use tokio::sync::Notify;
///
/// use std::collections::VecDeque;
/// use std::sync::Mutex;
///
/// struct Channel<T> {
/// messages: Mutex<VecDeque<T>>,
/// notify_on_sent: Notify,
/// }
///
/// impl<T> Channel<T> {
/// pub fn send(&self, msg: T) {
/// let mut locked_queue = self.messages.lock().unwrap();
/// locked_queue.push_back(msg);
/// drop(locked_queue);
///
/// // Send a notification to one of the calls currently
/// // waiting in a call to `recv`.
/// self.notify_on_sent.notify_one();
/// }
///
/// pub fn try_recv(&self) -> Option<T> {
/// let mut locked_queue = self.messages.lock().unwrap();
/// locked_queue.pop_front()
/// }
///
/// pub async fn recv(&self) -> T {
/// let future = self.notify_on_sent.notified();
/// tokio::pin!(future);
///
/// loop {
/// // Make sure that no wakeup is lost if we get
/// // `None` from `try_recv`.
/// future.as_mut().enable();
///
/// if let Some(msg) = self.try_recv() {
/// return msg;
/// }
///
/// // Wait for a call to `notify_one`.
/// //
/// // This uses `.as_mut()` to avoid consuming the future,
/// // which lets us call `Pin::set` below.
/// future.as_mut().await;
///
/// // Reset the future in case another call to
/// // `try_recv` got the message before us.
/// future.set(self.notify_on_sent.notified());
/// }
/// }
/// }
/// ```
///
/// [park]: std::thread::park
/// [unpark]: std::thread::Thread::unpark
/// [`notified().await`]: Notify::notified()
/// [`notify_one()`]: Notify::notify_one()
/// [`enable`]: Notified::enable()
/// [`Semaphore`]: crate::sync::Semaphore
#[derive(Debug)]
pub struct Notify {
Expand Down Expand Up @@ -145,7 +229,10 @@ struct Waiter {
_p: PhantomPinned,
}

/// Future returned from [`Notify::notified()`]
/// Future returned from [`Notify::notified()`].
///
/// This future is fused, so once it has completed, any future calls to poll
/// will immediately return `Poll::Ready`.
#[derive(Debug)]
pub struct Notified<'a> {
/// The `Notify` being received on.
Expand Down Expand Up @@ -249,7 +336,16 @@ impl Notify {
/// immediately, consuming that permit. Otherwise, `notified().await` waits
/// for a permit to be made available by the next call to `notify_one()`.
///
/// The `Notified` future is not guaranteed to receive wakeups from calls to
/// `notify_one()` if it has not yet been polled. See the documentation for
/// [`Notified::enable()`] for more details.
///
/// The `Notified` future is guaranteed to receive wakeups from
/// `notify_waiters()` as soon as it has been created, even if it has not
/// yet been polled.
///
/// [`notify_one()`]: Notify::notify_one
/// [`Notified::enable()`]: Notified::enable
///
/// # Cancel safety
///
Expand Down Expand Up @@ -513,6 +609,114 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Op
// ===== impl Notified =====

impl Notified<'_> {
/// Adds this future to the list of futures that are ready to receive
/// wakeups from calls to [`notify_one`].
///
/// Polling the future also adds it to the list, so this method should only
/// be used if you want to add the future to the list before the first call
/// to `poll`. (In fact, this method is equivalent to calling `poll` except
/// that no `Waker` is registered.)
///
/// This has no effect on notifications sent using [`notify_waiters`], which
/// are received as long as they happen after the creation of the `Notified`
/// regardless of whether `enable` or `poll` has been called.
///
/// This method returns true if the `Notified` is ready. This happens in the
/// following situations:
///
/// 1. The `notify_waiters` method was called between the creation of the
/// `Notified` and the call to this method.
/// 2. This is the first call to `enable` or `poll` on this future, and the
/// `Notify` was holding a permit from a previous call to `notify_one`.
/// The call consumes the permit in that case.
/// 3. The future has previously been enabled or polled, and it has since
/// then been marked ready by either consuming a permit from the
/// `Notify`, or by a call to `notify_one` or `notify_waiters` that
/// removed it from the list of futures ready to receive wakeups.
///
/// If this method returns true, any future calls to poll on the same future
/// will immediately return `Poll::Ready`.
///
/// # Examples
///
/// Unbound multi-producer multi-consumer (mpmc) channel.
///
/// The call to `enable` is important because otherwise if you have two
/// calls to `recv` and two calls to `send` in parallel, the following could
/// happen:
///
/// 1. Both calls to `try_recv` return `None`.
/// 2. Both new elements are added to the vector.
/// 3. The `notify_one` method is called twice, adding only a single
/// permit to the `Notify`.
/// 4. Both calls to `recv` reach the `Notified` future. One of them
/// consumes the permit, and the other sleeps forever.
///
/// By adding the `Notified` futures to the list by calling `enable` before
/// `try_recv`, the `notify_one` calls in step three would remove the
/// futures from the list and mark them notified instead of adding a permit
/// to the `Notify`. This ensures that both futures are woken.
///
/// ```
/// use tokio::sync::Notify;
///
/// use std::collections::VecDeque;
/// use std::sync::Mutex;
///
/// struct Channel<T> {
/// messages: Mutex<VecDeque<T>>,
/// notify_on_sent: Notify,
/// }
///
/// impl<T> Channel<T> {
/// pub fn send(&self, msg: T) {
/// let mut locked_queue = self.messages.lock().unwrap();
/// locked_queue.push_back(msg);
/// drop(locked_queue);
///
/// // Send a notification to one of the calls currently
/// // waiting in a call to `recv`.
/// self.notify_on_sent.notify_one();
/// }
///
/// pub fn try_recv(&self) -> Option<T> {
/// let mut locked_queue = self.messages.lock().unwrap();
/// locked_queue.pop_front()
/// }
///
/// pub async fn recv(&self) -> T {
/// let future = self.notify_on_sent.notified();
/// tokio::pin!(future);
///
/// loop {
/// // Make sure that no wakeup is lost if we get
/// // `None` from `try_recv`.
/// future.as_mut().enable();
///
/// if let Some(msg) = self.try_recv() {
/// return msg;
/// }
///
/// // Wait for a call to `notify_one`.
/// //
/// // This uses `.as_mut()` to avoid consuming the future,
/// // which lets us call `Pin::set` below.
/// future.as_mut().await;
///
/// // Reset the future in case another call to
/// // `try_recv` got the message before us.
/// future.set(self.notify_on_sent.notified());
/// }
/// }
/// }
/// ```
///
/// [`notify_one`]: Notify::notify_one()
/// [`notify_waiters`]: Notify::notify_waiters()
pub fn enable(self: Pin<&mut Self>) -> bool {
self.poll_notified(None).is_ready()
}

/// A custom `project` implementation is used in place of `pin-project-lite`
/// as a custom drop implementation is needed.
fn project(self: Pin<&mut Self>) -> (&Notify, &mut State, &UnsafeCell<Waiter>) {
Expand All @@ -526,12 +730,8 @@ impl Notified<'_> {
(me.notify, &mut me.state, &me.waiter)
}
}
}

impl Future for Notified<'_> {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
fn poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()> {
use State::*;

let (notify, state, waiter) = self.project();
Expand All @@ -557,7 +757,7 @@ impl Future for Notified<'_> {

// Clone the waker before locking, a waker clone can be
// triggering arbitrary code.
let waker = cx.waker().clone();
let waker = waker.cloned();

// Acquire the lock and attempt to transition to the waiting
// state.
Expand Down Expand Up @@ -618,9 +818,11 @@ impl Future for Notified<'_> {
}
}

// Safety: called while locked.
unsafe {
(*waiter.get()).waker = Some(waker);
if waker.is_some() {
// Safety: called while locked.
unsafe {
(*waiter.get()).waker = waker;
}
}

// Insert the waiter into the linked list
Expand Down Expand Up @@ -652,8 +854,10 @@ impl Future for Notified<'_> {
*state = Done;
} else {
// Update the waker, if necessary.
if !w.waker.as_ref().unwrap().will_wake(cx.waker()) {
w.waker = Some(cx.waker().clone());
if let Some(waker) = waker {
if !w.waker.as_ref().unwrap().will_wake(waker) {
w.waker = Some(waker.clone());
}
}

return Poll::Pending;
Expand All @@ -674,6 +878,14 @@ impl Future for Notified<'_> {
}
}

impl Future for Notified<'_> {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
self.poll_notified(Some(cx.waker()))
}
}

impl Drop for Notified<'_> {
fn drop(&mut self) {
use State::*;
Expand Down