Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: add mpsc::Sender::{reserve_owned, try_reserve_owned} #3704

Merged
merged 4 commits into from Apr 15, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
283 changes: 279 additions & 4 deletions tokio/src/sync/mpsc/bounded.rs
Expand Up @@ -33,6 +33,22 @@ pub struct Permit<'a, T> {
chan: &'a chan::Tx<T, Semaphore>,
}

/// Owned permit to send one value into the channel.
///
/// This is identical to the [`Permit`] type, except that it moves the sender
/// rather than borrowing it.
///
/// `OwnedPermit` values are returned by [`Sender::reserve_owned()`] and
/// [`Sender::try_reserve_owned()`] and are used to guarantee channel capacity
/// before generating a message to send.
///
/// [`Permit`]: Permit
/// [`Sender::reserve_owned()`]: Sender::reserve_owned
/// [`Sender::try_reserve_owned()`]: Sender::try_reserve_owned
pub struct OwnedPermit<T> {
chan: Option<chan::Tx<T, Semaphore>>,
}

/// Receive values from the associated `Sender`.
///
/// Instances are created by the [`channel`](channel) function.
Expand Down Expand Up @@ -624,12 +640,92 @@ impl<T> Sender<T> {
/// }
/// ```
pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
self.reserve_inner().await?;
Ok(Permit { chan: &self.chan })
}

/// Wait for channel capacity, moving the `Sender` and returning an owned
/// permit. Once capacity to send one message is available, it is reserved
/// for the caller.
///
/// This moves the sender _by value_, and returns an owned permit that can
/// be used to send a message into the channel. Unlike [`Sender::reserve`],
/// this method may be used in cases where the permit must be valid for the
/// `'static` lifetime.
hawkw marked this conversation as resolved.
Show resolved Hide resolved
///
/// If the channel is full, the function waits for the number of unreceived
/// messages to become less than the channel capacity. Capacity to send one
/// message is reserved for the caller. An [`OwnedPermit`] is returned to
/// track the reserved capacity. The [`send`] function on [`OwnedPermit`]
/// consumes the reserved capacity.
///
/// Dropping the [`OwnedPermit`] without sending a message releases the
/// capacity back to the channel.
///
/// # Examples
/// Sending a message using an [`OwnedPermit`]:
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = mpsc::channel(1);
///
/// // Reserve capacity, moving the sender.
/// let permit = tx.reserve_owned().await.unwrap();
///
/// // Send a message, consuming the permit and returning
/// // the moved sender.
/// let tx = permit.send(123);
///
/// // The value sent on the permit is received.
/// assert_eq!(rx.recv().await.unwrap(), 123);
///
/// // The sender can now be used again.
/// tx.send(456).await.unwrap();
/// }
/// ```
///
/// When multiple [`OwnedPermit`]s are needed, or the sender cannot be moved
/// by value, it can be inexpensively cloned before calling `reserve_owned`:
///
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = mpsc::channel(1);
///
/// // Clone the sender and reserve capacity.
/// let permit = tx.clone().reserve_owned().await.unwrap();
///
/// // Trying to send directly on the `tx` will fail due to no
/// // available capacity.
/// assert!(tx.try_send(123).is_err());
///
/// // Sending on the permit succeeds.
/// permit.send(456);
///
/// // The value sent on the permit is received
/// assert_eq!(rx.recv().await.unwrap(), 456);
/// }
/// ```
///
/// [`Sender::reserve`]: Sender::reserve
/// [`OwnedPermit`]: OwnedPermit
/// [`send`]: OwnedPermit::send
pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>> {
self.reserve_inner().await?;
Ok(OwnedPermit {
chan: Some(self.chan),
})
}

async fn reserve_inner(&self) -> Result<(), SendError<()>> {
match self.chan.semaphore().0.acquire(1).await {
Ok(_) => {}
Err(_) => return Err(SendError(())),
Ok(_) => Ok(()),
Err(_) => Err(SendError(())),
}

Ok(Permit { chan: &self.chan })
}

/// Try to acquire a slot in the channel without waiting for the slot to become
Expand Down Expand Up @@ -684,6 +780,68 @@ impl<T> Sender<T> {
Ok(Permit { chan: &self.chan })
}

