Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add a loom implementation for event-listener #126

Merged
merged 1 commit into from
Mar 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 11 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,14 @@ jobs:
- uses: rustsec/audit-check@master
with:
token: ${{ secrets.GITHUB_TOKEN }}

loom:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install Rust
run: rustup update stable
- name: Loom tests
run: RUSTFLAGS="--cfg=loom" cargo test --release --test loom --features loom


6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@ exclude = ["/.*"]
default = ["std"]
std = ["concurrent-queue/std", "parking"]
portable-atomic = ["portable-atomic-util", "portable_atomic_crate"]
loom = ["concurrent-queue/loom", "parking?/loom", "dep:loom"]

[dependencies]
concurrent-queue = { version = "2.2.0", default-features = false }
concurrent-queue = { version = "2.4.0", default-features = false }
pin-project-lite = "0.2.12"
portable-atomic-util = { version = "0.1.4", default-features = false, optional = true, features = ["alloc"] }

[target.'cfg(not(target_family = "wasm"))'.dependencies]
parking = { version = "2.0.0", optional = true }

[target.'cfg(loom)'.dependencies]
loom = { version = "0.7", optional = true }

[dependencies.portable_atomic_crate]
package = "portable-atomic"
version = "1.2.0"
Expand Down
79 changes: 74 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,10 @@ use {
};

use sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use sync::{Arc, WithMut};
use sync::Arc;

#[cfg(not(loom))]
use sync::WithMut;

