Skip to content

Commit

Permalink
Change critical-section implementation to not use spin-lock.
Browse files Browse the repository at this point in the history
  • Loading branch information
reitermarkus committed Sep 7, 2022
1 parent 3435d24 commit 43be119
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 83 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Expand Up @@ -88,3 +88,6 @@ required-features = ["std"]

[package.metadata.docs.rs]
all-features = true

[patch.crates-io]
critical-section = { git = "https://github.com/reitermarkus/critical-section", branch = "lock-poisoning" }
93 changes: 17 additions & 76 deletions src/imp_cs.rs
@@ -1,23 +1,13 @@
#[cfg(feature = "atomic-polyfill")]
use atomic_polyfill as atomic;
#[cfg(not(feature = "atomic-polyfill"))]
use core::sync::atomic;

use atomic::{AtomicU8, Ordering};

use core::panic::{RefUnwindSafe, UnwindSafe};

use critical_section::{CriticalSection, Mutex};

use crate::unsync::OnceCell as UnsyncOnceCell;

pub(crate) struct OnceCell<T> {
state: AtomicU8,
value: UnsyncOnceCell<T>,
value: Mutex<UnsyncOnceCell<T>>,
}

const INCOMPLETE: u8 = 0;
const RUNNING: u8 = 1;
const COMPLETE: u8 = 2;

// Why do we need `T: Send`?
// Thread A creates a `OnceCell` and shares it with
// scoped thread B, which fills the cell, which is
Expand All @@ -31,47 +21,29 @@ impl<T: UnwindSafe> UnwindSafe for OnceCell<T> {}

impl<T> OnceCell<T> {
pub(crate) const fn new() -> OnceCell<T> {
OnceCell { state: AtomicU8::new(INCOMPLETE), value: UnsyncOnceCell::new() }
OnceCell { value: Mutex::new(UnsyncOnceCell::new()) }
}

pub(crate) const fn with_value(value: T) -> OnceCell<T> {
OnceCell { state: AtomicU8::new(COMPLETE), value: UnsyncOnceCell::with_value(value) }
OnceCell { value: Mutex::new(UnsyncOnceCell::with_value(value)) }
}

#[inline]
pub(crate) fn is_initialized(&self) -> bool {
self.state.load(Ordering::Acquire) == COMPLETE
// The only write access is synchronized in `initialize` with a
// critical section, so read-only access is valid everywhere else.
unsafe { self.value.borrow(CriticalSection::new()).get().is_some() }
}

#[cold]
pub(crate) fn initialize<F, E>(&self, f: F) -> Result<(), E>
where
F: FnOnce() -> Result<T, E>,
{
let mut f = Some(f);
let mut res: Result<(), E> = Ok(());
let slot: &UnsyncOnceCell<T> = &self.value;
initialize_inner(&self.state, &mut || {
let f = unsafe { crate::take_unchecked(&mut f) };
match f() {
Ok(value) => {
unsafe { crate::unwrap_unchecked(slot.set(value).ok()) };
true
}
Err(err) => {
res = Err(err);
false
}
}
});
res
}

#[cold]
pub(crate) fn wait(&self) {
while !self.is_initialized() {
core::hint::spin_loop();
}
critical_section::with(|cs| {
let cell = self.value.borrow(cs);
cell.get_or_try_init(f).map(|_| ())
})
}

/// Get the reference to the underlying value, without checking if the cell
Expand All @@ -83,49 +55,18 @@ impl<T> OnceCell<T> {
/// the contents are acquired by (synchronized to) this thread.
pub(crate) unsafe fn get_unchecked(&self) -> &T {
debug_assert!(self.is_initialized());
self.value.get_unchecked()
// The only write access is synchronized in `initialize` with a
// critical section, so read-only access is valid everywhere else.
self.value.borrow(CriticalSection::new()).get_unchecked()
}

#[inline]
pub(crate) fn get_mut(&mut self) -> Option<&mut T> {
self.value.get_mut()
self.value.get_mut().get_mut()
}

#[inline]
pub(crate) fn into_inner(self) -> Option<T> {
self.value.into_inner()
}
}

struct Guard<'a> {
state: &'a AtomicU8,
new_state: u8,
}

impl<'a> Drop for Guard<'a> {
fn drop(&mut self) {
self.state.store(self.new_state, Ordering::Release);
}
}

