From 4ef07f48cac06d3953f2596b360d68518f9a5e07 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 14 Apr 2021 14:59:39 -0700 Subject: [PATCH 1/4] sync: add `mpsc::Sender::{reserve_owned, try_reserve_owned}` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Motivation The `mpsc::Sender::reserve` method currently returns a permit that borrows from the `Sender`. It would be nice to have a version of it that returns an owned permit. ## Solution This branch adds an `OwnedPermit` type and `Sender::{reserve_owned, try_reserve_owned}` methods. Unlike the comparable methods on `Semaphore`, these methods do *not* require an `Arc` as the receiver; this is because the sender internally reference counts the channel and is already cheap to clone. Requiring an `Arc` would simply add an unnecessary second layer of reference counting, which is not ideal; instead, the documentation encourages the user to clone the sender prior to calling `reserve_owned` when necessary. Since these methods take the `Sender` by value, they also have the ability to _return_ the `Sender` from a successful `OwnedPermit::send`. This allows them to be used without additional clones. Essentially, this is a very simple type-level encoding of the sender's state, with the transitions ``` ┌──────┐ ┌───►│Sender├───┐ │ └──────┘ │ send reserve │ ┌───────────┐ │ └─┤OwnedPermit│◄┘ └───────────┘ ``` Additionally, I added an `OwnedPermit::release`, which returns the `Sender` and releases the permit *without* sending a message. Closes #3688 --- tokio/src/sync/mpsc/bounded.rs | 283 ++++++++++++++++++++++++++++++++- 1 file changed, 279 insertions(+), 4 deletions(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 1f670bfa743..ce744102774 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -33,6 +33,22 @@ pub struct Permit<'a, T> { chan: &'a chan::Tx, } +/// Owned permit to send one value into the channel. +/// +/// This is identical to the [`Permit`] type, except that it moves the sender +/// rather than borrowing it. +/// +/// `OwnedPermit` values are returned by [`Sender::reserve_owned()`] and +/// [`Sender::try_reserve_owned()`] and are used to guarantee channel capacity +/// before generating a message to send. +/// +/// [`Permit`]: Permit +/// [`Sender::reserve_owned()`]: Sender::reserve_owned +/// [`Sender::try_reserve_owned()`]: Sender::try_reserve_owned +pub struct OwnedPermit { + chan: Option>, +} + /// Receive values from the associated `Sender`. /// /// Instances are created by the [`channel`](channel) function. @@ -624,12 +640,92 @@ impl Sender { /// } /// ``` pub async fn reserve(&self) -> Result, SendError<()>> { + self.reserve_inner().await?; + Ok(Permit { chan: &self.chan }) + } + + /// Wait for channel capacity, moving the `Sender` and returning an owned + /// permit. Once capacity to send one message is available, it is reserved + /// for the caller. + /// + /// This moves the sender _by value_, and returns an owned permit that can + /// be used to send a message into the channel. Unlike [`Sender::reserve`], + /// this method may be used in cases where the permit must be valid for the + /// `'static` lifetime. + /// + /// If the channel is full, the function waits for the number of unreceived + /// messages to become less than the channel capacity. Capacity to send one + /// message is reserved for the caller. An [`OwnedPermit`] is returned to + /// track the reserved capacity. The [`send`] function on [`OwnedPermit`] + /// consumes the reserved capacity. + /// + /// Dropping the [`OwnedPermit`] without sending a message releases the + /// capacity back to the channel. + /// + /// # Examples + /// Sending a message using an [`OwnedPermit`]: + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::channel(1); + /// + /// // Reserve capacity, moving the sender. + /// let permit = tx.reserve_owned().await.unwrap(); + /// + /// // Send a message, consuming the permit and returning + /// // the moved sender. + /// let tx = permit.send(123); + /// + /// // The value sent on the permit is received. + /// assert_eq!(rx.recv().await.unwrap(), 123); + /// + /// // The sender can now be used again. + /// tx.send(456).await.unwrap(); + /// } + /// ``` + /// + /// When multiple [`OwnedPermit`]s are needed, or the sender cannot be moved + /// by value, it can be inexpensively cloned before calling `reserve_owned`: + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::channel(1); + /// + /// // Clone the sender and reserve capacity. + /// let permit = tx.clone().reserve_owned().await.unwrap(); + /// + /// // Trying to send directly on the `tx` will fail due to no + /// // available capacity. + /// assert!(tx.try_send(123).is_err()); + /// + /// // Sending on the permit succeeds. + /// permit.send(456); + /// + /// // The value sent on the permit is received + /// assert_eq!(rx.recv().await.unwrap(), 456); + /// } + /// ``` + /// + /// [`Sender::reserve`]: Sender::reserve + /// [`OwnedPermit`]: OwnedPermit + /// [`send`]: OwnedPermit::send + pub async fn reserve_owned(self) -> Result, SendError<()>> { + self.reserve_inner().await?; + Ok(OwnedPermit { + chan: Some(self.chan), + }) + } + + async fn reserve_inner(&self) -> Result<(), SendError<()>> { match self.chan.semaphore().0.acquire(1).await { - Ok(_) => {} - Err(_) => return Err(SendError(())), + Ok(_) => Ok(()), + Err(_) => Err(SendError(())), } - - Ok(Permit { chan: &self.chan }) } /// Try to acquire a slot in the channel without waiting for the slot to become @@ -684,6 +780,68 @@ impl Sender { Ok(Permit { chan: &self.chan }) } + /// Try to acquire a slot in the channel without waiting for the slot to become + /// available, returning an owned permit. + /// + /// This moves the sender _by value_, and returns an owned permit that can + /// be used to send a message into the channel. Unlike [`Sender::try_reserve`], + /// this method may be used in cases where the permit must be valid for the + /// `'static` lifetime. + /// + /// If the channel is full this function will return a [`TrySendError`]. + /// Since the sender is taken by value, the `TrySendError` returned in this + /// case contains the sender, so that it may be used again. Otherwise, if + /// there is a slot available, this method will return an [`OwnedPermit`] + /// that can then be used to [`send`] on the channel with a guaranteed slot. + /// This function is similar to [`reserve_owned`] except it does not await + /// for the slot to become available. + /// + /// Dropping the [`OwnedPermit`] without sending a message releases the capacity back + /// to the channel. + /// + /// [`OwnedPermit`]: OwnedPermit + /// [`send`]: OwnedPermit::send + /// [`reserve_owned`]: Sender::reserve_owned + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::channel(1); + /// + /// // Reserve capacity + /// let permit = tx.clone().try_reserve_owned().unwrap(); + /// + /// // Trying to send directly on the `tx` will fail due to no + /// // available capacity. + /// assert!(tx.try_send(123).is_err()); + /// + /// // Trying to reserve an additional slot on the `tx` will + /// // fail because there is no capacity. + /// assert!(tx.try_reserve().is_err()); + /// + /// // Sending on the permit succeeds + /// permit.send(456); + /// + /// // The value sent on the permit is received + /// assert_eq!(rx.recv().await.unwrap(), 456); + /// + /// } + /// ``` + pub fn try_reserve_owned(self) -> Result, TrySendError> { + match self.chan.semaphore().0.try_acquire(1) { + Ok(_) => {} + Err(_) => return Err(TrySendError::Full(self)), + } + + Ok(OwnedPermit { + chan: Some(self.chan), + }) + } + /// Returns `true` if senders belong to the same channel. /// /// # Examples @@ -817,3 +975,120 @@ impl fmt::Debug for Permit<'_, T> { .finish() } } + +// ===== impl Permit ===== + +impl OwnedPermit { + /// Sends a value using the reserved capacity. + /// + /// Capacity for the message has already been reserved. The message is sent + /// to the receiver and the permit is consumed. The operation will succeed + /// even if the receiver half has been closed. See [`Receiver::close`] for + /// more details on performing a clean shutdown. + /// + /// Unlike [`Permit::send`], this method returns the [`Sender`] from which + /// the `OwnedPermit` was reserved. + /// + /// [`Receiver::close`]: Receiver::close + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::channel(1); + /// + /// // Reserve capacity + /// let permit = tx.reserve_owned().await.unwrap(); + /// + /// // Send a message on the permit, returning the sender. + /// let tx = permit.send(456); + /// + /// // The value sent on the permit is received + /// assert_eq!(rx.recv().await.unwrap(), 456); + /// + /// // We may now reuse `tx` to send another message. + /// tx.send(789).await.unwrap(); + /// } + /// ``` + pub fn send(mut self, value: T) -> Sender { + let chan = self.chan.take().unwrap_or_else(|| { + unreachable!("OwnedPermit channel is only taken when the permit is moved") + }); + chan.send(value); + + Sender { chan } + } + + /// Release the reserved capacity *without* sending a message, returning the + /// [`Sender`]. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::channel(1); + /// + /// // Clone the sender and reserve capacity + /// let permit = tx.clone().reserve_owned().await.unwrap(); + /// + /// // Trying to send on the original `tx` will fail, since the `permit` + /// // has reserved all the available capacity. + /// assert!(tx.try_send(123).is_err()); + /// + /// // Release the permit without sending a message, returning the clone + /// // of the sender. + /// let tx2 = permit.release(); + /// + /// // We may now reuse `tx` to send another message. + /// tx.send(789).await.unwrap(); + /// # drop(rx); drop(tx2); + /// } + /// ``` + /// + /// [`Sender`]: Sender + pub fn release(mut self) -> Sender { + use chan::Semaphore; + + let chan = self.chan.take().unwrap_or_else(|| { + unreachable!("OwnedPermit channel is only taken when the permit is moved") + }); + + // Add the permit back to the semaphore + chan.semaphore().add_permit(); + Sender { chan } + } +} + +impl Drop for OwnedPermit { + fn drop(&mut self) { + use chan::Semaphore; + + // Are we still holding onto the sender? + if let Some(chan) = self.chan.take() { + let semaphore = chan.semaphore(); + + // Add the permit back to the semaphore + semaphore.add_permit(); + + if semaphore.is_closed() && semaphore.is_idle() { + chan.wake_rx(); + } + } + + // Otherwise, do nothing. + } +} + +impl fmt::Debug for OwnedPermit { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("OwnedPermit") + .field("chan", &self.chan) + .finish() + } +} From 197df25701d53344840d69702f1f2646bb728e29 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 14 Apr 2021 15:42:03 -0700 Subject: [PATCH 2/4] whoops i forgot the reexport Signed-off-by: Eliza Weisman --- tokio/src/sync/mpsc/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/mpsc/mod.rs b/tokio/src/sync/mpsc/mod.rs index e7033f6f2ba..879e3dcfc01 100644 --- a/tokio/src/sync/mpsc/mod.rs +++ b/tokio/src/sync/mpsc/mod.rs @@ -73,7 +73,7 @@ pub(super) mod block; mod bounded; -pub use self::bounded::{channel, Permit, Receiver, Sender}; +pub use self::bounded::{channel, OwnedPermit, Permit, Receiver, Sender}; mod chan; From 9d143f09b496fa4ebc8fa43ab348849c22667961 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Thu, 15 Apr 2021 09:30:16 +0200 Subject: [PATCH 3/4] Fix CI --- tokio/src/sync/mpsc/bounded.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index ce744102774..bc94984ce53 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1032,7 +1032,7 @@ impl OwnedPermit { /// /// #[tokio::main] /// async fn main() { - /// let (tx, mut rx) = mpsc::channel(1); + /// let (tx, rx) = mpsc::channel(1); /// /// // Clone the sender and reserve capacity /// let permit = tx.clone().reserve_owned().await.unwrap(); From f650b38949c5eb9adce2685de1ad6ca2fcae76e9 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Thu, 15 Apr 2021 11:01:28 -0700 Subject: [PATCH 4/4] docs review feedback Signed-off-by: Eliza Weisman --- tokio/src/sync/mpsc/bounded.rs | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index bc94984ce53..ce857d7413e 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -245,10 +245,11 @@ impl Receiver { /// /// To guarantee that no messages are dropped, after calling `close()`, /// `recv()` must be called until `None` is returned. If there are - /// outstanding [`Permit`] values, the `recv` method will not return `None` - /// until those are released. + /// outstanding [`Permit`] or [`OwnedPermit`] values, the `recv` method will + /// not return `None` until those are released. /// /// [`Permit`]: Permit + /// [`OwnedPermit`]: OwnedPermit /// /// # Examples /// @@ -651,7 +652,10 @@ impl Sender { /// This moves the sender _by value_, and returns an owned permit that can /// be used to send a message into the channel. Unlike [`Sender::reserve`], /// this method may be used in cases where the permit must be valid for the - /// `'static` lifetime. + /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is + /// essentially a reference count increment, comparable to [`Arc::clone`]), + /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be + /// moved, it can be cloned prior to calling `reserve_owned`. /// /// If the channel is full, the function waits for the number of unreceived /// messages to become less than the channel capacity. Capacity to send one @@ -714,6 +718,7 @@ impl Sender { /// [`Sender::reserve`]: Sender::reserve /// [`OwnedPermit`]: OwnedPermit /// [`send`]: OwnedPermit::send + /// [`Arc::clone`]: std::sync::Arc::clone pub async fn reserve_owned(self) -> Result, SendError<()>> { self.reserve_inner().await?; Ok(OwnedPermit { @@ -786,7 +791,10 @@ impl Sender { /// This moves the sender _by value_, and returns an owned permit that can /// be used to send a message into the channel. Unlike [`Sender::try_reserve`], /// this method may be used in cases where the permit must be valid for the - /// `'static` lifetime. + /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is + /// essentially a reference count increment, comparable to [`Arc::clone`]), + /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be + /// moved, it can be cloned prior to calling `try_reserve_owned`. /// /// If the channel is full this function will return a [`TrySendError`]. /// Since the sender is taken by value, the `TrySendError` returned in this @@ -802,6 +810,7 @@ impl Sender { /// [`OwnedPermit`]: OwnedPermit /// [`send`]: OwnedPermit::send /// [`reserve_owned`]: Sender::reserve_owned + /// [`Arc::clone`]: std::sync::Arc::clone /// /// # Examples /// @@ -962,6 +971,8 @@ impl Drop for Permit<'_, T> { // Add the permit back to the semaphore semaphore.add_permit(); + // If this is the last sender for this channel, wake the receiver so + // that it can be notified that the channel is closed. if semaphore.is_closed() && semaphore.is_idle() { self.chan.wake_rx(); } @@ -1076,6 +1087,9 @@ impl Drop for OwnedPermit { // Add the permit back to the semaphore semaphore.add_permit(); + // If this `OwnedPermit` is holding the last sender for this + // channel, wake the receiver so that it can be notified that the + // channel is closed. if semaphore.is_closed() && semaphore.is_idle() { chan.wake_rx(); }