use notify::{Internal, NotificationPrivate};
pub use notify::{IntoNotification, Notification};
Expand Down Expand Up @@ -216,13 +219,20 @@ impl<T> Event<T> {
///
/// let event = Event::<usize>::with_tag();
/// ```
#[cfg(feature = "std")]
#[cfg(all(feature = "std", not(loom)))]
#[inline]
pub const fn with_tag() -> Self {
Self {
inner: AtomicPtr::new(ptr::null_mut()),
}
}
#[cfg(all(feature = "std", loom))]
#[inline]
pub fn with_tag() -> Self {
Self {
inner: AtomicPtr::new(ptr::null_mut()),
}
}

/// Tell whether any listeners are currently notified.
///
Expand Down Expand Up @@ -543,12 +553,21 @@ impl Event<()> {
/// let event = Event::new();
/// ```
#[inline]
#[cfg(not(loom))]
pub const fn new() -> Self {
Self {
inner: AtomicPtr::new(ptr::null_mut()),
}
}

#[inline]
#[cfg(loom)]
pub fn new() -> Self {
Self {
inner: AtomicPtr::new(ptr::null_mut()),
}
}

/// Notifies a number of active listeners without emitting a `SeqCst` fence.
///
/// The number is allowed to be zero or exceed the current number of listeners.
Expand Down Expand Up @@ -1119,6 +1138,12 @@ impl<T, B: Borrow<Inner<T>> + Unpin> InnerListener<T, B> {
match deadline {
None => parker.park(),

#[cfg(loom)]
Some(_deadline) => {
panic!("parking does not support timeouts under loom");
}

#[cfg(not(loom))]
Some(deadline) => {
// Make sure we're not timed out already.
let now = Instant::now();
Expand Down Expand Up @@ -1330,10 +1355,9 @@ const NEVER_INSERTED_PANIC: &str = "\
EventListener was not inserted into the linked list, make sure you're not polling \
EventListener/listener! after it has finished";

#[cfg(not(loom))]
/// Synchronization primitive implementation.
mod sync {
pub(super) use core::cell;

#[cfg(not(feature = "portable-atomic"))]
pub(super) use alloc::sync::Arc;
#[cfg(not(feature = "portable-atomic"))]
Expand All @@ -1344,7 +1368,7 @@ mod sync {
#[cfg(feature = "portable-atomic")]
pub(super) use portable_atomic_util::Arc;

#[cfg(feature = "std")]
#[cfg(all(feature = "std", not(loom)))]
pub(super) use std::sync::{Mutex, MutexGuard};

pub(super) trait WithMut {
Expand All @@ -1366,6 +1390,51 @@ mod sync {
f(self.get_mut())
}
}

pub(crate) mod cell {
pub(crate) use core::cell::Cell;

/// This newtype around *mut T exists for interoperability with loom::cell::ConstPtr,
/// which works as a guard and performs additional logic to track access scope.
pub(crate) struct ConstPtr<T>(*mut T);
impl<T> ConstPtr<T> {
pub(crate) unsafe fn deref(&self) -> &T {
&*self.0
}

#[allow(unused)] // std code does not need this
pub(crate) unsafe fn deref_mut(&mut self) -> &mut T {
&mut *self.0
}
}

/// This UnsafeCell wrapper exists for interoperability with loom::cell::UnsafeCell, and
/// only contains the interface that is needed for this crate.
#[derive(Debug, Default)]
pub(crate) struct UnsafeCell<T>(core::cell::UnsafeCell<T>);

impl<T> UnsafeCell<T> {
pub(crate) fn new(data: T) -> UnsafeCell<T> {
UnsafeCell(core::cell::UnsafeCell::new(data))
}

pub(crate) fn get(&self) -> ConstPtr<T> {
ConstPtr(self.0.get())
}

#[allow(dead_code)] // no_std does not need this
pub(crate) fn into_inner(self) -> T {
self.0.into_inner()
}
}
}
}

#[cfg(loom)]
/// Synchronization primitive implementation.
mod sync {
pub(super) use loom::cell;
pub(super) use loom::sync::{atomic, Arc, Mutex, MutexGuard};
}

fn __test_send_and_sync() {
Expand Down
17 changes: 12 additions & 5 deletions src/no_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use node::{Node, NothingProducer, TaskWaiting};

use crate::notify::{GenericNotify, Internal, Notification};
use crate::sync::atomic::{AtomicBool, Ordering};
use crate::sync::cell::{Cell, UnsafeCell};
use crate::sync::cell::{Cell, ConstPtr, UnsafeCell};
use crate::sync::Arc;
use crate::{RegisterResult, State, Task, TaskRef};

Expand Down Expand Up @@ -771,7 +771,10 @@ impl<T> Mutex<T> {
.is_ok()
{
// We have successfully locked the mutex.
Some(MutexGuard { mutex: self })
Some(MutexGuard {
mutex: self,
guard: self.value.get(),
})
} else {
self.try_lock_slow()
}
Expand All @@ -790,7 +793,10 @@ impl<T> Mutex<T> {
.is_ok()
{
// We have successfully locked the mutex.
return Some(MutexGuard { mutex: self });
return Some(MutexGuard {
mutex: self,
guard: self.value.get(),
});
}

// Use atomic loads instead of compare-exchange.
Expand All @@ -804,6 +810,7 @@ impl<T> Mutex<T> {

pub(crate) struct MutexGuard<'a, T> {
mutex: &'a Mutex<T>,
guard: ConstPtr<T>,
}

impl<'a, T> Drop for MutexGuard<'a, T> {
Expand All @@ -816,13 +823,13 @@ impl<'a, T> ops::Deref for MutexGuard<'a, T> {
type Target = T;

fn deref(&self) -> &T {
unsafe { &*self.mutex.value.get() }
unsafe { self.guard.deref() }
}
}

impl<'a, T> ops::DerefMut for MutexGuard<'a, T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.mutex.value.get() }
unsafe { self.guard.deref_mut() }
}
}

Expand Down
69 changes: 33 additions & 36 deletions src/std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,27 +73,27 @@ impl<T> crate::Inner<T> {
pub(crate) fn insert(&self, mut listener: Pin<&mut Option<Listener<T>>>) {
let mut inner = self.lock();

// SAFETY: We are locked, so we can access the inner `link`.
let entry = unsafe {
listener.as_mut().set(Some(Listener {
link: UnsafeCell::new(Link {
state: Cell::new(State::Created),
prev: Cell::new(inner.tail),
next: Cell::new(None),
}),
_pin: PhantomPinned,
}));
let listener = listener.as_pin_mut().unwrap();

// Get the inner pointer.
&*listener.link.get()
};

// Replace the tail with the new entry.
match mem::replace(&mut inner.tail, Some(entry.into())) {
None => inner.head = Some(entry.into()),
Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) },
};
listener.as_mut().set(Some(Listener {
link: UnsafeCell::new(Link {
state: Cell::new(State::Created),
prev: Cell::new(inner.tail),
next: Cell::new(None),
}),
_pin: PhantomPinned,
}));
let listener = listener.as_pin_mut().unwrap();

{
let entry_guard = listener.link.get();
// SAFETY: We are locked, so we can access the inner `link`.
let entry = unsafe { entry_guard.deref() };

// Replace the tail with the new entry.
match mem::replace(&mut inner.tail, Some(entry.into())) {
None => inner.head = Some(entry.into()),
Some(t) => unsafe { t.as_ref().next.set(Some(entry.into())) },
};
}

// If there are no unnotified entries, this is the first one.
if inner.next.is_none() {
Expand Down Expand Up @@ -129,15 +129,12 @@ impl<T> crate::Inner<T> {
task: TaskRef<'_>,
) -> RegisterResult<T> {
let mut inner = self.lock();

// SAFETY: We are locked, so we can access the inner `link`.
let entry = unsafe {
let listener = match listener.as_mut().as_pin_mut() {
Some(listener) => listener,
None => return RegisterResult::NeverInserted,
};
&*listener.link.get()
let entry_guard = match listener.as_mut().as_pin_mut() {
Some(listener) => listener.link.get(),
None => return RegisterResult::NeverInserted,
};
// SAFETY: We are locked, so we can access the inner `link`.
let entry = unsafe { entry_guard.deref() };

// Take out the state and check it.
match entry.state.replace(State::NotifiedTaken) {
Expand Down Expand Up @@ -175,12 +172,8 @@ impl<T> Inner<T> {
mut listener: Pin<&mut Option<Listener<T>>>,
propagate: bool,
) -> Option<State<T>> {
let entry = unsafe {
let listener = listener.as_mut().as_pin_mut()?;

// Get the inner pointer.
&*listener.link.get()
};
let entry_guard = listener.as_mut().as_pin_mut()?.link.get();
let entry = unsafe { entry_guard.deref() };

let prev = entry.prev.get();
let next = entry.next.get();
Expand Down Expand Up @@ -216,7 +209,11 @@ impl<T> Inner<T> {
.into_inner()
};

let mut state = entry.state.into_inner();
// This State::Created is immediately dropped and exists as a workaround for the absence of
// loom::cell::Cell::into_inner. The intent is `let mut state = entry.state.into_inner();`
//
// refs: https://github.com/tokio-rs/loom/pull/341
let mut state = entry.state.replace(State::Created);

// Update the notified count.
if state.is_notified() {
Expand Down