#[inline(never)]
fn initialize_inner(state: &AtomicU8, init: &mut dyn FnMut() -> bool) {
loop {
match state.compare_exchange_weak(INCOMPLETE, RUNNING, Ordering::Acquire, Ordering::Acquire)
{
Ok(_) => {
let mut guard = Guard { state, new_state: INCOMPLETE };
if init() {
guard.new_state = COMPLETE;
}
return;
}
Err(COMPLETE) => return,
Err(RUNNING) | Err(INCOMPLETE) => core::hint::spin_loop(),
Err(_) => {
debug_assert!(false);
unsafe { core::hint::unreachable_unchecked() }
}
}
self.value.into_inner().into_inner()
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Expand Up @@ -959,6 +959,7 @@ pub mod sync {
/// assert_eq!(*value, 92);
/// t.join().unwrap();
/// ```
#[cfg(not(feature = "critical-section"))]
pub fn wait(&self) -> &T {
if !self.0.is_initialized() {
self.0.wait()
Expand Down
42 changes: 35 additions & 7 deletions tests/it.rs
Expand Up @@ -207,7 +207,7 @@ mod unsync {
}

#[test]
#[cfg(feature = "std")]
#[cfg(all(feature = "std", not(feature = "critical-section")))]
fn lazy_poisoning() {
let x: Lazy<String> = Lazy::new(|| panic!("kaboom"));
for _ in 0..2 {
Expand Down Expand Up @@ -249,10 +249,13 @@ mod unsync {
}
}

#[cfg(feature = "std")]
#[cfg(feature = "sync")]
mod sync {
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};

#[cfg(feature = "critical-section")]
use core::cell::Cell;

use crossbeam_utils::thread::scope;

use once_cell::sync::{Lazy, OnceCell};
Expand Down Expand Up @@ -354,6 +357,7 @@ mod sync {
assert_eq!(cell.get(), Some(&"hello".to_string()));
}

#[cfg(not(feature = "critical-section"))]
#[test]
fn wait() {
let cell: OnceCell<String> = OnceCell::new();
Expand All @@ -365,6 +369,7 @@ mod sync {
.unwrap();
}

#[cfg(not(feature = "critical-section"))]
#[test]
fn get_or_init_stress() {
use std::sync::Barrier;
Expand Down Expand Up @@ -430,6 +435,7 @@ mod sync {

#[test]
#[cfg_attr(miri, ignore)] // miri doesn't support processes
#[cfg(not(feature = "critical-section"))]
fn reentrant_init() {
let examples_dir = {
let mut exe = std::env::current_exe().unwrap();
Expand Down Expand Up @@ -457,6 +463,20 @@ mod sync {
}
}

#[cfg(feature = "critical-section")]
#[test]
#[should_panic(expected = "reentrant init")]
fn reentrant_init() {
let x: OnceCell<Box<i32>> = OnceCell::new();
let dangling_ref: Cell<Option<&i32>> = Cell::new(None);
x.get_or_init(|| {
let r = x.get_or_init(|| Box::new(92));
dangling_ref.set(Some(r));
Box::new(62)
});
eprintln!("use after free: {:?}", dangling_ref.get().unwrap());
}

#[test]
fn lazy_new() {
let called = AtomicUsize::new(0);
Expand Down Expand Up @@ -610,6 +630,7 @@ mod sync {
assert_eq!(fib[5], 8)
}

#[cfg(not(feature = "critical-section"))]
#[test]
fn once_cell_does_not_leak_partially_constructed_boxes() {
let n_tries = if cfg!(miri) { 10 } else { 100 };
Expand All @@ -636,6 +657,7 @@ mod sync {
}
}

#[cfg(not(feature = "critical-section"))]
#[test]
fn get_does_not_block() {
use std::sync::Barrier;
Expand Down Expand Up @@ -671,12 +693,11 @@ mod sync {

#[cfg(feature = "race")]
mod race {
#[cfg(not(feature = "critical-section"))]
use std::sync::Barrier;
use std::{
num::NonZeroUsize,
sync::{
atomic::{AtomicUsize, Ordering::SeqCst},
Barrier,
},
sync::atomic::{AtomicUsize, Ordering::SeqCst},
};

use crossbeam_utils::thread::scope;
Expand Down Expand Up @@ -728,6 +749,7 @@ mod race {
assert_eq!(cell.get(), Some(val1));
}

#[cfg(not(feature = "critical-section"))]
#[test]
fn once_non_zero_usize_first_wins() {
let val1 = NonZeroUsize::new(92).unwrap();
Expand Down Expand Up @@ -807,12 +829,16 @@ mod race {

#[cfg(all(feature = "race", feature = "alloc"))]
mod race_once_box {
#[cfg(not(feature = "critical-section"))]
use std::sync::Barrier;
use std::sync::{
atomic::{AtomicUsize, Ordering::SeqCst},
Arc, Barrier,
Arc,
};

#[cfg(not(feature = "critical-section"))]
use crossbeam_utils::thread::scope;

use once_cell::race::OnceBox;

#[derive(Default)]
Expand Down Expand Up @@ -842,6 +868,7 @@ mod race_once_box {
}
}

#[cfg(not(feature = "critical-section"))]
#[test]
fn once_box_smoke_test() {
let heap = Heap::default();
Expand Down Expand Up @@ -896,6 +923,7 @@ mod race_once_box {
assert_eq!(heap.total(), 0);
}

#[cfg(not(feature = "critical-section"))]
#[test]
fn once_box_first_wins() {
let cell = OnceBox::new();
Expand Down

0 comments on commit 43be119

Please sign in to comment.