Skip to content

Commit

Permalink
util/epoch: Add preliminary support for loom
Browse files Browse the repository at this point in the history
This patch only adds support to parts of `utils` and to `epoch`. Some
parts of `utils` had to be left out, since they rely on
`AtomicUsize::new` being `const` (which it is not in `loom`). Other
parts had to be left out due to the lack of `thread::Thread` in `loom`.
All the parts needed for `epoch` were successfully moved to loom.

For this initial patch, there are two loom tests, both in `epoch`. One
is a simple test of defer_destroy while a pin is held, and the other is
the Triber stack example. They both pass loom with
`LOOM_MAX_PREEMPTIONS=3` and `LOOM_MAX_PREEMPTIONS=2`. The latter tests
fewer possible interleavings, but completes in 13 minutes on my laptop
rather than ~2 hours. I have added loom testing of `epoch` to CI as
well.

Note that the uses of `UnsafeCell` in `utils` have not been moved to
`loom::cell::UnsafeCell`, as loom's `UnsafeCell` does not support `T:
?Sized`, which `AtomicCell` depends on.

Fixes #486.
  • Loading branch information
jonhoo committed May 23, 2020
1 parent 33b9232 commit 0101676
Show file tree
Hide file tree
Showing 24 changed files with 400 additions and 39 deletions.
11 changes: 11 additions & 0 deletions .github/workflows/ci.yml
Expand Up @@ -60,3 +60,14 @@ jobs:
run: rustup update stable && rustup default stable
- name: rustfmt
run: ./ci/rustfmt.sh

# Run loom tests.
loom:
name: loom
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@master
- name: Install Rust
run: rustup update stable && rustup default stable
- name: loom
run: ./ci/crossbeam-epoch-loom.sh
8 changes: 8 additions & 0 deletions ci/crossbeam-epoch-loom.sh
@@ -0,0 +1,8 @@
#!/bin/bash

cd "$(dirname "$0")"/../crossbeam-epoch
set -ex

export RUSTFLAGS="-D warnings --cfg=loom"

env LOOM_MAX_PREEMPTIONS=2 cargo test --test loom --features sanitize --release -- --nocapture
3 changes: 3 additions & 0 deletions crossbeam-epoch/Cargo.toml
Expand Up @@ -40,6 +40,9 @@ sanitize = [] # Makes it more likely to trigger any potential data races.
cfg-if = "0.1.10"
memoffset = "0.5.1"

[target.'cfg(loom)'.dependencies]
loom = "0.3.2"

[dependencies.crossbeam-utils]
version = "0.7"
path = "../crossbeam-utils"
Expand Down
32 changes: 29 additions & 3 deletions crossbeam-epoch/src/atomic.rs
@@ -1,3 +1,4 @@
use crate::concurrency::sync::atomic::AtomicUsize;
use alloc::boxed::Box;
use core::borrow::{Borrow, BorrowMut};
use core::cmp;
Expand All @@ -6,7 +7,7 @@ use core::marker::PhantomData;
use core::mem;
use core::ops::{Deref, DerefMut};
use core::ptr;
use core::sync::atomic::{AtomicUsize, Ordering};
use core::sync::atomic::Ordering;

use crate::guard::Guard;
use crossbeam_utils::atomic::AtomicConsume;
Expand Down Expand Up @@ -150,6 +151,24 @@ impl<T> Atomic<T> {
///
/// let a = Atomic::<i32>::null();
/// ```
#[cfg(loom)]
pub fn null() -> Atomic<T> {
Self {
data: AtomicUsize::new(0),
_marker: PhantomData,
}
}

/// Returns a new null atomic pointer.
///
/// # Examples
///
/// ```
/// use crossbeam_epoch::Atomic;
///
/// let a = Atomic::<i32>::null();
/// ```
#[cfg(not(loom))]
pub const fn null() -> Atomic<T> {
Self {
data: AtomicUsize::new(0),
Expand Down Expand Up @@ -488,7 +507,14 @@ impl<T> Atomic<T> {
/// }
/// ```
pub unsafe fn into_owned(self) -> Owned<T> {
Owned::from_usize(self.data.into_inner())
#[cfg(loom)]
{
Owned::from_usize(self.data.unsync_load())
}
#[cfg(not(loom))]
{
Owned::from_usize(self.data.into_inner())
}
}
}

Expand Down Expand Up @@ -1167,7 +1193,7 @@ impl<T> Default for Shared<'_, T> {
}
}

#[cfg(test)]
#[cfg(all(test, not(loom)))]
mod tests {
use super::Shared;

Expand Down
10 changes: 5 additions & 5 deletions crossbeam-epoch/src/collector.rs
Expand Up @@ -12,7 +12,7 @@
///
/// handle.pin().flush();
/// ```
use alloc::sync::Arc;
use crate::concurrency::sync::Arc;
use core::fmt;

use crate::guard::Guard;
Expand Down Expand Up @@ -103,7 +103,7 @@ impl fmt::Debug for LocalHandle {
}
}

