Skip to content

Commit

Permalink
Merge pull request #302 from Amanieu/wasm-fair-timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
Amanieu committed Nov 1, 2021
2 parents aecb031 + 3fd2ce4 commit 1cf1274
Show file tree
Hide file tree
Showing 22 changed files with 126 additions and 125 deletions.
3 changes: 0 additions & 3 deletions Cargo.toml
Expand Up @@ -13,7 +13,6 @@ edition = "2018"
[dependencies]
parking_lot_core = { path = "core", version = "0.8.4" }
lock_api = { path = "lock_api", version = "0.4.5" }
instant = "0.1.9"

[dev-dependencies]
rand = "0.8.3"
Expand All @@ -28,8 +27,6 @@ owning_ref = ["lock_api/owning_ref"]
nightly = ["parking_lot_core/nightly", "lock_api/nightly"]
deadlock_detection = ["parking_lot_core/deadlock_detection"]
serde = ["lock_api/serde"]
stdweb = ["instant/stdweb"]
wasm-bindgen = ["instant/wasm-bindgen"]
send_guard = []

[workspace]
Expand Down
1 change: 0 additions & 1 deletion core/Cargo.toml
Expand Up @@ -15,7 +15,6 @@ smallvec = "1.6.1"
petgraph = { version = "0.5.1", optional = true }
thread-id = { version = "4.0.0", optional = true }
backtrace = { version = "0.3.60", optional = true }
instant = "0.1.9"

[target.'cfg(unix)'.dependencies]
libc = "0.2.95"
Expand Down
2 changes: 1 addition & 1 deletion core/src/lib.rs
Expand Up @@ -46,7 +46,7 @@
#![cfg_attr(
all(
feature = "nightly",
target_arch = "wasm32",
target_family = "wasm",
target_feature = "atomics"
),
feature(stdsimd)
Expand Down
38 changes: 31 additions & 7 deletions core/src/parking_lot.rs
Expand Up @@ -12,9 +12,33 @@ use core::{
ptr,
sync::atomic::{AtomicPtr, AtomicUsize, Ordering},
};
use instant::Instant;
use smallvec::SmallVec;
use std::time::Duration;
use std::time::{Duration, Instant};

// Don't use Instant on wasm32-unknown-unknown, it just panics.
cfg_if::cfg_if! {
if #[cfg(all(
target_family = "wasm",
target_os = "unknown",
target_vendor = "unknown"
))] {
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct TimeoutInstant;
impl TimeoutInstant {
fn now() -> TimeoutInstant {
TimeoutInstant
}
}
impl core::ops::Add<Duration> for TimeoutInstant {
type Output = Self;
fn add(self, _rhs: Duration) -> Self::Output {
TimeoutInstant
}
}
} else {
use std::time::Instant as TimeoutInstant;
}
}

static NUM_THREADS: AtomicUsize = AtomicUsize::new(0);

