Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose Semaphore::close #3065

Merged
merged 5 commits into from Dec 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 18 additions & 7 deletions tokio/src/sync/batch_semaphore.rs
Expand Up @@ -41,15 +41,28 @@ struct Waitlist {
closed: bool,
}

/// Error returned by `Semaphore::try_acquire`.
#[derive(Debug)]
pub(crate) enum TryAcquireError {
/// Error returned from the [`Semaphore::try_acquire`] function.
///
/// [`Semaphore::try_acquire`]: crate::sync::Semaphore::try_acquire
#[derive(Debug, PartialEq)]
pub enum TryAcquireError {
/// The semaphore has been [closed] and cannot issue new permits.
///
/// [closed]: crate::sync::Semaphore::close
Closed,

/// The semaphore has no available permits.
NoPermits,
}
/// Error returned by `Semaphore::acquire`.
/// Error returned from the [`Semaphore::acquire`] function.
///
/// An `acquire` operation can only fail if the semaphore has been
/// [closed].
///
/// [closed]: crate::sync::Semaphore::close
/// [`Semaphore::acquire`]: crate::sync::Semaphore::acquire
#[derive(Debug)]
pub(crate) struct AcquireError(());
pub struct AcquireError(());

pub(crate) struct Acquire<'a> {
node: Waiter,
Expand Down Expand Up @@ -164,8 +177,6 @@ impl Semaphore {

/// Closes the semaphore. This prevents the semaphore from issuing new
/// permits and notifies all pending waiters.
// This will be used once the bounded MPSC is updated to use the new
// semaphore implementation.
pub(crate) fn close(&self) {
let mut waiters = self.waiters.lock();
// If the semaphore's permits counter has enough permits for an
Expand Down
4 changes: 3 additions & 1 deletion tokio/src/sync/mod.rs
Expand Up @@ -443,8 +443,10 @@ cfg_sync! {
pub mod oneshot;

pub(crate) mod batch_semaphore;
pub use batch_semaphore::{AcquireError, TryAcquireError};

mod semaphore;
pub use semaphore::{Semaphore, SemaphorePermit, OwnedSemaphorePermit, TryAcquireError};
pub use semaphore::{Semaphore, SemaphorePermit, OwnedSemaphorePermit};

mod rwlock;
pub use rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
Expand Down
125 changes: 96 additions & 29 deletions tokio/src/sync/semaphore.rs
@@ -1,4 +1,5 @@
use super::batch_semaphore as ll; // low level implementation
use super::{AcquireError, TryAcquireError};
use std::sync::Arc;

/// Counting semaphore performing asynchronous permit acquisition.
Expand Down Expand Up @@ -42,15 +43,6 @@ pub struct OwnedSemaphorePermit {
permits: u32,
}

/// Error returned from the [`Semaphore::try_acquire`] function.
///
/// A `try_acquire` operation can only fail if the semaphore has no available
/// permits.
///
/// [`Semaphore::try_acquire`]: Semaphore::try_acquire
#[derive(Debug)]
pub struct TryAcquireError(());

#[test]
#[cfg(not(loom))]
fn bounds() {
Expand Down Expand Up @@ -95,73 +87,148 @@ impl Semaphore {
self.ll_sem.release(n);
}

/// Acquires permit from the semaphore.
pub async fn acquire(&self) -> SemaphorePermit<'_> {
self.ll_sem.acquire(1).await.unwrap();
SemaphorePermit {
/// Acquires a permit from the semaphore.
///
/// If the semaphore has been closed, this returns an [`AcquireError`].
/// Otherwise, this returns a [`SemaphorePermit`] representing the
/// acquired permit.
///
/// [`AcquireError`]: crate::sync::AcquireError
/// [`SemaphorePermit`]: crate::sync::SemaphorePermit
pub async fn acquire(&self) -> Result<SemaphorePermit<'_>, AcquireError> {
self.ll_sem.acquire(1).await?;
Ok(SemaphorePermit {
sem: &self,
permits: 1,
}
})
}

/// Acquires `n` permits from the semaphore
pub async fn acquire_many(&self, n: u32) -> SemaphorePermit<'_> {
self.ll_sem.acquire(n).await.unwrap();
SemaphorePermit {
/// Acquires `n` permits from the semaphore.
///
/// If the semaphore has been closed, this returns an [`AcquireError`].
/// Otherwise, this returns a [`SemaphorePermit`] representing the
/// acquired permits.
///
/// [`AcquireError`]: crate::sync::AcquireError
/// [`SemaphorePermit`]: crate::sync::SemaphorePermit
pub async fn acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, AcquireError> {
self.ll_sem.acquire(n).await?;
Ok(SemaphorePermit {
sem: &self,
permits: n,
}
})
}

/// Tries to acquire a permit from the semaphore.
///
/// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
/// and a [`TryAcquireError::NoPermits`] if there are no permits left. Otherwise,
/// this returns a [`SemaphorePermit`] representing the acquired permits.
///
/// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
/// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
/// [`SemaphorePermit`]: crate::sync::SemaphorePermit
pub fn try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError> {
match self.ll_sem.try_acquire(1) {
Ok(_) => Ok(SemaphorePermit {
sem: self,
permits: 1,
}),
Err(_) => Err(TryAcquireError(())),
Err(e) => Err(e),
}
}

/// Tries to acquire `n` permits from the semaphore.
/// Tries to acquire n permits from the semaphore.
///
/// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
/// and a [`TryAcquireError::NoPermits`] if there are no permits left. Otherwise,
/// this returns a [`SemaphorePermit`] representing the acquired permits.
///
/// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
/// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
/// [`SemaphorePermit`]: crate::sync::SemaphorePermit
pub fn try_acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, TryAcquireError> {
match self.ll_sem.try_acquire(n) {
Ok(_) => Ok(SemaphorePermit {
sem: self,
permits: n,
}),
Err(_) => Err(TryAcquireError(())),
Err(e) => Err(e),
}
}

/// Acquires permit from the semaphore.
/// Acquires a permit from the semaphore.
///
/// The semaphore must be wrapped in an [`Arc`] to call this method.
/// If the semaphore has been closed, this returns an [`AcquireError`].
/// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
/// acquired permit.
///
hawkw marked this conversation as resolved.
Show resolved Hide resolved
/// [`Arc`]: std::sync::Arc
pub async fn acquire_owned(self: Arc<Self>) -> OwnedSemaphorePermit {
self.ll_sem.acquire(1).await.unwrap();
OwnedSemaphorePermit {
/// [`AcquireError`]: crate::sync::AcquireError
/// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
pub async fn acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, AcquireError> {
self.ll_sem.acquire(1).await?;
Ok(OwnedSemaphorePermit {
sem: self,
permits: 1,
}
})
}

/// Tries to acquire a permit from the semaphore.
///
/// The semaphore must be wrapped in an [`Arc`] to call this method.
/// The semaphore must be wrapped in an [`Arc`] to call this method. If
/// the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
/// and a [`TryAcquireError::NoPermits`] if there are no permits left.
/// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
/// acquired permit.
///
/// [`Arc`]: std::sync::Arc
/// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
/// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
/// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
pub fn try_acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, TryAcquireError> {
match self.ll_sem.try_acquire(1) {
Ok(_) => Ok(OwnedSemaphorePermit {
sem: self,
permits: 1,
}),
Err(_) => Err(TryAcquireError(())),
Err(e) => Err(e),
}
}

/// Closes the semaphore.
///
/// This prevents the semaphore from issuing new permits and notifies all pending waiters.
hawkw marked this conversation as resolved.
Show resolved Hide resolved
///
/// # Examples
///
/// ```
/// use tokio::sync::Semaphore;
/// use std::sync::Arc;
/// use tokio::sync::TryAcquireError;
///
/// #[tokio::main]
/// async fn main() {
/// let semaphore = Arc::new(Semaphore::new(1));
/// let semaphore2 = semaphore.clone();
///
/// tokio::spawn(async move {
/// let permit = semaphore.acquire_many(2).await;
/// assert!(permit.is_err());
/// println!("waiter received error");
/// });
///
/// println!("closing semaphore");
/// semaphore2.close();
///
/// // Cannot obtain more permits
/// assert_eq!(semaphore2.try_acquire().err(), Some(TryAcquireError::Closed))
/// }
/// ```
pub fn close(&self) {
self.ll_sem.close();
}
}

impl<'a> SemaphorePermit<'a> {
Expand Down