diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index 8736971a147..803f2a18c6d 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -41,15 +41,28 @@ struct Waitlist { closed: bool, } -/// Error returned by `Semaphore::try_acquire`. -#[derive(Debug)] -pub(crate) enum TryAcquireError { +/// Error returned from the [`Semaphore::try_acquire`] function. +/// +/// [`Semaphore::try_acquire`]: crate::sync::Semaphore::try_acquire +#[derive(Debug, PartialEq)] +pub enum TryAcquireError { + /// The semaphore has been [closed] and cannot issue new permits. + /// + /// [closed]: crate::sync::Semaphore::close Closed, + + /// The semaphore has no available permits. NoPermits, } -/// Error returned by `Semaphore::acquire`. +/// Error returned from the [`Semaphore::acquire`] function. +/// +/// An `acquire` operation can only fail if the semaphore has been +/// [closed]. +/// +/// [closed]: crate::sync::Semaphore::close +/// [`Semaphore::acquire`]: crate::sync::Semaphore::acquire #[derive(Debug)] -pub(crate) struct AcquireError(()); +pub struct AcquireError(()); pub(crate) struct Acquire<'a> { node: Waiter, @@ -164,8 +177,6 @@ impl Semaphore { /// Closes the semaphore. This prevents the semaphore from issuing new /// permits and notifies all pending waiters. - // This will be used once the bounded MPSC is updated to use the new - // semaphore implementation. pub(crate) fn close(&self) { let mut waiters = self.waiters.lock(); // If the semaphore's permits counter has enough permits for an diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index d88bf9d65e0..a183fe6edd3 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -443,8 +443,10 @@ cfg_sync! { pub mod oneshot; pub(crate) mod batch_semaphore; + pub use batch_semaphore::{AcquireError, TryAcquireError}; + mod semaphore; - pub use semaphore::{Semaphore, SemaphorePermit, OwnedSemaphorePermit, TryAcquireError}; + pub use semaphore::{Semaphore, SemaphorePermit, OwnedSemaphorePermit}; mod rwlock; pub use rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard}; diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 2acccfa2fc1..5555bdf9324 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -1,4 +1,5 @@ use super::batch_semaphore as ll; // low level implementation +use super::{AcquireError, TryAcquireError}; use std::sync::Arc; /// Counting semaphore performing asynchronous permit acquisition. @@ -42,15 +43,6 @@ pub struct OwnedSemaphorePermit { permits: u32, } -/// Error returned from the [`Semaphore::try_acquire`] function. -/// -/// A `try_acquire` operation can only fail if the semaphore has no available -/// permits. -/// -/// [`Semaphore::try_acquire`]: Semaphore::try_acquire -#[derive(Debug)] -pub struct TryAcquireError(()); - #[test] #[cfg(not(loom))] fn bounds() { @@ -95,73 +87,148 @@ impl Semaphore { self.ll_sem.release(n); } - /// Acquires permit from the semaphore. - pub async fn acquire(&self) -> SemaphorePermit<'_> { - self.ll_sem.acquire(1).await.unwrap(); - SemaphorePermit { + /// Acquires a permit from the semaphore. + /// + /// If the semaphore has been closed, this returns an [`AcquireError`]. + /// Otherwise, this returns a [`SemaphorePermit`] representing the + /// acquired permit. + /// + /// [`AcquireError`]: crate::sync::AcquireError + /// [`SemaphorePermit`]: crate::sync::SemaphorePermit + pub async fn acquire(&self) -> Result, AcquireError> { + self.ll_sem.acquire(1).await?; + Ok(SemaphorePermit { sem: &self, permits: 1, - } + }) } - /// Acquires `n` permits from the semaphore - pub async fn acquire_many(&self, n: u32) -> SemaphorePermit<'_> { - self.ll_sem.acquire(n).await.unwrap(); - SemaphorePermit { + /// Acquires `n` permits from the semaphore. + /// + /// If the semaphore has been closed, this returns an [`AcquireError`]. + /// Otherwise, this returns a [`SemaphorePermit`] representing the + /// acquired permits. + /// + /// [`AcquireError`]: crate::sync::AcquireError + /// [`SemaphorePermit`]: crate::sync::SemaphorePermit + pub async fn acquire_many(&self, n: u32) -> Result, AcquireError> { + self.ll_sem.acquire(n).await?; + Ok(SemaphorePermit { sem: &self, permits: n, - } + }) } /// Tries to acquire a permit from the semaphore. + /// + /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`] + /// and a [`TryAcquireError::NoPermits`] if there are no permits left. Otherwise, + /// this returns a [`SemaphorePermit`] representing the acquired permits. + /// + /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed + /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits + /// [`SemaphorePermit`]: crate::sync::SemaphorePermit pub fn try_acquire(&self) -> Result, TryAcquireError> { match self.ll_sem.try_acquire(1) { Ok(_) => Ok(SemaphorePermit { sem: self, permits: 1, }), - Err(_) => Err(TryAcquireError(())), + Err(e) => Err(e), } } - /// Tries to acquire `n` permits from the semaphore. + /// Tries to acquire n permits from the semaphore. + /// + /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`] + /// and a [`TryAcquireError::NoPermits`] if there are no permits left. Otherwise, + /// this returns a [`SemaphorePermit`] representing the acquired permits. + /// + /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed + /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits + /// [`SemaphorePermit`]: crate::sync::SemaphorePermit pub fn try_acquire_many(&self, n: u32) -> Result, TryAcquireError> { match self.ll_sem.try_acquire(n) { Ok(_) => Ok(SemaphorePermit { sem: self, permits: n, }), - Err(_) => Err(TryAcquireError(())), + Err(e) => Err(e), } } - /// Acquires permit from the semaphore. + /// Acquires a permit from the semaphore. /// /// The semaphore must be wrapped in an [`Arc`] to call this method. + /// If the semaphore has been closed, this returns an [`AcquireError`]. + /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the + /// acquired permit. /// /// [`Arc`]: std::sync::Arc - pub async fn acquire_owned(self: Arc) -> OwnedSemaphorePermit { - self.ll_sem.acquire(1).await.unwrap(); - OwnedSemaphorePermit { + /// [`AcquireError`]: crate::sync::AcquireError + /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit + pub async fn acquire_owned(self: Arc) -> Result { + self.ll_sem.acquire(1).await?; + Ok(OwnedSemaphorePermit { sem: self, permits: 1, - } + }) } /// Tries to acquire a permit from the semaphore. /// - /// The semaphore must be wrapped in an [`Arc`] to call this method. + /// The semaphore must be wrapped in an [`Arc`] to call this method. If + /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`] + /// and a [`TryAcquireError::NoPermits`] if there are no permits left. + /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the + /// acquired permit. /// /// [`Arc`]: std::sync::Arc + /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed + /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits + /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit pub fn try_acquire_owned(self: Arc) -> Result { match self.ll_sem.try_acquire(1) { Ok(_) => Ok(OwnedSemaphorePermit { sem: self, permits: 1, }), - Err(_) => Err(TryAcquireError(())), + Err(e) => Err(e), } } + + /// Closes the semaphore. + /// + /// This prevents the semaphore from issuing new permits and notifies all pending waiters. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::Semaphore; + /// use std::sync::Arc; + /// use tokio::sync::TryAcquireError; + /// + /// #[tokio::main] + /// async fn main() { + /// let semaphore = Arc::new(Semaphore::new(1)); + /// let semaphore2 = semaphore.clone(); + /// + /// tokio::spawn(async move { + /// let permit = semaphore.acquire_many(2).await; + /// assert!(permit.is_err()); + /// println!("waiter received error"); + /// }); + /// + /// println!("closing semaphore"); + /// semaphore2.close(); + /// + /// // Cannot obtain more permits + /// assert_eq!(semaphore2.try_acquire().err(), Some(TryAcquireError::Closed)) + /// } + /// ``` + pub fn close(&self) { + self.ll_sem.close(); + } } impl<'a> SemaphorePermit<'a> {