/// Try to acquire a slot in the channel without waiting for the slot to become
/// available, returning an owned permit.
///
/// This moves the sender _by value_, and returns an owned permit that can
/// be used to send a message into the channel. Unlike [`Sender::try_reserve`],
/// this method may be used in cases where the permit must be valid for the
/// `'static` lifetime.
hawkw marked this conversation as resolved.
Show resolved Hide resolved
///
/// If the channel is full this function will return a [`TrySendError`].
/// Since the sender is taken by value, the `TrySendError` returned in this
/// case contains the sender, so that it may be used again. Otherwise, if
/// there is a slot available, this method will return an [`OwnedPermit`]
/// that can then be used to [`send`] on the channel with a guaranteed slot.
/// This function is similar to [`reserve_owned`] except it does not await
/// for the slot to become available.
///
/// Dropping the [`OwnedPermit`] without sending a message releases the capacity back
/// to the channel.
///
/// [`OwnedPermit`]: OwnedPermit
/// [`send`]: OwnedPermit::send
/// [`reserve_owned`]: Sender::reserve_owned
///
/// # Examples
///
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = mpsc::channel(1);
///
/// // Reserve capacity
/// let permit = tx.clone().try_reserve_owned().unwrap();
///
/// // Trying to send directly on the `tx` will fail due to no
/// // available capacity.
/// assert!(tx.try_send(123).is_err());
///
/// // Trying to reserve an additional slot on the `tx` will
/// // fail because there is no capacity.
/// assert!(tx.try_reserve().is_err());
///
/// // Sending on the permit succeeds
/// permit.send(456);
///
/// // The value sent on the permit is received
/// assert_eq!(rx.recv().await.unwrap(), 456);
///
/// }
/// ```
pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> {
match self.chan.semaphore().0.try_acquire(1) {
Ok(_) => {}
Err(_) => return Err(TrySendError::Full(self)),
}

Ok(OwnedPermit {
chan: Some(self.chan),
})
}

/// Returns `true` if senders belong to the same channel.
///
/// # Examples
Expand Down Expand Up @@ -817,3 +975,120 @@ impl<T> fmt::Debug for Permit<'_, T> {
.finish()
}
}

// ===== impl Permit =====

impl<T> OwnedPermit<T> {
/// Sends a value using the reserved capacity.
///
/// Capacity for the message has already been reserved. The message is sent
/// to the receiver and the permit is consumed. The operation will succeed
/// even if the receiver half has been closed. See [`Receiver::close`] for
/// more details on performing a clean shutdown.
hawkw marked this conversation as resolved.
Show resolved Hide resolved
///
/// Unlike [`Permit::send`], this method returns the [`Sender`] from which
/// the `OwnedPermit` was reserved.
///
/// [`Receiver::close`]: Receiver::close
///
/// # Examples
///
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = mpsc::channel(1);
///
/// // Reserve capacity
/// let permit = tx.reserve_owned().await.unwrap();
///
/// // Send a message on the permit, returning the sender.
/// let tx = permit.send(456);
///
/// // The value sent on the permit is received
/// assert_eq!(rx.recv().await.unwrap(), 456);
///
/// // We may now reuse `tx` to send another message.
/// tx.send(789).await.unwrap();
/// }
/// ```
pub fn send(mut self, value: T) -> Sender<T> {
let chan = self.chan.take().unwrap_or_else(|| {
unreachable!("OwnedPermit channel is only taken when the permit is moved")
});
hawkw marked this conversation as resolved.
Show resolved Hide resolved
chan.send(value);

Sender { chan }
}

/// Release the reserved capacity *without* sending a message, returning the
/// [`Sender`].
///
/// # Examples
///
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, rx) = mpsc::channel(1);
///
/// // Clone the sender and reserve capacity
/// let permit = tx.clone().reserve_owned().await.unwrap();
///
/// // Trying to send on the original `tx` will fail, since the `permit`
/// // has reserved all the available capacity.
/// assert!(tx.try_send(123).is_err());
///
/// // Release the permit without sending a message, returning the clone
/// // of the sender.
/// let tx2 = permit.release();
///
/// // We may now reuse `tx` to send another message.
/// tx.send(789).await.unwrap();
/// # drop(rx); drop(tx2);
/// }
/// ```
///
/// [`Sender`]: Sender
pub fn release(mut self) -> Sender<T> {
use chan::Semaphore;

let chan = self.chan.take().unwrap_or_else(|| {
unreachable!("OwnedPermit channel is only taken when the permit is moved")
});

// Add the permit back to the semaphore
chan.semaphore().add_permit();
Sender { chan }
}
}

impl<T> Drop for OwnedPermit<T> {
fn drop(&mut self) {
use chan::Semaphore;

// Are we still holding onto the sender?
if let Some(chan) = self.chan.take() {
let semaphore = chan.semaphore();

// Add the permit back to the semaphore
semaphore.add_permit();

if semaphore.is_closed() && semaphore.is_idle() {
chan.wake_rx();
}
hawkw marked this conversation as resolved.
Show resolved Hide resolved
}

// Otherwise, do nothing.
}
}

impl<T> fmt::Debug for OwnedPermit<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("OwnedPermit")
.field("chan", &self.chan)
.finish()
}
}
2 changes: 1 addition & 1 deletion tokio/src/sync/mpsc/mod.rs
Expand Up @@ -73,7 +73,7 @@
pub(super) mod block;

mod bounded;
pub use self::bounded::{channel, Permit, Receiver, Sender};
pub use self::bounded::{channel, OwnedPermit, Permit, Receiver, Sender};

mod chan;

Expand Down