#[cfg(test)]
#[cfg(all(test, not(loom)))]
mod tests {
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
Expand Down Expand Up @@ -145,9 +145,9 @@ mod tests {
let a = Owned::new(7).into_shared(guard);
guard.defer_destroy(a);

assert!(!(*(*guard.local).bag.get()).is_empty());
assert!(!(*guard.local).bag.with(|b| (*b).is_empty()));

while !(*(*guard.local).bag.get()).is_empty() {
while !(*guard.local).bag.with(|b| (*b).is_empty()) {
guard.flush();
}
}
Expand All @@ -166,7 +166,7 @@ mod tests {
let a = Owned::new(7).into_shared(guard);
guard.defer_destroy(a);
}
assert!(!(*(*guard.local).bag.get()).is_empty());
assert!(!(*guard.local).bag.with(|b| (*b).is_empty()));
}
}

Expand Down
4 changes: 2 additions & 2 deletions crossbeam-epoch/src/default.rs
Expand Up @@ -5,8 +5,8 @@
//! destructed on thread exit, which in turn unregisters the thread.

use crate::collector::{Collector, LocalHandle};
use crate::concurrency::{lazy_static, thread_local};
use crate::guard::Guard;
use lazy_static::lazy_static;

lazy_static! {
/// The global data for the default garbage collector.
Expand Down Expand Up @@ -45,7 +45,7 @@ where
.unwrap_or_else(|_| f(&COLLECTOR.register()))
}

#[cfg(test)]
#[cfg(all(test, not(loom)))]
mod tests {
use crossbeam_utils::thread;

Expand Down
2 changes: 1 addition & 1 deletion crossbeam-epoch/src/deferred.rs
Expand Up @@ -76,7 +76,7 @@ impl Deferred {
}
}

