Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Condvar::wait_while convenience methods #343

Merged
merged 5 commits into from
May 31, 2022
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
249 changes: 249 additions & 0 deletions src/condvar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use core::{
};
use lock_api::RawMutex as RawMutex_;
use parking_lot_core::{self, ParkResult, RequeueOp, UnparkResult, DEFAULT_PARK_TOKEN};
use std::ops::DerefMut;
use std::time::{Duration, Instant};

/// A type indicating whether a timed wait on a condition variable returned
Expand All @@ -29,6 +30,17 @@ impl WaitTimeoutResult {
}
}

/// A type indicating how many times a thread was blocked during wait_while.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub struct WaitWhileResult(u32);

impl WaitWhileResult {
#[inline]
pub fn num_iters(self) -> u32 {
self.0
}
}
bryanhitc marked this conversation as resolved.
Show resolved Hide resolved

/// A Condition Variable
///
/// Condition variables represent the ability to block a thread such that it
Expand Down Expand Up @@ -383,6 +395,134 @@ impl Condvar {
let deadline = util::to_deadline(timeout);
self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, deadline)
}

#[inline]
bryanhitc marked this conversation as resolved.
Show resolved Hide resolved
fn wait_while_until_internal<T, F>(
&self,
mutex_guard: &mut MutexGuard<'_, T>,
mut condition: F,
timeout: Option<Instant>,
) -> (WaitWhileResult, WaitTimeoutResult)
where
T: ?Sized,
F: FnMut(&mut T) -> bool,
{
let mut result = WaitWhileResult(0);
let mut timeout_result = WaitTimeoutResult(false);

while !timeout_result.timed_out() && condition(mutex_guard.deref_mut()) {
result.0 = result.0.saturating_add(1);
timeout_result =
self.wait_until_internal(unsafe { MutexGuard::mutex(mutex_guard).raw() }, timeout);
}

(result, timeout_result)
}
/// Blocks the current thread until this condition variable receives a
/// notification. If the provided condition evaluates to `false`, then the
/// thread is no longer blocked and the operation is completed. If the
/// condition evaluates to `true`, then the thread is blocked again and
/// waits for another notification before repeating this process.
///
/// This function will atomically unlock the mutex specified (represented by
/// `mutex_guard`) and block the current thread. This means that any calls
/// to `notify_*()` which happen logically after the mutex is unlocked are
/// candidates to wake this thread up. When this function call returns, the
/// lock specified will have been re-acquired.
///
/// # Panics
///
/// This function will panic if another thread is waiting on the `Condvar`
/// with a different `Mutex` object.
#[inline]
pub fn wait_while<T, F>(
&self,
mutex_guard: &mut MutexGuard<'_, T>,
condition: F,
) -> WaitWhileResult
where
T: ?Sized,
F: FnMut(&mut T) -> bool,
{
self.wait_while_until_internal(mutex_guard, condition, None)
.0
}

/// Waits on this condition variable for a notification, timing out after
/// the specified time instant. If the provided condition evaluates to
/// `false`, then the thread is no longer blocked and the operation is
/// completed. If the condition evaluates to `true`, then the thread is
/// blocked again and waits for another notification before repeating
/// this process.
///
/// The semantics of this function are equivalent to `wait()` except that
/// the thread will be blocked roughly until `timeout` is reached. This
/// method should not be used for precise timing due to anomalies such as
/// preemption or platform differences that may not cause the maximum
/// amount of time waited to be precisely `timeout`.
///
/// Note that the best effort is made to ensure that the time waited is
/// measured with a monotonic clock, and not affected by the changes made to
/// the system time.
///
/// The returned `WaitTimeoutResult` value indicates if the timeout is
/// known to have elapsed.
///
/// Like `wait`, the lock specified will be re-acquired when this function
/// returns, regardless of whether the timeout elapsed or not.
///
/// # Panics
///
/// This function will panic if another thread is waiting on the `Condvar`
/// with a different `Mutex` object.
#[inline]
pub fn wait_while_until<T, F>(
&self,
mutex_guard: &mut MutexGuard<'_, T>,
condition: F,
timeout: Instant,
) -> (WaitWhileResult, WaitTimeoutResult)
where
T: ?Sized,
F: FnMut(&mut T) -> bool,
{
self.wait_while_until_internal(mutex_guard, condition, Some(timeout))
}

