Skip to content

Commit

Permalink
Merge pull request #71 from tremor-rs/per-test-timeout
Browse files Browse the repository at this point in the history
Per test timeout
  • Loading branch information
palfrey committed Aug 4, 2022
2 parents 34d6995 + d416977 commit d108a62
Show file tree
Hide file tree
Showing 11 changed files with 246 additions and 114 deletions.
16 changes: 16 additions & 0 deletions README.md
Expand Up @@ -53,3 +53,19 @@ extern crate serial_test;
for earlier versions.

You can then either add `#[serial]` or `#[serial(some_text)]` to tests as required.

For each test, a timeout can be specified with the `timeout_ms` parameter to the [serial](macro@serial) attribute. Note that
the timeout is counted from the first invocation of the test, not from the time the previous test was completed. This can
lead to some unpredictable behavior based on the number of parallel tests run on the system.
```rust
#[test]
#[serial(timeout_ms = 1000)]
fn test_serial_one() {
// Do things
}
#[test]
#[serial(test_name, timeout_ms = 1000)]
fn test_serial_another() {
// Do things
}
```
6 changes: 1 addition & 5 deletions serial_test/Cargo.toml
Expand Up @@ -27,18 +27,14 @@ itertools = "0.10"
tokio = { version = "^1.17", features = ["macros", "rt"] }

[features]
default = ["logging", "timeout"]
default = ["logging"]

## Switches on debug logging (and requires the `log` package)
logging = ["log"]

## The file_locks feature unlocks the `file_serial`/`file_parallel` macros
file_locks = ["fslock"]

## The `timeout` feature lets tests time out after a certain amount of time
## if not enabled tests will wait indefinitely to be started
timeout = []

docsrs = ["document-features"]

# docs.rs-specific configuration
Expand Down
47 changes: 9 additions & 38 deletions serial_test/src/code_lock.rs
@@ -1,15 +1,12 @@
use crate::rwlock::{Locks, MutexGuardWrapper};
use dashmap::{try_result::TryResult, DashMap};
use lazy_static::lazy_static;
#[cfg(all(feature = "logging", feature = "timeout"))]
#[cfg(all(feature = "logging"))]
use log::debug;
#[cfg(feature = "timeout")]
use parking_lot::RwLock;
use std::sync::{atomic::AtomicU32, Arc};
#[cfg(feature = "timeout")]
use std::time::Duration;
#[cfg(feature = "timeout")]
use std::time::Instant;
use std::{
sync::{atomic::AtomicU32, Arc},
time::{Duration, Instant},
};

