From 20f681546bde028e1ebb93db18e8aa64327129c3 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Wed, 28 Oct 2020 14:48:32 +0000 Subject: [PATCH 1/5] sync: expose `Semaphore::close` in public API Fixes: #3061 Signed-off-by: Zahari Dichev --- tokio/src/sync/semaphore.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 2acccfa2fc1..427257572af 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -162,6 +162,13 @@ impl Semaphore { Err(_) => Err(TryAcquireError(())), } } + + /// Closes the semaphore. + /// + /// This prevents the semaphore from issuing new permits and notifies all pending waiters. + pub fn close(&self) { + self.ll_sem.close(); + } } impl<'a> SemaphorePermit<'a> { From c603ca7188836f70159d9f91777bb53a292f18a9 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Wed, 28 Oct 2020 14:49:17 +0000 Subject: [PATCH 2/5] remove outdated comment Signed-off-by: Zahari Dichev --- tokio/src/sync/batch_semaphore.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index 8736971a147..5eb6e12f792 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -164,8 +164,6 @@ impl Semaphore { /// Closes the semaphore. This prevents the semaphore from issuing new /// permits and notifies all pending waiters. - // This will be used once the bounded MPSC is updated to use the new - // semaphore implementation. pub(crate) fn close(&self) { let mut waiters = self.waiters.lock(); // If the semaphore's permits counter has enough permits for an From cbd629abe533fe625a48e052a3dd67ce0be8adc9 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Mon, 14 Dec 2020 13:25:10 +0000 Subject: [PATCH 3/5] feedback Signed-off-by: Zahari Dichev --- tokio/src/sync/batch_semaphore.rs | 17 ++++++++++--- tokio/src/sync/mod.rs | 4 +++- tokio/src/sync/semaphore.rs | 40 +++++++++++++------------------ 3 files changed, 33 insertions(+), 28 deletions(-) diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index 5eb6e12f792..47946a09c68 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -41,15 +41,26 @@ struct Waitlist { closed: bool, } -/// Error returned by `Semaphore::try_acquire`. +/// Error returned from the [`Semaphore::try_acquire`] function. +/// [`Semaphore::try_acquire`]: Semaphore::try_acquire #[derive(Debug)] -pub(crate) enum TryAcquireError { +pub enum TryAcquireError { + /// The semaphore has been closed and cannot issue new permits Closed, + + /// The has no available permits. NoPermits, } /// Error returned by `Semaphore::acquire`. +/// +/// Error returned from the [`Semaphore::acquire`] function. +/// +/// An `acquire` operation can only fail if the semaphore has been +/// closed. +/// +/// [`Semaphore::acquire`]: Semaphore::acquire #[derive(Debug)] -pub(crate) struct AcquireError(()); +pub struct AcquireError(()); pub(crate) struct Acquire<'a> { node: Waiter, diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index d88bf9d65e0..a183fe6edd3 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -443,8 +443,10 @@ cfg_sync! { pub mod oneshot; pub(crate) mod batch_semaphore; + pub use batch_semaphore::{AcquireError, TryAcquireError}; + mod semaphore; - pub use semaphore::{Semaphore, SemaphorePermit, OwnedSemaphorePermit, TryAcquireError}; + pub use semaphore::{Semaphore, SemaphorePermit, OwnedSemaphorePermit}; mod rwlock; pub use rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard}; diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 427257572af..14d936636e0 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -1,4 +1,5 @@ use super::batch_semaphore as ll; // low level implementation +use super::{AcquireError, TryAcquireError}; use std::sync::Arc; /// Counting semaphore performing asynchronous permit acquisition. @@ -42,15 +43,6 @@ pub struct OwnedSemaphorePermit { permits: u32, } -/// Error returned from the [`Semaphore::try_acquire`] function. -/// -/// A `try_acquire` operation can only fail if the semaphore has no available -/// permits. -/// -/// [`Semaphore::try_acquire`]: Semaphore::try_acquire -#[derive(Debug)] -pub struct TryAcquireError(()); - #[test] #[cfg(not(loom))] fn bounds() { @@ -96,21 +88,21 @@ impl Semaphore { } /// Acquires permit from the semaphore. - pub async fn acquire(&self) -> SemaphorePermit<'_> { - self.ll_sem.acquire(1).await.unwrap(); - SemaphorePermit { + pub async fn acquire(&self) -> Result, AcquireError> { + self.ll_sem.acquire(1).await?; + Ok(SemaphorePermit { sem: &self, permits: 1, - } + }) } /// Acquires `n` permits from the semaphore - pub async fn acquire_many(&self, n: u32) -> SemaphorePermit<'_> { - self.ll_sem.acquire(n).await.unwrap(); - SemaphorePermit { + pub async fn acquire_many(&self, n: u32) -> Result, AcquireError> { + self.ll_sem.acquire(n).await?; + Ok(SemaphorePermit { sem: &self, permits: n, - } + }) } /// Tries to acquire a permit from the semaphore. @@ -120,7 +112,7 @@ impl Semaphore { sem: self, permits: 1, }), - Err(_) => Err(TryAcquireError(())), + Err(e) => Err(e), } } @@ -131,7 +123,7 @@ impl Semaphore { sem: self, permits: n, }), - Err(_) => Err(TryAcquireError(())), + Err(e) => Err(e), } } @@ -140,12 +132,12 @@ impl Semaphore { /// The semaphore must be wrapped in an [`Arc`] to call this method. /// /// [`Arc`]: std::sync::Arc - pub async fn acquire_owned(self: Arc) -> OwnedSemaphorePermit { - self.ll_sem.acquire(1).await.unwrap(); - OwnedSemaphorePermit { + pub async fn acquire_owned(self: Arc) -> Result { + self.ll_sem.acquire(1).await?; + Ok(OwnedSemaphorePermit { sem: self, permits: 1, - } + }) } /// Tries to acquire a permit from the semaphore. @@ -159,7 +151,7 @@ impl Semaphore { sem: self, permits: 1, }), - Err(_) => Err(TryAcquireError(())), + Err(e) => Err(e), } } From 625084c056607e5338712f2724b2161db9f5b230 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Mon, 14 Dec 2020 13:36:04 +0000 Subject: [PATCH 4/5] remove private links in docs Signed-off-by: Zahari Dichev --- tokio/src/sync/batch_semaphore.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index 47946a09c68..8da693cf2d7 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -41,8 +41,7 @@ struct Waitlist { closed: bool, } -/// Error returned from the [`Semaphore::try_acquire`] function. -/// [`Semaphore::try_acquire`]: Semaphore::try_acquire +/// Error returned from the `Semaphore::try_acquire` function. #[derive(Debug)] pub enum TryAcquireError { /// The semaphore has been closed and cannot issue new permits @@ -53,12 +52,8 @@ pub enum TryAcquireError { } /// Error returned by `Semaphore::acquire`. /// -/// Error returned from the [`Semaphore::acquire`] function. -/// /// An `acquire` operation can only fail if the semaphore has been /// closed. -/// -/// [`Semaphore::acquire`]: Semaphore::acquire #[derive(Debug)] pub struct AcquireError(()); From d1844272010cd2ba32b5d563e5cfc7a8876514b6 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Tue, 15 Dec 2020 12:14:41 +0000 Subject: [PATCH 5/5] add docs and doc test Signed-off-by: Zahari Dichev --- tokio/src/sync/batch_semaphore.rs | 19 +++++--- tokio/src/sync/semaphore.rs | 78 +++++++++++++++++++++++++++++-- 2 files changed, 86 insertions(+), 11 deletions(-) diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index 8da693cf2d7..803f2a18c6d 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -41,19 +41,26 @@ struct Waitlist { closed: bool, } -/// Error returned from the `Semaphore::try_acquire` function. -#[derive(Debug)] +/// Error returned from the [`Semaphore::try_acquire`] function. +/// +/// [`Semaphore::try_acquire`]: crate::sync::Semaphore::try_acquire +#[derive(Debug, PartialEq)] pub enum TryAcquireError { - /// The semaphore has been closed and cannot issue new permits + /// The semaphore has been [closed] and cannot issue new permits. + /// + /// [closed]: crate::sync::Semaphore::close Closed, - /// The has no available permits. + /// The semaphore has no available permits. NoPermits, } -/// Error returned by `Semaphore::acquire`. +/// Error returned from the [`Semaphore::acquire`] function. /// /// An `acquire` operation can only fail if the semaphore has been -/// closed. +/// [closed]. +/// +/// [closed]: crate::sync::Semaphore::close +/// [`Semaphore::acquire`]: crate::sync::Semaphore::acquire #[derive(Debug)] pub struct AcquireError(()); diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 14d936636e0..5555bdf9324 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -87,7 +87,14 @@ impl Semaphore { self.ll_sem.release(n); } - /// Acquires permit from the semaphore. + /// Acquires a permit from the semaphore. + /// + /// If the semaphore has been closed, this returns an [`AcquireError`]. + /// Otherwise, this returns a [`SemaphorePermit`] representing the + /// acquired permit. + /// + /// [`AcquireError`]: crate::sync::AcquireError + /// [`SemaphorePermit`]: crate::sync::SemaphorePermit pub async fn acquire(&self) -> Result, AcquireError> { self.ll_sem.acquire(1).await?; Ok(SemaphorePermit { @@ -96,7 +103,14 @@ impl Semaphore { }) } - /// Acquires `n` permits from the semaphore + /// Acquires `n` permits from the semaphore. + /// + /// If the semaphore has been closed, this returns an [`AcquireError`]. + /// Otherwise, this returns a [`SemaphorePermit`] representing the + /// acquired permits. + /// + /// [`AcquireError`]: crate::sync::AcquireError + /// [`SemaphorePermit`]: crate::sync::SemaphorePermit pub async fn acquire_many(&self, n: u32) -> Result, AcquireError> { self.ll_sem.acquire(n).await?; Ok(SemaphorePermit { @@ -106,6 +120,14 @@ impl Semaphore { } /// Tries to acquire a permit from the semaphore. + /// + /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`] + /// and a [`TryAcquireError::NoPermits`] if there are no permits left. Otherwise, + /// this returns a [`SemaphorePermit`] representing the acquired permits. + /// + /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed + /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits + /// [`SemaphorePermit`]: crate::sync::SemaphorePermit pub fn try_acquire(&self) -> Result, TryAcquireError> { match self.ll_sem.try_acquire(1) { Ok(_) => Ok(SemaphorePermit { @@ -116,7 +138,15 @@ impl Semaphore { } } - /// Tries to acquire `n` permits from the semaphore. + /// Tries to acquire n permits from the semaphore. + /// + /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`] + /// and a [`TryAcquireError::NoPermits`] if there are no permits left. Otherwise, + /// this returns a [`SemaphorePermit`] representing the acquired permits. + /// + /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed + /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits + /// [`SemaphorePermit`]: crate::sync::SemaphorePermit pub fn try_acquire_many(&self, n: u32) -> Result, TryAcquireError> { match self.ll_sem.try_acquire(n) { Ok(_) => Ok(SemaphorePermit { @@ -127,11 +157,16 @@ impl Semaphore { } } - /// Acquires permit from the semaphore. + /// Acquires a permit from the semaphore. /// /// The semaphore must be wrapped in an [`Arc`] to call this method. + /// If the semaphore has been closed, this returns an [`AcquireError`]. + /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the + /// acquired permit. /// /// [`Arc`]: std::sync::Arc + /// [`AcquireError`]: crate::sync::AcquireError + /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit pub async fn acquire_owned(self: Arc) -> Result { self.ll_sem.acquire(1).await?; Ok(OwnedSemaphorePermit { @@ -142,9 +177,16 @@ impl Semaphore { /// Tries to acquire a permit from the semaphore. /// - /// The semaphore must be wrapped in an [`Arc`] to call this method. + /// The semaphore must be wrapped in an [`Arc`] to call this method. If + /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`] + /// and a [`TryAcquireError::NoPermits`] if there are no permits left. + /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the + /// acquired permit. /// /// [`Arc`]: std::sync::Arc + /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed + /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits + /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit pub fn try_acquire_owned(self: Arc) -> Result { match self.ll_sem.try_acquire(1) { Ok(_) => Ok(OwnedSemaphorePermit { @@ -158,6 +200,32 @@ impl Semaphore { /// Closes the semaphore. /// /// This prevents the semaphore from issuing new permits and notifies all pending waiters. + /// + /// # Examples + /// + /// ``` + /// use tokio::sync::Semaphore; + /// use std::sync::Arc; + /// use tokio::sync::TryAcquireError; + /// + /// #[tokio::main] + /// async fn main() { + /// let semaphore = Arc::new(Semaphore::new(1)); + /// let semaphore2 = semaphore.clone(); + /// + /// tokio::spawn(async move { + /// let permit = semaphore.acquire_many(2).await; + /// assert!(permit.is_err()); + /// println!("waiter received error"); + /// }); + /// + /// println!("closing semaphore"); + /// semaphore2.close(); + /// + /// // Cannot obtain more permits + /// assert_eq!(semaphore2.try_acquire().err(), Some(TryAcquireError::Closed)) + /// } + /// ``` pub fn close(&self) { self.ll_sem.close(); }