Expand Down Expand Up @@ -47,7 +71,7 @@ impl HashTable {
let new_size = (num_threads * LOAD_FACTOR).next_power_of_two();
let hash_bits = 0usize.leading_zeros() - new_size.leading_zeros() - 1;

let now = Instant::now();
let now = TimeoutInstant::now();
let mut entries = Vec::with_capacity(new_size);
for i in 0..new_size {
// We must ensure the seed is not zero
Expand Down Expand Up @@ -77,7 +101,7 @@ struct Bucket {

impl Bucket {
#[inline]
pub fn new(timeout: Instant, seed: u32) -> Self {
pub fn new(timeout: TimeoutInstant, seed: u32) -> Self {
Self {
mutex: WordLock::new(),
queue_head: Cell::new(ptr::null()),
Expand All @@ -89,22 +113,22 @@ impl Bucket {

struct FairTimeout {
// Next time at which point be_fair should be set
timeout: Instant,
timeout: TimeoutInstant,

// the PRNG state for calculating the next timeout
seed: u32,
}

impl FairTimeout {
#[inline]
fn new(timeout: Instant, seed: u32) -> FairTimeout {
fn new(timeout: TimeoutInstant, seed: u32) -> FairTimeout {
FairTimeout { timeout, seed }
}

// Determine whether we should force a fair unlock, and update the timeout
#[inline]
fn should_timeout(&mut self) -> bool {
let now = Instant::now();
let now = TimeoutInstant::now();
if now > self.timeout {
// Time between 0 and 1ms.
let nanos = self.gen_u32() % 1_000_000;
Expand Down
2 changes: 1 addition & 1 deletion core/src/thread_parker/generic.rs
Expand Up @@ -9,8 +9,8 @@
//! parking facilities available.

use core::sync::atomic::{spin_loop_hint, AtomicBool, Ordering};
use instant::Instant;
use std::thread;
use std::time::Instant;

// Helper type for putting a thread to sleep until some other thread wakes it up
pub struct ThreadParker {
Expand Down
2 changes: 1 addition & 1 deletion core/src/thread_parker/linux.rs
Expand Up @@ -9,9 +9,9 @@ use core::{
ptr,
sync::atomic::{AtomicI32, Ordering},
};
use instant::Instant;
use libc;
use std::thread;
use std::time::Instant;

// x32 Linux uses a non-standard type for tv_nsec in timespec.
// See https://sourceware.org/bugzilla/show_bug.cgi?id=16437
Expand Down
6 changes: 3 additions & 3 deletions core/src/thread_parker/mod.rs
@@ -1,5 +1,5 @@
use cfg_if::cfg_if;
use instant::Instant;
use std::time::Instant;

/// Trait for the platform thread parker implementation.
///
Expand Down Expand Up @@ -68,12 +68,12 @@ cfg_if! {
mod imp;
} else if #[cfg(all(
feature = "nightly",
target_arch = "wasm32",
target_family = "wasm",
target_feature = "atomics"
))] {
#[path = "wasm_atomic.rs"]
mod imp;
} else if #[cfg(target_arch = "wasm32")] {
} else if #[cfg(target_family = "wasm")] {
#[path = "wasm.rs"]
mod imp;
} else {
Expand Down
2 changes: 1 addition & 1 deletion core/src/thread_parker/redox.rs
Expand Up @@ -9,8 +9,8 @@ use core::{
ptr,
sync::atomic::{AtomicI32, Ordering},
};
use instant::Instant;
use std::thread;
use std::time::Instant;
use syscall::{
call::futex,
data::TimeSpec,
Expand Down
2 changes: 1 addition & 1 deletion core/src/thread_parker/sgx.rs
Expand Up @@ -6,7 +6,7 @@
// copied, modified, or distributed except according to those terms.

use core::sync::atomic::{AtomicBool, Ordering};
use instant::Instant;
use std::time::Instant;
use std::{
io,
os::fortanix_sgx::{
Expand Down
2 changes: 1 addition & 1 deletion core/src/thread_parker/unix.rs
Expand Up @@ -11,8 +11,8 @@ use core::{
cell::{Cell, UnsafeCell},
mem::MaybeUninit,
};
use instant::Instant;
use libc;
use std::time::Instant;
use std::{thread, time::Duration};

// x32 Linux uses a non-standard type for tv_nsec in timespec.
Expand Down
2 changes: 1 addition & 1 deletion core/src/thread_parker/wasm.rs
Expand Up @@ -8,8 +8,8 @@
//! The wasm platform can't park when atomic support is not available.
//! So this ThreadParker just panics on any attempt to park.

use instant::Instant;
use std::thread;
use std::time::Instant;

pub struct ThreadParker(());

Expand Down
32 changes: 2 additions & 30 deletions core/src/thread_parker/wasm_atomic.rs
Expand Up @@ -5,41 +5,13 @@
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use cfg_if::cfg_if;
use core::{
arch::wasm32,
sync::atomic::{AtomicI32, Ordering},
};
use instant::Instant;
use std::time::Duration;
use std::time::{Duration, Instant};
use std::{convert::TryFrom, thread};

cfg_if! {
if #[cfg(all(
target_arch = "wasm32",
target_os = "unknown",
target_vendor = "unknown"
))] {
// This function serves as a polyfill for `Instant::checked_duration_since`, which is
// currently not implemented for wasm32-unknown-unknown.
// TODO: Remove this shim once it
fn checked_duration_since_now(other: Instant) -> Option<Duration> {
let now = Instant::now();

if other < now {
None
} else {
Some(other.duration_since(now))
}
}
} else {
// If we are not targeting wasm32, we can use the native `checked_duration_since`.
fn checked_duration_since_now(timeout: Instant) -> Option<Duration> {
timeout.checked_duration_since(Instant::now())
}
}
}

// Helper type for putting a thread to sleep until some other thread wakes it up
pub struct ThreadParker {
parked: AtomicI32,
Expand Down Expand Up @@ -83,7 +55,7 @@ impl super::ThreadParkerT for ThreadParker {
#[inline]
unsafe fn park_until(&self, timeout: Instant) -> bool {
while self.parked.load(Ordering::Acquire) == PARKED {
if let Some(left) = checked_duration_since_now(timeout) {
if let Some(left) = timeout.checked_duration_since(Instant::now()) {
let nanos_left = i64::try_from(left.as_nanos()).unwrap_or(i64::max_value());
let r = wasm32::memory_atomic_wait32(self.ptr(), PARKED, nanos_left);
debug_assert!(r == 0 || r == 1 || r == 2);
Expand Down
2 changes: 1 addition & 1 deletion core/src/thread_parker/windows/keyed_event.rs
Expand Up @@ -9,8 +9,8 @@ use core::{
mem::{self, MaybeUninit},
ptr,
};
use instant::Instant;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;
use winapi::{
shared::{
minwindef::{TRUE, ULONG},
Expand Down
2 changes: 1 addition & 1 deletion core/src/thread_parker/windows/mod.rs
Expand Up @@ -9,7 +9,7 @@ use core::{
ptr,
sync::atomic::{AtomicPtr, AtomicUsize, Ordering},
};
use instant::Instant;
use std::time::Instant;

mod keyed_event;
mod waitaddress;
Expand Down
2 changes: 1 addition & 1 deletion core/src/thread_parker/windows/waitaddress.rs
Expand Up @@ -9,7 +9,7 @@ use core::{
mem,
sync::atomic::{AtomicUsize, Ordering},
};
use instant::Instant;
use std::time::Instant;
use winapi::{
shared::{
basetsd::SIZE_T,
Expand Down
6 changes: 3 additions & 3 deletions lock_api/src/mutex.rs
Expand Up @@ -663,8 +663,8 @@ impl<'a, R: RawMutex + 'a, T: fmt::Display + ?Sized + 'a> fmt::Display for Mutex
unsafe impl<'a, R: RawMutex + 'a, T: ?Sized + 'a> StableAddress for MutexGuard<'a, R, T> {}

/// An RAII mutex guard returned by the `Arc` locking operations on `Mutex`.
///
/// This is similar to the `MutexGuard` struct, except instead of using a reference to unlock the `Mutex` it
///
/// This is similar to the `MutexGuard` struct, except instead of using a reference to unlock the `Mutex` it
/// uses an `Arc<Mutex>`. This has several advantages, most notably that it has an `'static` lifetime.
#[cfg(feature = "arc_lock")]
#[must_use = "if unused the Mutex will immediately unlock"]
Expand Down Expand Up @@ -713,7 +713,7 @@ impl<R: RawMutexFair, T: ?Sized> ArcMutexGuard<R, T> {

// SAFETY: make sure the Arc gets it reference decremented
let mut s = ManuallyDrop::new(s);
unsafe { ptr::drop_in_place(&mut s.mutex) };
unsafe { ptr::drop_in_place(&mut s.mutex) };
}

/// Temporarily unlocks the mutex to execute the given function.
Expand Down

0 comments on commit 1cf1274

Please sign in to comment.