pub(crate) struct UniqueReentrantMutex {
locks: Locks,
Expand Down Expand Up @@ -49,11 +46,6 @@ lazy_static! {
static ref MUTEX_ID: Arc<AtomicU32> = Arc::new(AtomicU32::new(1));
}

#[cfg(feature = "timeout")]
lazy_static! {
static ref MAX_WAIT: Arc<RwLock<Duration>> = Arc::new(RwLock::new(Duration::from_secs(60)));
}

impl Default for UniqueReentrantMutex {
fn default() -> Self {
Self {
Expand All @@ -63,30 +55,10 @@ impl Default for UniqueReentrantMutex {
}
}

/// Sets the maximum amount of time the serial locks will wait to unlock.
/// By default, this is set to 60 seconds, which is almost always much longer than is needed.
/// This is deliberately set high to try and avoid situations where we accidentally hit the limits
/// but is set at all so we can timeout rather than hanging forever.
///
/// However, sometimes if you've got a *lot* of serial tests it might theoretically not be enough,
/// hence this method.
///
/// This function is only available when the `timeout` feature is enabled.
#[cfg(feature = "timeout")]
pub fn set_max_wait(max_wait: Duration) {
*MAX_WAIT.write() = max_wait;
}

#[cfg(feature = "timeout")]
pub(crate) fn wait_duration() -> Duration {
*MAX_WAIT.read()
}

pub(crate) fn check_new_key(name: &str) {
#[cfg(feature = "timeout")]
pub(crate) fn check_new_key(name: &str, max_wait: Option<Duration>) {
let start = Instant::now();
loop {
#[cfg(all(feature = "logging", feature = "timeout"))]
#[cfg(all(feature = "logging"))]
{
let duration = start.elapsed();
debug!("Waiting for '{}' {:?}", name, duration);
Expand All @@ -113,10 +85,9 @@ pub(crate) fn check_new_key(name: &str) {
// If the try_entry fails, then go around the loop again
// Odds are another test was also locking on the write and has now written the key

#[cfg(feature = "timeout")]
{
if let Some(max_wait) = max_wait {
let duration = start.elapsed();
if duration > wait_duration() {
if duration > max_wait {
panic!("Timeout waiting for '{}' {:?}", name, duration);
}
}
Expand Down
2 changes: 0 additions & 2 deletions serial_test/src/lib.rs
Expand Up @@ -64,8 +64,6 @@ mod parallel_file_lock;
#[cfg(feature = "file_locks")]
mod serial_file_lock;

#[cfg(feature = "timeout")]
pub use code_lock::set_max_wait;
pub use parallel_code_lock::{
local_async_parallel_core, local_async_parallel_core_with_return, local_parallel_core,
local_parallel_core_with_return,
Expand Down
22 changes: 14 additions & 8 deletions serial_test/src/parallel_code_lock.rs
Expand Up @@ -2,14 +2,15 @@

use crate::code_lock::{check_new_key, LOCKS};
use futures::FutureExt;
use std::panic;
use std::{panic, time::Duration};

#[doc(hidden)]
pub fn local_parallel_core_with_return<E>(
name: &str,
max_wait: Option<Duration>,
function: fn() -> Result<(), E>,
) -> Result<(), E> {
check_new_key(name);
check_new_key(name, max_wait);

let lock = LOCKS.get(name).unwrap();
lock.start_parallel();
Expand All @@ -24,8 +25,8 @@ pub fn local_parallel_core_with_return<E>(
}

#[doc(hidden)]
pub fn local_parallel_core(name: &str, function: fn()) {
check_new_key(name);
pub fn local_parallel_core(name: &str, max_wait: Option<Duration>, function: fn()) {
check_new_key(name, max_wait);

let lock = LOCKS.get(name).unwrap();
lock.start_parallel();
Expand All @@ -41,9 +42,10 @@ pub fn local_parallel_core(name: &str, function: fn()) {
#[doc(hidden)]
pub async fn local_async_parallel_core_with_return<E>(
name: &str,
max_wait: Option<Duration>,
fut: impl std::future::Future<Output = Result<(), E>> + panic::UnwindSafe,
) -> Result<(), E> {
check_new_key(name);
check_new_key(name, max_wait);

let lock = LOCKS.get(name).unwrap();
lock.start_parallel();
Expand All @@ -60,9 +62,10 @@ pub async fn local_async_parallel_core_with_return<E>(
#[doc(hidden)]
pub async fn local_async_parallel_core(
name: &str,
max_wait: Option<Duration>,
fut: impl std::future::Future<Output = ()> + panic::UnwindSafe,
) {
check_new_key(name);
check_new_key(name, max_wait);

let lock = LOCKS.get(name).unwrap();
lock.start_parallel();
Expand All @@ -84,7 +87,7 @@ mod tests {
#[test]
fn unlock_on_assert_sync_without_return() {
let _ = panic::catch_unwind(|| {
local_parallel_core("unlock_on_assert_sync_without_return", || {
local_parallel_core("unlock_on_assert_sync_without_return", None, || {
assert!(false);
})
});
Expand All @@ -102,6 +105,7 @@ mod tests {
let _ = panic::catch_unwind(|| {
local_parallel_core_with_return(
"unlock_on_assert_sync_with_return",
None,
|| -> Result<(), Error> {
assert!(false);
Ok(())
Expand All @@ -123,7 +127,8 @@ mod tests {
assert!(false);
}
async fn call_serial_test_fn() {
local_async_parallel_core("unlock_on_assert_async_without_return", demo_assert()).await
local_async_parallel_core("unlock_on_assert_async_without_return", None, demo_assert())
.await
}
// as per https://stackoverflow.com/a/66529014/320546
let _ = panic::catch_unwind(|| {
Expand Down Expand Up @@ -151,6 +156,7 @@ mod tests {
async fn call_serial_test_fn() {
local_async_parallel_core_with_return(
"unlock_on_assert_async_with_return",
None,
demo_assert(),
)
.await;
Expand Down
16 changes: 14 additions & 2 deletions serial_test/src/parallel_file_lock.rs
@@ -1,11 +1,16 @@
use std::panic;
use std::{panic, time::Duration};

use futures::FutureExt;

use crate::file_lock::make_lock_for_name_and_path;

#[doc(hidden)]
pub fn fs_parallel_core(name: &str, path: Option<&str>, function: fn()) {
pub fn fs_parallel_core(
name: &str,
_max_wait: Option<Duration>,
path: Option<&str>,
function: fn(),
) {
make_lock_for_name_and_path(name, path).start_parallel();
let res = panic::catch_unwind(|| {
function();
Expand All @@ -19,6 +24,7 @@ pub fn fs_parallel_core(name: &str, path: Option<&str>, function: fn()) {
#[doc(hidden)]
pub fn fs_parallel_core_with_return<E>(
name: &str,
_max_wait: Option<Duration>,
path: Option<&str>,
function: fn() -> Result<(), E>,
) -> Result<(), E> {
Expand All @@ -36,6 +42,7 @@ pub fn fs_parallel_core_with_return<E>(
#[doc(hidden)]
pub async fn fs_async_parallel_core_with_return<E>(
name: &str,
_max_wait: Option<Duration>,
path: Option<&str>,
fut: impl std::future::Future<Output = Result<(), E>> + panic::UnwindSafe,
) -> Result<(), E> {
Expand All @@ -53,6 +60,7 @@ pub async fn fs_async_parallel_core_with_return<E>(
#[doc(hidden)]
pub async fn fs_async_parallel_core(
name: &str,
_max_wait: Option<Duration>,
path: Option<&str>,
fut: impl std::future::Future<Output = ()> + panic::UnwindSafe,
) {
Expand Down Expand Up @@ -84,6 +92,7 @@ mod tests {
let _ = panic::catch_unwind(|| {
fs_parallel_core(
"unlock_on_assert_sync_without_return",
None,
Some(&lock_path),
|| {
assert!(false);
Expand All @@ -99,6 +108,7 @@ mod tests {
let _ = panic::catch_unwind(|| {
fs_parallel_core_with_return(
"unlock_on_assert_sync_with_return",
None,
Some(&lock_path),
|| -> Result<(), Error> {
assert!(false);
Expand All @@ -118,6 +128,7 @@ mod tests {
async fn call_serial_test_fn(lock_path: &str) {
fs_async_parallel_core(
"unlock_on_assert_async_without_return",
None,
Some(&lock_path),
demo_assert(),
)
Expand Down Expand Up @@ -146,6 +157,7 @@ mod tests {
async fn call_serial_test_fn(lock_path: &str) {
fs_async_parallel_core_with_return(
"unlock_on_assert_async_with_return",
None,
Some(&lock_path),
demo_assert(),
)
Expand Down
23 changes: 15 additions & 8 deletions serial_test/src/serial_code_lock.rs
@@ -1,13 +1,15 @@
#![allow(clippy::await_holding_lock)]

use crate::code_lock::{check_new_key, LOCKS};
use std::time::Duration;

#[doc(hidden)]
pub fn local_serial_core_with_return<E>(
name: &str,
max_wait: Option<Duration>,
function: fn() -> Result<(), E>,
) -> Result<(), E> {
check_new_key(name);
check_new_key(name, max_wait);

let unlock = LOCKS.get(name).expect("key to be set");
// _guard needs to be named to avoid being instant dropped
Expand All @@ -16,8 +18,8 @@ pub fn local_serial_core_with_return<E>(
}

#[doc(hidden)]
pub fn local_serial_core(name: &str, function: fn()) {
check_new_key(name);
pub fn local_serial_core(name: &str, max_wait: Option<Duration>, function: fn()) {
check_new_key(name, max_wait);

let unlock = LOCKS.get(name).expect("key to be set");
// _guard needs to be named to avoid being instant dropped
Expand All @@ -28,9 +30,10 @@ pub fn local_serial_core(name: &str, function: fn()) {
#[doc(hidden)]
pub async fn local_async_serial_core_with_return<E>(
name: &str,
max_wait: Option<Duration>,
fut: impl std::future::Future<Output = Result<(), E>>,
) -> Result<(), E> {
check_new_key(name);
check_new_key(name, max_wait);

let unlock = LOCKS.get(name).expect("key to be set");
// _guard needs to be named to avoid being instant dropped
Expand All @@ -39,8 +42,12 @@ pub async fn local_async_serial_core_with_return<E>(
}

#[doc(hidden)]
pub async fn local_async_serial_core(name: &str, fut: impl std::future::Future<Output = ()>) {
check_new_key(name);
pub async fn local_async_serial_core(
name: &str,
max_wait: Option<Duration>,
fut: impl std::future::Future<Output = ()>,
) {
check_new_key(name, max_wait);

let unlock = LOCKS.get(name).expect("key to be set");
// _guard needs to be named to avoid being instant dropped
Expand Down Expand Up @@ -76,7 +83,7 @@ mod tests {
let c = barrier.clone();
threads.push(thread::spawn(move || {
c.wait();
check_new_key("foo");
check_new_key("foo", None);
{
let unlock = local_locks.get("foo").expect("read didn't work");
let mutex = unlock.value();
Expand Down Expand Up @@ -104,7 +111,7 @@ mod tests {
#[test]
fn unlock_on_assert() {
let _ = std::panic::catch_unwind(|| {
local_serial_core("assert", || {
local_serial_core("assert", None, || {
assert!(false);
})
});
Expand Down

0 comments on commit d108a62

Please sign in to comment.