Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable eventual fairness on wasm32-unknown-unknown #302

Merged
merged 2 commits into from Nov 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 0 additions & 3 deletions Cargo.toml
Expand Up @@ -13,7 +13,6 @@ edition = "2018"
[dependencies]
parking_lot_core = { path = "core", version = "0.8.4" }
lock_api = { path = "lock_api", version = "0.4.5" }
instant = "0.1.9"

[dev-dependencies]
rand = "0.8.3"
Expand All @@ -28,8 +27,6 @@ owning_ref = ["lock_api/owning_ref"]
nightly = ["parking_lot_core/nightly", "lock_api/nightly"]
deadlock_detection = ["parking_lot_core/deadlock_detection"]
serde = ["lock_api/serde"]
stdweb = ["instant/stdweb"]
wasm-bindgen = ["instant/wasm-bindgen"]
send_guard = []

[workspace]
Expand Down
1 change: 0 additions & 1 deletion core/Cargo.toml
Expand Up @@ -15,7 +15,6 @@ smallvec = "1.6.1"
petgraph = { version = "0.5.1", optional = true }
thread-id = { version = "4.0.0", optional = true }
backtrace = { version = "0.3.60", optional = true }
instant = "0.1.9"

[target.'cfg(unix)'.dependencies]
libc = "0.2.95"
Expand Down
2 changes: 1 addition & 1 deletion core/src/lib.rs
Expand Up @@ -46,7 +46,7 @@
#![cfg_attr(
all(
feature = "nightly",
target_arch = "wasm32",
target_family = "wasm",
target_feature = "atomics"
),
feature(stdsimd)
Expand Down
38 changes: 31 additions & 7 deletions core/src/parking_lot.rs
Expand Up @@ -12,9 +12,33 @@ use core::{
ptr,
sync::atomic::{AtomicPtr, AtomicUsize, Ordering},
};
use instant::Instant;
use smallvec::SmallVec;
use std::time::Duration;
use std::time::{Duration, Instant};

// Don't use Instant on wasm32-unknown-unknown, it just panics.
cfg_if::cfg_if! {
if #[cfg(all(
target_family = "wasm",
target_os = "unknown",
target_vendor = "unknown"
))] {
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct TimeoutInstant;
impl TimeoutInstant {
fn now() -> TimeoutInstant {
TimeoutInstant
}
}
impl core::ops::Add<Duration> for TimeoutInstant {
type Output = Self;
fn add(self, _rhs: Duration) -> Self::Output {
TimeoutInstant
}
}
} else {
use std::time::Instant as TimeoutInstant;
}
}

static NUM_THREADS: AtomicUsize = AtomicUsize::new(0);

