Skip to content

Commit

Permalink
sync: allow to configure RwLock max reads
Browse files Browse the repository at this point in the history
Signed-off-by: Zahari Dichev <zaharidichev@gmail.com>
  • Loading branch information
zaharidichev committed Mar 25, 2021
1 parent 6f896d8 commit a7c94aa
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 10 deletions.
73 changes: 67 additions & 6 deletions tokio/src/sync/rwlock.rs
Expand Up @@ -11,10 +11,10 @@ pub(crate) use write_guard::RwLockWriteGuard;
pub(crate) use write_guard_mapped::RwLockMappedWriteGuard;

#[cfg(not(loom))]
const MAX_READS: usize = 32;
const MAX_READS: u32 = std::u32::MAX >> 3;

#[cfg(loom)]
const MAX_READS: usize = 10;
const MAX_READS: u32 = 10;

/// An asynchronous reader-writer lock.
///
Expand Down Expand Up @@ -77,6 +77,9 @@ const MAX_READS: usize = 10;
/// [_write-preferring_]: https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock#Priority_policies
#[derive(Debug)]
pub struct RwLock<T: ?Sized> {
// maximum number of concurrent readers
mr: u32,

//semaphore to coordinate read and write access to T
s: Semaphore,

Expand Down Expand Up @@ -144,8 +147,35 @@ impl<T: ?Sized> RwLock<T> {
T: Sized,
{
RwLock {
mr: MAX_READS,
c: UnsafeCell::new(value),
s: Semaphore::new(MAX_READS),
s: Semaphore::new(MAX_READS as usize),
}
}

/// Creates a new instance of an `RwLock<T>` which is unlocked
/// and allows a maximum of `max_reads` concurrent readers.
///
/// # Examples
///
/// ```
/// use tokio::sync::RwLock;
///
/// let lock = RwLock::new_with_max_reads(5, 1024);
/// ```
pub fn new_with_max_reads(value: T, max_reads: u32) -> RwLock<T>
where
T: Sized,
{
assert!(
max_reads <= MAX_READS,
"a RwLock may not be created with more than MAX_READS ({})",
MAX_READS
);
RwLock {
mr: max_reads,
c: UnsafeCell::new(value),
s: Semaphore::new(max_reads as usize),
}
}

Expand All @@ -165,8 +195,37 @@ impl<T: ?Sized> RwLock<T> {
T: Sized,
{
RwLock {
mr: MAX_READS,
c: UnsafeCell::new(value),
s: Semaphore::const_new(MAX_READS as usize),
}
}

/// Creates a new instance of an `RwLock<T>` which is unlocked
/// and allows a maximum of `max_reads` concurrent readers.
///
/// # Examples
///
/// ```
/// use tokio::sync::RwLock;
///
/// static LOCK: RwLock<i32> = RwLock::const_new_with_max_reads(5, 1024);
/// ```
#[cfg(all(feature = "parking_lot", not(all(loom, test))))]
#[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))]
pub const fn const_new_with_max_reads(value: T, max_reads: u32) -> RwLock<T>
where
T: Sized,
{
assert!(
max_reads <= MAX_READS,
"a RwLock may not be created with more than MAX_READS ({})",
MAX_READS
);
RwLock {
mr: max_reads,
c: UnsafeCell::new(value),
s: Semaphore::const_new(MAX_READS),
s: Semaphore::const_new(max_reads as usize),
}
}

Expand Down Expand Up @@ -291,12 +350,13 @@ impl<T: ?Sized> RwLock<T> {
///}
/// ```
pub async fn write(&self) -> RwLockWriteGuard<'_, T> {
self.s.acquire(MAX_READS as u32).await.unwrap_or_else(|_| {
self.s.acquire(self.mr).await.unwrap_or_else(|_| {
// The semaphore was closed. but, we never explicitly close it, and we have a
// handle to it through the Arc, which means that this can never happen.
unreachable!()
});
RwLockWriteGuard {
permits_acquired: self.mr,
s: &self.s,
data: self.c.get(),
marker: marker::PhantomData,
Expand Down Expand Up @@ -327,13 +387,14 @@ impl<T: ?Sized> RwLock<T> {
/// }
/// ```
pub fn try_write(&self) -> Result<RwLockWriteGuard<'_, T>, TryLockError> {
match self.s.try_acquire(MAX_READS as u32) {
match self.s.try_acquire(self.mr) {
Ok(permit) => permit,
Err(TryAcquireError::NoPermits) => return Err(TryLockError(())),
Err(TryAcquireError::Closed) => unreachable!(),
}

Ok(RwLockWriteGuard {
permits_acquired: self.mr,
s: &self.s,
data: self.c.get(),
marker: marker::PhantomData,
Expand Down
9 changes: 7 additions & 2 deletions tokio/src/sync/rwlock/write_guard.rs
Expand Up @@ -15,6 +15,7 @@ use std::ops;
/// [`write`]: method@crate::sync::RwLock::write
/// [`RwLock`]: struct@crate::sync::RwLock
pub struct RwLockWriteGuard<'a, T: ?Sized> {
pub(super) permits_acquired: u32,
pub(super) s: &'a Semaphore,
pub(super) data: *mut T,
pub(super) marker: marker::PhantomData<&'a mut T>,
Expand Down Expand Up @@ -64,9 +65,11 @@ impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> {
{
let data = f(&mut *this) as *mut U;
let s = this.s;
let permits_acquired = this.permits_acquired;
// NB: Forget to avoid drop impl from being called.
mem::forget(this);
RwLockMappedWriteGuard {
permits_acquired,
s,
data,
marker: marker::PhantomData,
Expand Down Expand Up @@ -125,9 +128,11 @@ impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> {
None => return Err(this),
};
let s = this.s;
let permits_acquired = this.permits_acquired;
// NB: Forget to avoid drop impl from being called.
mem::forget(this);
Ok(RwLockMappedWriteGuard {
permits_acquired,
s,
data,
marker: marker::PhantomData,
Expand Down Expand Up @@ -185,7 +190,7 @@ impl<'a, T: ?Sized> RwLockWriteGuard<'a, T> {
let RwLockWriteGuard { s, data, .. } = self;

// Release all but one of the permits held by the write guard
s.release(super::MAX_READS - 1);
s.release((self.permits_acquired - 1) as usize);
// NB: Forget to avoid drop impl from being called.
mem::forget(self);
RwLockReadGuard {
Expand Down Expand Up @@ -230,6 +235,6 @@ where

impl<'a, T: ?Sized> Drop for RwLockWriteGuard<'a, T> {
fn drop(&mut self) {
self.s.release(super::MAX_READS);
self.s.release(self.permits_acquired as usize);
}
}
7 changes: 6 additions & 1 deletion tokio/src/sync/rwlock/write_guard_mapped.rs
Expand Up @@ -14,6 +14,7 @@ use std::ops;
/// [mapping]: method@crate::sync::RwLockWriteGuard::map
/// [`RwLockWriteGuard`]: struct@crate::sync::RwLockWriteGuard
pub struct RwLockMappedWriteGuard<'a, T: ?Sized> {
pub(super) permits_acquired: u32,
pub(super) s: &'a Semaphore,
pub(super) data: *mut T,
pub(super) marker: marker::PhantomData<&'a mut T>,
Expand Down Expand Up @@ -62,9 +63,11 @@ impl<'a, T: ?Sized> RwLockMappedWriteGuard<'a, T> {
{
let data = f(&mut *this) as *mut U;
let s = this.s;
let permits_acquired = this.permits_acquired;
// NB: Forget to avoid drop impl from being called.
mem::forget(this);
RwLockMappedWriteGuard {
permits_acquired,
s,
data,
marker: marker::PhantomData,
Expand Down Expand Up @@ -122,9 +125,11 @@ impl<'a, T: ?Sized> RwLockMappedWriteGuard<'a, T> {
None => return Err(this),
};
let s = this.s;
let permits_acquired = this.permits_acquired;
// NB: Forget to avoid drop impl from being called.
mem::forget(this);
Ok(RwLockMappedWriteGuard {
permits_acquired,
s,
data,
marker: marker::PhantomData,
Expand Down Expand Up @@ -166,6 +171,6 @@ where

impl<'a, T: ?Sized> Drop for RwLockMappedWriteGuard<'a, T> {
fn drop(&mut self) {
self.s.release(super::MAX_READS);
self.s.release(self.permits_acquired as usize);
}
}
2 changes: 1 addition & 1 deletion tokio/tests/sync_rwlock.rs
Expand Up @@ -54,7 +54,7 @@ fn read_exclusive_pending() {
// should be made available when one of the shared acesses is dropped
#[test]
fn exhaust_reading() {
let rwlock = RwLock::new(100);
let rwlock = RwLock::new_with_max_reads(100, 1024);
let mut reads = Vec::new();
loop {
let mut t = spawn(rwlock.read());
Expand Down

0 comments on commit a7c94aa

Please sign in to comment.