From 8b021eabcf4311399a10bb102f4cb8b799d171eb Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Thu, 19 May 2022 13:05:21 +0200 Subject: [PATCH 1/2] sync: add `Notified::enable` --- tokio/src/sync/notify.rs | 252 +++++++++++++++++++++++++++++++++++---- 1 file changed, 231 insertions(+), 21 deletions(-) diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index 83d0de4fbe6..4b2630c96e6 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -26,22 +26,23 @@ type WaitList = LinkedList::Target>; /// `Notify` itself does not carry any data. Instead, it is to be used to signal /// another task to perform an operation. /// -/// `Notify` can be thought of as a [`Semaphore`] starting with 0 permits. -/// [`notified().await`] waits for a permit to become available, and [`notify_one()`] -/// sets a permit **if there currently are no available permits**. +/// A `Notify` can be thought of as a [`Semaphore`] starting with 0 permits. The +/// [`notified().await`] method waits for a permit to become available, and +/// [`notify_one()`] sets a permit **if there currently are no available +/// permits**. /// /// The synchronization details of `Notify` are similar to /// [`thread::park`][park] and [`Thread::unpark`][unpark] from std. A [`Notify`] /// value contains a single permit. [`notified().await`] waits for the permit to -/// be made available, consumes the permit, and resumes. [`notify_one()`] sets the -/// permit, waking a pending task if there is one. +/// be made available, consumes the permit, and resumes. [`notify_one()`] sets +/// the permit, waking a pending task if there is one. /// -/// If `notify_one()` is called **before** `notified().await`, then the next call to -/// `notified().await` will complete immediately, consuming the permit. Any -/// subsequent calls to `notified().await` will wait for a new permit. +/// If `notify_one()` is called **before** `notified().await`, then the next +/// call to `notified().await` will complete immediately, consuming the permit. +/// Any subsequent calls to `notified().await` will wait for a new permit. /// -/// If `notify_one()` is called **multiple** times before `notified().await`, only a -/// **single** permit is stored. The next call to `notified().await` will +/// If `notify_one()` is called **multiple** times before `notified().await`, +/// only a **single** permit is stored. The next call to `notified().await` will /// complete immediately, but the one after will wait for a new permit. /// /// # Examples @@ -70,7 +71,11 @@ type WaitList = LinkedList::Target>; /// } /// ``` /// -/// Unbound mpsc channel. +/// Unbound multi-producer single-consumer (mpsc) channel. +/// +/// No wakeups can be lost when using this channel because the call to +/// `notify_one()` will store a permit in the `Notify`, which the following call +/// to `notified()` will consume. /// /// ``` /// use tokio::sync::Notify; @@ -92,6 +97,8 @@ type WaitList = LinkedList::Target>; /// self.notify.notify_one(); /// } /// +/// // This is a single-consumer channel, so several concurrent calls to +/// // `recv` are not allowed. /// pub async fn recv(&self) -> T { /// loop { /// // Drain values @@ -106,10 +113,87 @@ type WaitList = LinkedList::Target>; /// } /// ``` /// +/// Unbound multi-producer multi-consumer (mpmc) channel. +/// +/// The call to [`enable`] is important because otherwise if you have two +/// calls to `recv` and two calls to `send` in parallel, the following could +/// happen: +/// +/// 1. Both calls to `try_recv` return `None`. +/// 2. Both new elements are added to the vector. +/// 3. The `notify_one` method is called twice, adding only a single +/// permit to the `Notify`. +/// 4. Both calls to `recv` reach the `Notified` future. One of them +/// consumes the permit, and the other sleeps forever. +/// +/// By adding the `Notified` futures to the list by calling `enable` before +/// `try_recv`, the `notify_one` calls in step three would remove the +/// futures from the list and mark them notified instead of adding a permit +/// to the `Notify`. This ensures that both futures are woken. +/// +/// Notice that this failure can only happen if there are two concurrent calls +/// to `recv`. This is why the mpsc example above does not require a call to +/// `enable`. +/// +/// ``` +/// use tokio::sync::Notify; +/// +/// use std::collections::VecDeque; +/// use std::sync::Mutex; +/// +/// struct Channel { +/// messages: Mutex>, +/// notify_on_sent: Notify, +/// } +/// +/// impl Channel { +/// pub fn send(&self, msg: T) { +/// let mut locked_queue = self.messages.lock().unwrap(); +/// locked_queue.push_back(msg); +/// drop(locked_queue); +/// +/// // Send a notification to one of the calls currently +/// // waiting in a call to `recv`. +/// self.notify_on_sent.notify_one(); +/// } +/// +/// pub fn try_recv(&self) -> Option { +/// let mut locked_queue = self.messages.lock().unwrap(); +/// locked_queue.pop_front() +/// } +/// +/// pub async fn recv(&self) -> T { +/// let future = self.notify_on_sent.notified(); +/// tokio::pin!(future); +/// +/// loop { +/// // Make sure that no wakeup is lost if we get +/// // `None` from `try_recv`. +/// future.as_mut().enable(); +/// +/// if let Some(msg) = self.try_recv() { +/// return msg; +/// } +/// +/// // Wait for a call to `notify_one`. +/// // +/// // This uses `.as_mut()` to avoid consuming the future, +/// // which lets us call `Pin::set` below. +/// future.as_mut().await; +/// +/// // Reset the future in case another call to +/// // `try_recv` got the message before us. +/// future.set(self.notify_on_sent.notified()); +/// } +/// } +/// } +/// ``` +/// /// [park]: std::thread::park /// [unpark]: std::thread::Thread::unpark /// [`notified().await`]: Notify::notified() /// [`notify_one()`]: Notify::notify_one() +/// [`enable`]: Notified::enable() /// [`Semaphore`]: crate::sync::Semaphore #[derive(Debug)] pub struct Notify { @@ -145,7 +229,10 @@ struct Waiter { _p: PhantomPinned, } -/// Future returned from [`Notify::notified()`] +/// Future returned from [`Notify::notified()`]. +/// +/// This future is fused, so once it has completed, any future calls to poll +/// will immediately return `Poll::Ready`. #[derive(Debug)] pub struct Notified<'a> { /// The `Notify` being received on. @@ -249,7 +336,16 @@ impl Notify { /// immediately, consuming that permit. Otherwise, `notified().await` waits /// for a permit to be made available by the next call to `notify_one()`. /// + /// The `Notified` future is not guaranteed to receive wakeups from calls to + /// `notify_one()` if it has not yet been polled. See the documentation for + /// [`Notified::enable()`] for more details. + /// + /// The `Notified` future is guaranteed to receive wakeups from + /// `notify_waiters()` as soon as it has been created, even if it has not + /// yet been polled. + /// /// [`notify_one()`]: Notify::notify_one + /// [`Notified::enable()`]: Notified::enable /// /// # Cancel safety /// @@ -513,6 +609,114 @@ fn notify_locked(waiters: &mut WaitList, state: &AtomicUsize, curr: usize) -> Op // ===== impl Notified ===== impl Notified<'_> { + /// Adds this future to the list of futures that are ready to receive + /// wakeups from calls to [`notify_one`]. + /// + /// Polling the future also adds it to the list, so this method should only + /// be used if you want to add the future to the list before the first call + /// to `poll`. (In fact, this method is equivalent to calling `poll` except + /// that no `Waker` is registered.) + /// + /// This has no effect on notifications sent using [`notify_waiters`], which + /// are received as long as they happen after the creation of the `Notified` + /// regardless of whether `enable` or `poll` has been called. + /// + /// This method returns true if the `Notified` is ready. This happens in the + /// following situations: + /// + /// 1. The `notify_waiters` method was called between the creation of the + /// `Notified` and the call to this method. + /// 2. This is the first call to `enable` or `poll` on this future, and the + /// `Notify` was holding a permit from a previous call to `notify_one`. + /// The call consumes the permit in that case. + /// 3. The future has previously been enabled or polled, and it has since + /// then been marked ready by either consuming a permit from the + /// `Notify`, or by a call to `notify_one` or `notify_waiters` that + /// removed it from the list of futures ready to receive wakeups. + /// + /// If this method returns true, any future calls to poll on the same future + /// will immediately return `Poll::Ready`. + /// + /// # Examples + /// + /// Unbound multi-producer multi-consumer (mpmc) channel. + /// + /// The call to `enable` is important because otherwise if you have two + /// calls to `recv` and two calls to `send` in parallel, the following could + /// happen: + /// + /// 1. Both calls to `try_recv` return `None`. + /// 2. Both new elements are added to the vector. + /// 3. The `notify_one` method is called twice, adding only a single + /// permit to the `Notify`. + /// 4. Both calls to `recv` reach the `Notified` future. One of them + /// consumes the permit, and the other sleeps forever. + /// + /// By adding the `Notified` futures to the list by calling `enable` before + /// `try_recv`, the `notify_one` calls in step three would remove the + /// futures from the list and mark them notified instead of adding a permit + /// to the `Notify`. This ensures that both futures are woken. + /// + /// ``` + /// use tokio::sync::Notify; + /// + /// use std::collections::VecDeque; + /// use std::sync::Mutex; + /// + /// struct Channel { + /// messages: Mutex>, + /// notify_on_sent: Notify, + /// } + /// + /// impl Channel { + /// pub fn send(&self, msg: T) { + /// let mut locked_queue = self.messages.lock().unwrap(); + /// locked_queue.push_back(msg); + /// drop(locked_queue); + /// + /// // Send a notification to one of the calls currently + /// // waiting in a call to `recv`. + /// self.notify_on_sent.notify_one(); + /// } + /// + /// pub fn try_recv(&self) -> Option { + /// let mut locked_queue = self.messages.lock().unwrap(); + /// locked_queue.pop_front() + /// } + /// + /// pub async fn recv(&self) -> T { + /// let future = self.notify_on_sent.notified(); + /// tokio::pin!(future); + /// + /// loop { + /// // Make sure that no wakeup is lost if we get + /// // `None` from `try_recv`. + /// future.as_mut().enable(); + /// + /// if let Some(msg) = self.try_recv() { + /// return msg; + /// } + /// + /// // Wait for a call to `notify_one`. + /// // + /// // This uses `.as_mut()` to avoid consuming the future, + /// // which lets us call `Pin::set` below. + /// future.as_mut().await; + /// + /// // Reset the future in case another call to + /// // `try_recv` got the message before us. + /// future.set(self.notify_on_sent.notified()); + /// } + /// } + /// } + /// ``` + /// + /// [`notify_one`]: Notify::notify_one() + /// [`notify_waiters`]: Notify::notify_waiters() + pub fn enable(self: Pin<&mut Self>) -> bool { + self.poll_notified(None).is_ready() + } + /// A custom `project` implementation is used in place of `pin-project-lite` /// as a custom drop implementation is needed. fn project(self: Pin<&mut Self>) -> (&Notify, &mut State, &UnsafeCell) { @@ -526,12 +730,8 @@ impl Notified<'_> { (me.notify, &mut me.state, &me.waiter) } } -} -impl Future for Notified<'_> { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + fn poll_notified(self: Pin<&mut Self>, waker: Option<&Waker>) -> Poll<()> { use State::*; let (notify, state, waiter) = self.project(); @@ -557,7 +757,7 @@ impl Future for Notified<'_> { // Clone the waker before locking, a waker clone can be // triggering arbitrary code. - let waker = cx.waker().clone(); + let waker = waker.cloned(); // Acquire the lock and attempt to transition to the waiting // state. @@ -620,7 +820,7 @@ impl Future for Notified<'_> { // Safety: called while locked. unsafe { - (*waiter.get()).waker = Some(waker); + (*waiter.get()).waker = waker; } // Insert the waiter into the linked list @@ -652,8 +852,10 @@ impl Future for Notified<'_> { *state = Done; } else { // Update the waker, if necessary. - if !w.waker.as_ref().unwrap().will_wake(cx.waker()) { - w.waker = Some(cx.waker().clone()); + if let Some(waker) = waker { + if !w.waker.as_ref().unwrap().will_wake(waker) { + w.waker = Some(waker.clone()); + } } return Poll::Pending; @@ -674,6 +876,14 @@ impl Future for Notified<'_> { } } +impl Future for Notified<'_> { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + self.poll_notified(Some(cx.waker())) + } +} + impl Drop for Notified<'_> { fn drop(&mut self) { use State::*; From 0e426b96c9c7696bf885849e15c3ad18c0fac616 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Thu, 19 May 2022 16:37:29 +0200 Subject: [PATCH 2/2] Don't update if None --- tokio/src/sync/notify.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tokio/src/sync/notify.rs b/tokio/src/sync/notify.rs index 4b2630c96e6..7cc2ca39e27 100644 --- a/tokio/src/sync/notify.rs +++ b/tokio/src/sync/notify.rs @@ -818,9 +818,11 @@ impl Notified<'_> { } } - // Safety: called while locked. - unsafe { - (*waiter.get()).waker = waker; + if waker.is_some() { + // Safety: called while locked. + unsafe { + (*waiter.get()).waker = waker; + } } // Insert the waiter into the linked list