diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 985167ec565..5b19f787d6e 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -603,6 +603,58 @@ impl Sender { Ok(Permit { chan: &self.chan }) } + + /// Try to acquire a slot in the channel without waiting for the slot to become + /// available. + /// + /// If the channel is full this function will return [`TrySendError`], otherwise + /// if there is a slot available it will return a [`Permit`] that will then allow you + /// to [`send`] on the channel with a guaranteed slot. This function is similar to + /// [`reserve`] execpt it does not await for the slot to become available. + /// + /// Dropping [`Permit`] without sending a message releases the capacity back + /// to the channel. + /// + /// [`Permit`]: Permit + /// [`send`]: Permit::send + /// [`reserve`]: Sender::reserve + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::channel(1); + /// + /// // Reserve capacity + /// let permit = tx.try_reserve().unwrap(); + /// + /// // Trying to send directly on the `tx` will fail due to no + /// // available capacity. + /// assert!(tx.try_send(123).is_err()); + /// + /// // Trying to reserve an additional slot on the `tx` will + /// // fail because there is no capacity. + /// assert!(tx.try_reserve().is_err()); + /// + /// // Sending on the permit succeeds + /// permit.send(456); + /// + /// // The value sent on the permit is received + /// assert_eq!(rx.recv().await.unwrap(), 456); + /// + /// } + /// ``` + pub fn try_reserve(&self) -> Result, TrySendError<()>> { + match self.chan.semaphore().0.try_acquire(1) { + Ok(_) => {} + Err(_) => return Err(TrySendError::Full(())), + } + + Ok(Permit { chan: &self.chan }) + } } impl Clone for Sender { diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index b378e6bb953..cd43ad4381c 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -327,6 +327,29 @@ async fn try_send_fail() { assert!(rx.recv().await.is_none()); } +#[tokio::test] +async fn try_reserve_fails() { + let (tx, mut rx) = mpsc::channel(1); + + let permit = tx.try_reserve().unwrap(); + + // This should fail + match assert_err!(tx.try_reserve()) { + TrySendError::Full(()) => {} + _ => panic!(), + } + + permit.send("foo"); + + assert_eq!(rx.recv().await, Some("foo")); + + // Dropping permit releases the slot. + let permit = tx.try_reserve().unwrap(); + drop(permit); + + let _permit = tx.try_reserve().unwrap(); +} + #[tokio::test] async fn drop_permit_releases_permit() { // poll_ready reserves capacity, ensure that the capacity is released if tx