Skip to content

Commit

Permalink
mpsc: Add Sender::try_reserve function (#3418)
Browse files Browse the repository at this point in the history
* mpsc: Add `Sender::try_reserve` function

* Update tokio/src/sync/mpsc/bounded.rs

Co-authored-by: Alice Ryhl <alice@ryhl.io>

* Fix doc links

Co-authored-by: Alice Ryhl <alice@ryhl.io>
  • Loading branch information
LucioFranco and Darksonn committed Jan 13, 2021
1 parent 766a89b commit 672be92
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 0 deletions.
52 changes: 52 additions & 0 deletions tokio/src/sync/mpsc/bounded.rs
Expand Up @@ -603,6 +603,58 @@ impl<T> Sender<T> {

Ok(Permit { chan: &self.chan })
}

/// Try to acquire a slot in the channel without waiting for the slot to become
/// available.
///
/// If the channel is full this function will return [`TrySendError`], otherwise
/// if there is a slot available it will return a [`Permit`] that will then allow you
/// to [`send`] on the channel with a guaranteed slot. This function is similar to
/// [`reserve`] execpt it does not await for the slot to become available.
///
/// Dropping [`Permit`] without sending a message releases the capacity back
/// to the channel.
///
/// [`Permit`]: Permit
/// [`send`]: Permit::send
/// [`reserve`]: Sender::reserve
///
/// # Examples
///
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = mpsc::channel(1);
///
/// // Reserve capacity
/// let permit = tx.try_reserve().unwrap();
///
/// // Trying to send directly on the `tx` will fail due to no
/// // available capacity.
/// assert!(tx.try_send(123).is_err());
///
/// // Trying to reserve an additional slot on the `tx` will
/// // fail because there is no capacity.
/// assert!(tx.try_reserve().is_err());
///
/// // Sending on the permit succeeds
/// permit.send(456);
///
/// // The value sent on the permit is received
/// assert_eq!(rx.recv().await.unwrap(), 456);
///
/// }
/// ```
pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
match self.chan.semaphore().0.try_acquire(1) {
Ok(_) => {}
Err(_) => return Err(TrySendError::Full(())),
}

Ok(Permit { chan: &self.chan })
}
}

impl<T> Clone for Sender<T> {
Expand Down
23 changes: 23 additions & 0 deletions tokio/tests/sync_mpsc.rs
Expand Up @@ -327,6 +327,29 @@ async fn try_send_fail() {
assert!(rx.recv().await.is_none());
}

#[tokio::test]
async fn try_reserve_fails() {
let (tx, mut rx) = mpsc::channel(1);

let permit = tx.try_reserve().unwrap();

// This should fail
match assert_err!(tx.try_reserve()) {
TrySendError::Full(()) => {}
_ => panic!(),
}

permit.send("foo");

assert_eq!(rx.recv().await, Some("foo"));

// Dropping permit releases the slot.
let permit = tx.try_reserve().unwrap();
drop(permit);

let _permit = tx.try_reserve().unwrap();
}

#[tokio::test]
async fn drop_permit_releases_permit() {
// poll_ready reserves capacity, ensure that the capacity is released if tx
Expand Down

0 comments on commit 672be92

Please sign in to comment.