/// Waits on this condition variable for a notification, timing out after a
/// specified duration. If the provided condition evaluates to `false`,
/// then the thread is no longer blocked and the operation is completed.
/// If the condition evaluates to `true`, then the thread is blocked again
/// and waits for another notification before repeating this process.
///
/// The semantics of this function are equivalent to `wait()` except that
/// the thread will be blocked for roughly no longer than `timeout`. This
/// method should not be used for precise timing due to anomalies such as
/// preemption or platform differences that may not cause the maximum
/// amount of time waited to be precisely `timeout`.
///
/// Note that the best effort is made to ensure that the time waited is
/// measured with a monotonic clock, and not affected by the changes made to
/// the system time.
///
/// The returned `WaitTimeoutResult` value indicates if the timeout is
/// known to have elapsed.
///
/// Like `wait`, the lock specified will be re-acquired when this function
/// returns, regardless of whether the timeout elapsed or not.
#[inline]
pub fn wait_while_for<T: ?Sized, F>(
&self,
mutex_guard: &mut MutexGuard<'_, T>,
condition: F,
timeout: Duration,
) -> (WaitWhileResult, WaitTimeoutResult)
where
F: FnMut(&mut T) -> bool,
{
let deadline = util::to_deadline(timeout);
self.wait_while_until_internal(mutex_guard, condition, deadline)
}
}

impl Default for Condvar {
Expand All @@ -404,6 +544,8 @@ mod tests {
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::thread;
use std::thread::sleep;
use std::thread::JoinHandle;
use std::time::Duration;
use std::time::Instant;

Expand Down Expand Up @@ -572,6 +714,113 @@ mod tests {
drop(g);
}

fn spawn_wait_while_notifier(
mutex: Arc<Mutex<u32>>,
cv: Arc<Condvar>,
num_iters: u32,
timeout: Option<Instant>,
) -> JoinHandle<()> {
thread::spawn(move || {
for epoch in 1..=num_iters {
// spin to wait for main test thread to block
// before notifying it to wake back up and check
// its condition.
let mut sleep_backoff = Duration::from_millis(1);
let _mutex_guard = loop {
let mutex_guard = mutex.lock();

if let Some(timeout) = timeout {
if Instant::now() >= timeout {
return;
}
}

if *mutex_guard == epoch {
break mutex_guard;
}

drop(mutex_guard);

// give main test thread a good chance to
// acquire the lock before this thread does.
sleep(sleep_backoff);
sleep_backoff *= 2;
};

cv.notify_one();
}
})
}

#[test]
fn wait_while_until_internal_does_not_wait_if_initially_false() {
let mutex = Arc::new(Mutex::new(()));
let cv = Arc::new(Condvar::new());

let mut mutex_guard = mutex.lock();

let result = cv
.wait_while_until_internal(&mut mutex_guard, |_| false, None)
.0;

assert!(result.num_iters() == 0);
}

#[test]
fn wait_while_until_internal_times_out_before_false() {
let mutex = Arc::new(Mutex::new(0));
let cv = Arc::new(Condvar::new());

let condition = |counter: &mut u32| {
*counter += 1;
true
};

let mut mutex_guard = mutex.lock();
let timeout = Some(Instant::now() + Duration::from_millis(50));
bryanhitc marked this conversation as resolved.
Show resolved Hide resolved
let handle = spawn_wait_while_notifier(mutex.clone(), cv.clone(), u32::MAX, timeout);

let (result, timeout_result) =
cv.wait_while_until_internal(&mut mutex_guard, condition, timeout);

assert!(timeout_result.timed_out());
assert!(result.num_iters() > 0);
assert!(result.num_iters() < u32::MAX);

// prevent deadlock with notifier
drop(mutex_guard);
handle.join().unwrap();
}

#[test]
fn wait_while_until_internal() {
let mutex = Arc::new(Mutex::new(0));
let cv = Arc::new(Condvar::new());

let num_iters = 4;

let condition = |counter: &mut u32| {
*counter += 1;
*counter <= num_iters
};

let mut mutex_guard = mutex.lock();
let handle = spawn_wait_while_notifier(mutex.clone(), cv.clone(), num_iters, None);

let (result, timeout_result) =
cv.wait_while_until_internal(&mut mutex_guard, condition, None);

assert!(!timeout_result.timed_out());
assert!(result.num_iters() == num_iters);
assert!(*mutex_guard == num_iters + 1);

let result = cv.wait_while(&mut mutex_guard, condition);
handle.join().unwrap();

assert!(result.num_iters() == 0);
assert!(*mutex_guard == num_iters + 2);
}

#[test]
#[should_panic]
fn two_mutexes() {
Expand Down