Expand Down Expand Up @@ -47,7 +71,7 @@ impl HashTable {
let new_size = (num_threads * LOAD_FACTOR).next_power_of_two();
let hash_bits = 0usize.leading_zeros() - new_size.leading_zeros() - 1;

let now = Instant::now();
let now = TimeoutInstant::now();
let mut entries = Vec::with_capacity(new_size);
for i in 0..new_size {
// We must ensure the seed is not zero
Expand Down Expand Up @@ -77,7 +101,7 @@ struct Bucket {

impl Bucket {
#[inline]
pub fn new(timeout: Instant, seed: u32) -> Self {
pub fn new(timeout: TimeoutInstant, seed: u32) -> Self {
Self {
mutex: WordLock::new(),
queue_head: Cell::new(ptr::null()),
Expand All @@ -89,22 +113,22 @@ impl Bucket {

struct FairTimeout {
// Next time at which point be_fair should be set
timeout: Instant,
timeout: TimeoutInstant,

// the PRNG state for calculating the next timeout
seed: u32,
}

impl FairTimeout {
#[inline]
fn new(timeout: Instant, seed: u32) -> FairTimeout {
fn new(timeout: TimeoutInstant, seed: u32) -> FairTimeout {
FairTimeout { timeout, seed }
}

// Determine whether we should force a fair unlock, and update the timeout
#[inline]
fn should_timeout(&mut self) -> bool {
let now = Instant::now();
let now = TimeoutInstant::now();
if now > self.timeout {
// Time between 0 and 1ms.
let nanos = self.gen_u32() % 1_000_000;
Expand Down
2 changes: 1 addition & 1 deletion core/src/thread_parker/generic.rs
Expand Up @@ -9,8 +9,8 @@
//! parking facilities available.

use core::sync::atomic::{spin_loop_hint, AtomicBool, Ordering};
use instant::Instant;
use std::thread;
use std::time::Instant;

// Helper type for putting a thread to sleep until some other thread wakes it up
pub struct ThreadParker {
Expand Down
2 changes: 1 addition & 1 deletion core/src/thread_parker/linux.rs
Expand Up @@ -9,9 +9,9 @@ use core::{
ptr,
sync::atomic::{AtomicI32, Ordering},
};
use instant::Instant;
use libc;
use std::thread;
use std::time::Instant;

// x32 Linux uses a non-standard type for tv_nsec in timespec.
// See https://sourceware.org/bugzilla/show_bug.cgi?id=16437
Expand Down
6 changes: 3 additions & 3 deletions core/src/thread_parker/mod.rs
@@ -1,5 +1,5 @@
use cfg_if::cfg_if;
use instant::Instant;
use std::time::Instant;

/// Trait for the platform thread parker implementation.
///
Expand Down Expand Up @@ -68,12 +68,12 @@ cfg_if! {
mod imp;
} else if #[cfg(all(
feature = "nightly",
target_arch = "wasm32",
target_family = "wasm",
target_feature = "atomics"
))] {
#[path = "wasm_atomic.rs"]
mod imp;
} else if #[cfg(target_arch = "wasm32")] {
} else if #[cfg(target_family = "wasm")] {
#[path = "wasm.rs"]
mod imp;
} else {
Expand Down
2 changes: 1 addition & 1 deletion core/src/thread_parker/redox.rs
Expand Up @@ -9,8 +9,8 @@ use core::{
ptr,
sync::atomic::{AtomicI32, Ordering},
};
use instant::Instant;
use std::thread;
use std::time::Instant;
use syscall::{
call::futex,
data::TimeSpec,
Expand Down
2 changes: 1 addition & 1 deletion core/src/thread_parker/sgx.rs
Expand Up @@ -6,7 +6,7 @@
// copied, modified, or distributed except according to those terms.

use core::sync::atomic::{AtomicBool, Ordering};
use instant::Instant;
use std::time::Instant;
use std::{
io,
os::fortanix_sgx::{
Expand Down
2 changes: 1 addition & 1 deletion core/src/thread_parker/unix.rs
Expand Up @@ -11,8 +11,8 @@ use core::{
cell::{Cell, UnsafeCell},
mem::MaybeUninit,
};
use instant::Instant;
use libc;
use std::time::Instant;
use std::{thread, time::Duration};

// x32 Linux uses a non-standard type for tv_nsec in timespec.
Expand Down
2 changes: 1 addition & 1 deletion core/src/thread_parker/wasm.rs
Expand Up @@ -8,8 +8,8 @@
//! The wasm platform can't park when atomic support is not available.
//! So this ThreadParker just panics on any attempt to park.

use instant::Instant;
use std::thread;
use std::time::Instant;

pub struct ThreadParker(());

Expand Down
32 changes: 2 additions & 30 deletions core/src/thread_parker/wasm_atomic.rs
Expand Up @@ -5,41 +5,13 @@
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use cfg_if::cfg_if;
use core::{
arch::wasm32,
sync::atomic::{AtomicI32, Ordering},
};
use instant::Instant;
use std::time::Duration;
use std::time::{Duration, Instant};
use std::{convert::TryFrom, thread};

cfg_if! {
if #[cfg(all(
target_arch = "wasm32",
target_os = "unknown",
target_vendor = "unknown"
))] {
// This function serves as a polyfill for `Instant::checked_duration_since`, which is
// currently not implemented for wasm32-unknown-unknown.
// TODO: Remove this shim once it
fn checked_duration_since_now(other: Instant) -> Option<Duration> {
let now = Instant::now();

if other < now {
None
} else {
Some(other.duration_since(now))
}
}
} else {
// If we are not targeting wasm32, we can use the native `checked_duration_since`.
fn checked_duration_since_now(timeout: Instant) -> Option<Duration> {
timeout.checked_duration_since(Instant::now())
}
}
}

// Helper type for putting a thread to sleep until some other thread wakes it up
pub struct ThreadParker {
parked: AtomicI32,
Expand Down Expand Up @@ -83,7 +55,7 @@ impl super::ThreadParkerT for ThreadParker {
#[inline]
unsafe fn park_until(&self, timeout: Instant) -> bool {
while self.parked.load(Ordering::Acquire) == PARKED {
if let Some(left) = checked_duration_since_now(timeout) {
if let Some(left) = timeout.checked_duration_since(Instant::now()) {
let nanos_left = i64::try_from(left.as_nanos()).unwrap_or(i64::max_value());
let r = wasm32::memory_atomic_wait32(self.ptr(), PARKED, nanos_left);
debug_assert!(r == 0 || r == 1 || r == 2);
Expand Down
2 changes: 1 addition & 1 deletion core/src/thread_parker/windows/keyed_event.rs
Expand Up @@ -9,8 +9,8 @@ use core::{
mem::{self, MaybeUninit},
ptr,
};
use instant::Instant;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;
use winapi::{
shared::{
minwindef::{TRUE, ULONG},
Expand Down
2 changes: 1 addition & 1 deletion core/src/thread_parker/windows/mod.rs
Expand Up @@ -9,7 +9,7 @@ use core::{
ptr,
sync::atomic::{AtomicPtr, AtomicUsize, Ordering},
};
use instant::Instant;
use std::time::Instant;

mod keyed_event;
mod waitaddress;
Expand Down
2 changes: 1 addition & 1 deletion core/src/thread_parker/windows/waitaddress.rs
Expand Up @@ -9,7 +9,7 @@ use core::{
mem,
sync::atomic::{AtomicUsize, Ordering},
};
use instant::Instant;
use std::time::Instant;
use winapi::{
shared::{
basetsd::SIZE_T,
Expand Down
6 changes: 3 additions & 3 deletions lock_api/src/mutex.rs
Expand Up @@ -663,8 +663,8 @@ impl<'a, R: RawMutex + 'a, T: fmt::Display + ?Sized + 'a> fmt::Display for Mutex
unsafe impl<'a, R: RawMutex + 'a, T: ?Sized + 'a> StableAddress for MutexGuard<'a, R, T> {}

/// An RAII mutex guard returned by the `Arc` locking operations on `Mutex`.
///
/// This is similar to the `MutexGuard` struct, except instead of using a reference to unlock the `Mutex` it
///
/// This is similar to the `MutexGuard` struct, except instead of using a reference to unlock the `Mutex` it
/// uses an `Arc<Mutex>`. This has several advantages, most notably that it has an `'static` lifetime.
#[cfg(feature = "arc_lock")]
#[must_use = "if unused the Mutex will immediately unlock"]
Expand Down Expand Up @@ -713,7 +713,7 @@ impl<R: RawMutexFair, T: ?Sized> ArcMutexGuard<R, T> {

// SAFETY: make sure the Arc gets it reference decremented
let mut s = ManuallyDrop::new(s);
unsafe { ptr::drop_in_place(&mut s.mutex) };
unsafe { ptr::drop_in_place(&mut s.mutex) };
}

/// Temporarily unlocks the mutex to execute the given function.
Expand Down