#[cfg(test)]
#[cfg(all(test, not(loom)))]
mod tests {
use super::Deferred;
use std::cell::Cell;
Expand Down
3 changes: 2 additions & 1 deletion crossbeam-epoch/src/epoch.rs
Expand Up @@ -7,7 +7,8 @@
//! If an object became garbage in some epoch, then we can be sure that after two advancements no
//! participant will hold a reference to it. That is the crux of safe memory reclamation.

use core::sync::atomic::{AtomicUsize, Ordering};
use crate::concurrency::sync::atomic::AtomicUsize;
use core::sync::atomic::Ordering;

/// An epoch that can be marked as pinned or unpinned.
///
Expand Down
18 changes: 10 additions & 8 deletions crossbeam-epoch/src/internal.rs
Expand Up @@ -35,10 +35,11 @@
//! Ideally each instance of concurrent data structure may have its own queue that gets fully
//! destroyed as soon as the data structure gets dropped.

use core::cell::{Cell, UnsafeCell};
use crate::concurrency::cell::UnsafeCell;
use crate::concurrency::sync::atomic;
use core::cell::Cell;
use core::mem::{self, ManuallyDrop};
use core::num::Wrapping;
use core::sync::atomic;
use core::sync::atomic::Ordering;
use core::{fmt, ptr};

Expand Down Expand Up @@ -408,7 +409,7 @@ impl Local {
/// Returns a reference to the `Collector` in which this `Local` resides.
#[inline]
pub fn collector(&self) -> &Collector {
unsafe { &**self.collector.get() }
self.collector.with(|c| unsafe { &**c })
}

/// Returns `true` if the current participant is pinned.
Expand All @@ -423,7 +424,7 @@ impl Local {
///
/// It should be safe for another thread to execute the given function.
pub unsafe fn defer(&self, mut deferred: Deferred, guard: &Guard) {
let bag = &mut *self.bag.get();
let bag = self.bag.with_mut(|b| &mut *b);

while let Err(d) = bag.try_push(deferred) {
let epoch = self.epoch.load(Ordering::Relaxed).unpinned();
Expand All @@ -433,7 +434,7 @@ impl Local {
}

pub fn flush(&self, guard: &Guard) {
let bag = unsafe { &mut *self.bag.get() };
let bag = self.bag.with_mut(|b| unsafe { &mut *b });

if !bag.is_empty() {
let epoch = self.epoch.load(Ordering::Relaxed).unpinned();
Expand Down Expand Up @@ -582,7 +583,8 @@ impl Local {
// doesn't defer destruction on any new garbage.
let epoch = self.epoch.load(Ordering::Relaxed).unpinned();
let guard = &self.pin();
self.global().push_bag(&mut *self.bag.get(), epoch, guard);
self.global()
.push_bag(self.bag.with_mut(|b| &mut *b), epoch, guard);
}
// Revert the handle count back to zero.
self.handle_count.set(0);
Expand All @@ -591,7 +593,7 @@ impl Local {
// Take the reference to the `Global` out of this `Local`. Since we're not protected
// by a guard at this time, it's crucial that the reference is read before marking the
// `Local` as deleted.
let collector: Collector = ptr::read(&*(*self.collector.get()));
let collector: Collector = ptr::read(self.collector.with(|c| &*(*c)));

// Mark this node in the linked list as deleted.
self.entry.delete(&unprotected());
Expand Down Expand Up @@ -622,7 +624,7 @@ impl IsElement<Local> for Local {
}
}

#[cfg(test)]
#[cfg(all(test, not(loom)))]
mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};

Expand Down
79 changes: 79 additions & 0 deletions crossbeam-epoch/src/lib.rs
Expand Up @@ -60,6 +60,85 @@

use cfg_if::cfg_if;

#[cfg(loom)]
#[allow(unused_imports, dead_code)]
pub(crate) mod concurrency {
pub(crate) mod cell {
pub(crate) use loom::cell::UnsafeCell;
}
pub(crate) mod sync {
pub(crate) mod atomic {
use core::sync::atomic::Ordering;
pub(crate) use loom::sync::atomic::AtomicUsize;
pub(crate) fn fence(ord: Ordering) {
if let Ordering::Acquire = ord {
} else {
// FIXME: loom only supports acquire fences at the moment.
// https://github.com/tokio-rs/loom/issues/117
// let's at least not panic...
// this may generate some false positives (`SeqCst` is stronger than `Acquire`
// for example), and some false negatives (`Relaxed` is weaker than `Acquire`),
// but it's the best we can do for the time being.
}
loom::sync::atomic::fence(Ordering::Acquire)
}

// FIXME: loom does not support compiler_fence at the moment.
// https://github.com/tokio-rs/loom/issues/117
// we use fence as a stand-in for compiler_fence for the time being.
// this may miss some races since fence is stronger than compiler_fence,
// but it's the best we can do for the time being.
pub(crate) use self::fence as compiler_fence;
}
pub(crate) use loom::sync::Arc;
}
pub(crate) use loom::lazy_static;
pub(crate) use loom::thread_local;
}
#[cfg(not(loom))]
#[allow(unused_imports, dead_code)]
pub(crate) mod concurrency {
#[cfg(any(feature = "alloc", feature = "std"))]
pub(crate) mod cell {
#[derive(Debug)]
#[repr(transparent)]
pub(crate) struct UnsafeCell<T>(::core::cell::UnsafeCell<T>);

impl<T> UnsafeCell<T> {
#[inline]
pub(crate) fn new(data: T) -> UnsafeCell<T> {
UnsafeCell(::core::cell::UnsafeCell::new(data))
}

#[inline]
pub(crate) fn with<R>(&self, f: impl FnOnce(*const T) -> R) -> R {
f(self.0.get())
}

#[inline]
pub(crate) fn with_mut<R>(&self, f: impl FnOnce(*mut T) -> R) -> R {
f(self.0.get())
}
}
}
#[cfg(any(feature = "alloc", feature = "std"))]
pub(crate) mod sync {
pub(crate) mod atomic {
pub(crate) use core::sync::atomic::compiler_fence;
pub(crate) use core::sync::atomic::fence;
pub(crate) use core::sync::atomic::AtomicUsize;
}
#[cfg_attr(feature = "nightly", cfg(target_has_atomic = "ptr"))]
pub(crate) use alloc::sync::Arc;
}

#[cfg(feature = "std")]
pub(crate) use std::thread_local;

#[cfg(feature = "std")]
pub(crate) use lazy_static::lazy_static;
}

#[cfg_attr(feature = "nightly", cfg(target_has_atomic = "ptr"))]
cfg_if! {
if #[cfg(feature = "alloc")] {
Expand Down
2 changes: 1 addition & 1 deletion crossbeam-epoch/src/sync/list.rs
Expand Up @@ -295,7 +295,7 @@ impl<'g, T: 'g, C: IsElement<T>> Iterator for Iter<'g, T, C> {
}
}

#[cfg(test)]
#[cfg(all(test, not(loom)))]
mod tests {
use super::*;
use crate::{Collector, Owned};
Expand Down
2 changes: 1 addition & 1 deletion crossbeam-epoch/src/sync/queue.rs
Expand Up @@ -202,7 +202,7 @@ impl<T> Drop for Queue<T> {
}
}

#[cfg(test)]
#[cfg(all(test, not(loom)))]
mod test {
use super::*;
use crossbeam_utils::thread;
Expand Down

0 comments on commit 0101676

Please sign in to comment.