Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add alternate implementations of synchronization primitives #27

Merged
merged 8 commits into from Oct 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions .github/workflows/ci.yml
Expand Up @@ -29,8 +29,14 @@ jobs:
if: startsWith(matrix.rust, 'nightly')
run: cargo check -Z features=dev_dep
- run: cargo test
- run: cargo test --features portable-atomic
- name: Install cargo-hack
uses: taiki-e/install-action@cargo-hack
- name: Run with Loom enabled
run: cargo test --test loom --features loom
env:
RUSTFLAGS: ${{ env.RUSTFLAGS }} --cfg loom
LOOM_MAX_PREEMPTIONS: 2
- run: rustup target add thumbv7m-none-eabi
- run: cargo hack build --target thumbv7m-none-eabi --no-default-features --no-dev-deps

Expand All @@ -46,6 +52,7 @@ jobs:
- name: Install Rust
run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }}
- run: cargo build
- run: cargo build --features portable-atomic
- name: Install cargo-hack
uses: taiki-e/install-action@cargo-hack
- run: rustup target add thumbv7m-none-eabi
Expand Down
6 changes: 6 additions & 0 deletions Cargo.toml
Expand Up @@ -19,6 +19,12 @@ bench = false

[dependencies]
crossbeam-utils = { version = "0.8.11", default-features = false }
portable-atomic = { version = "0.3", default-features = false, optional = true }

# Enables loom testing. This feature is permanently unstable and the API may
# change at any time.
[target.'cfg(loom)'.dependencies]
loom = { version = "0.5", optional = true }

[[bench]]
name = "bench"
Expand Down
97 changes: 60 additions & 37 deletions src/bounded.rs
@@ -1,10 +1,12 @@
use alloc::{boxed::Box, vec::Vec};
use core::cell::UnsafeCell;
use core::mem::MaybeUninit;
use core::sync::atomic::{AtomicUsize, Ordering};

use crossbeam_utils::CachePadded;

use crate::sync::atomic::{AtomicUsize, Ordering};
use crate::sync::cell::UnsafeCell;
#[allow(unused_imports)]
use crate::sync::prelude::*;
use crate::{busy_wait, PopError, PushError};

/// A slot in a queue.
Expand Down Expand Up @@ -118,9 +120,9 @@ impl<T> Bounded<T> {
) {
Ok(_) => {
// Write the value into the slot and update the stamp.
unsafe {
slot.value.get().write(MaybeUninit::new(value));
}
slot.value.with_mut(|slot| unsafe {
slot.write(MaybeUninit::new(value));
});
slot.stamp.store(tail + 1, Ordering::Release);
return Ok(());
}
Expand All @@ -138,6 +140,10 @@ impl<T> Bounded<T> {
return Err(PushError::Full(value));
}

// Loom complains if there isn't an explicit busy wait here.
#[cfg(loom)]
busy_wait();

tail = self.tail.load(Ordering::Relaxed);
} else {
// Yield because we need to wait for the stamp to get updated.
Expand Down Expand Up @@ -181,7 +187,9 @@ impl<T> Bounded<T> {
) {
Ok(_) => {
// Read the value from the slot and update the stamp.
let value = unsafe { slot.value.get().read().assume_init() };
let value = slot
.value
.with_mut(|slot| unsafe { slot.read().assume_init() });
slot.stamp
.store(head.wrapping_add(self.one_lap), Ordering::Release);
return Ok(value);
Expand All @@ -204,6 +212,10 @@ impl<T> Bounded<T> {
}
}

// Loom complains if there isn't a busy-wait here.
#[cfg(loom)]
busy_wait();

head = self.head.load(Ordering::Relaxed);
} else {
// Yield because we need to wait for the stamp to get updated.
Expand Down Expand Up @@ -284,37 +296,48 @@ impl<T> Bounded<T> {
impl<T> Drop for Bounded<T> {
fn drop(&mut self) {
// Get the indices of the head and the tail.
let head = *self.head.get_mut();
let tail = *self.tail.get_mut();

let hix = head & (self.mark_bit - 1);
let tix = tail & (self.mark_bit - 1);

let len = if hix < tix {
tix - hix
} else if hix > tix {
self.buffer.len() - hix + tix
} else if (tail & !self.mark_bit) == head {
0
} else {
self.buffer.len()
};

// Loop over all slots that hold a value and drop them.
for i in 0..len {
// Compute the index of the next slot holding a value.
let index = if hix + i < self.buffer.len() {
hix + i
} else {
hix + i - self.buffer.len()
};
let Self {
head,
tail,
buffer,
mark_bit,
..
} = self;

// Drop the value in the slot.
let slot = &self.buffer[index];
unsafe {
let value = &mut *slot.value.get();
value.as_mut_ptr().drop_in_place();
}
}
let mark_bit = *mark_bit;

head.with_mut(|&mut head| {
tail.with_mut(|&mut tail| {
let hix = head & (mark_bit - 1);
let tix = tail & (mark_bit - 1);

let len = if hix < tix {
tix - hix
} else if hix > tix {
buffer.len() - hix + tix
} else if (tail & !mark_bit) == head {
0
} else {
buffer.len()
};

// Loop over all slots that hold a value and drop them.
for i in 0..len {
// Compute the index of the next slot holding a value.
let index = if hix + i < buffer.len() {
hix + i
} else {
hix + i - buffer.len()
};

// Drop the value in the slot.
let slot = &buffer[index];
slot.value.with_mut(|slot| unsafe {
let value = &mut *slot;
value.as_mut_ptr().drop_in_place();
});
}
});
});
}
}
31 changes: 16 additions & 15 deletions src/lib.rs
Expand Up @@ -26,14 +26,23 @@
//!
//! # Features
//!
//! `concurrent-queue` used an `std` default feature. With this feature enabled, this crate will
//! `concurrent-queue` uses an `std` default feature. With this feature enabled, this crate will
//! use [`std::thread::yield_now`] to avoid busy waiting in tight loops. However, with this
//! feature disabled, [`core::hint::spin_loop`] will be used instead. Disabling `std` will allow
//! this crate to be used on `no_std` platforms at the potential expense of more busy waiting.
//!
//! There is also a `portable-atomic` feature, which uses a polyfill from the
//! [`portable-atomic`] crate to provide atomic operations on platforms that do not support them.
//! See the [`README`] for the [`portable-atomic`] crate for more information on how to use it on
//! single-threaded targets. Note that even with this feature enabled, `concurrent-queue` still
//! requires a global allocator to be available. See the documentation for the
//! [`std::alloc::GlobalAlloc`] trait for more information.
//!
//! [Bounded]: `ConcurrentQueue::bounded()`
//! [Unbounded]: `ConcurrentQueue::unbounded()`
//! [closed]: `ConcurrentQueue::close()`
//! [`portable-atomic`]: https://crates.io/crates/portable-atomic
//! [`README`]: https://github.com/taiki-e/portable-atomic/blob/main/README.md#optional-cfg

#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
#![no_std]
Expand All @@ -44,7 +53,7 @@ extern crate std;

use alloc::boxed::Box;
use core::fmt;
use core::sync::atomic::{self, AtomicUsize, Ordering};
use sync::atomic::{self, AtomicUsize, Ordering};

#[cfg(feature = "std")]
use std::error;
Expand All @@ -53,12 +62,15 @@ use std::panic::{RefUnwindSafe, UnwindSafe};

use crate::bounded::Bounded;
use crate::single::Single;
use crate::sync::busy_wait;
use crate::unbounded::Unbounded;

mod bounded;
mod single;
mod unbounded;

mod sync;

/// A concurrent queue.
///
/// # Examples
Expand Down Expand Up @@ -463,24 +475,13 @@ impl<T> fmt::Display for PushError<T> {
}
}

/// Signals that the current thread is spinning while it waits for progress elsewhere.
///
/// With the `std` feature enabled this calls `std::thread::yield_now`; without it,
/// only a spin-loop hint is issued to the CPU.
#[inline]
fn busy_wait() {
    #[cfg(not(feature = "std"))]
    {
        // Stick with the deprecated `spin_loop_hint` so that the MSRV does not
        // have to be raised.
        #[allow(deprecated)]
        core::sync::atomic::spin_loop_hint();
    }

    #[cfg(feature = "std")]
    {
        std::thread::yield_now();
    }
}

/// Equivalent to `atomic::fence(Ordering::SeqCst)`, but in some cases faster.
#[inline]
fn full_fence() {
if cfg!(all(
any(target_arch = "x86", target_arch = "x86_64"),
not(miri)
not(miri),
not(loom)
)) {
// HACK(stjepang): On x86 architectures there are two different ways of executing
// a `SeqCst` fence.
Expand Down
27 changes: 18 additions & 9 deletions src/single.rs
@@ -1,7 +1,9 @@
use core::cell::UnsafeCell;
use core::mem::MaybeUninit;
use core::sync::atomic::{AtomicUsize, Ordering};

use crate::sync::atomic::{AtomicUsize, Ordering};
use crate::sync::cell::UnsafeCell;
#[allow(unused_imports)]
use crate::sync::prelude::*;
use crate::{busy_wait, PopError, PushError};

const LOCKED: usize = 1 << 0;
Expand Down Expand Up @@ -33,7 +35,9 @@ impl<T> Single<T> {

if state == 0 {
// Write the value and unlock.
unsafe { self.slot.get().write(MaybeUninit::new(value)) }
self.slot.with_mut(|slot| unsafe {
slot.write(MaybeUninit::new(value));
});
self.state.fetch_and(!LOCKED, Ordering::Release);
Ok(())
} else if state & CLOSED != 0 {
Expand All @@ -60,7 +64,9 @@ impl<T> Single<T> {

if prev == state {
// Read the value and unlock.
let value = unsafe { self.slot.get().read().assume_init() };
let value = self
.slot
.with_mut(|slot| unsafe { slot.read().assume_init() });
self.state.fetch_and(!LOCKED, Ordering::Release);
return Ok(value);
}
Expand Down Expand Up @@ -118,11 +124,14 @@ impl<T> Single<T> {
impl<T> Drop for Single<T> {
fn drop(&mut self) {
// Drop the value in the slot.
if *self.state.get_mut() & PUSHED != 0 {
unsafe {
let value = &mut *self.slot.get();
value.as_mut_ptr().drop_in_place();
let Self { state, slot } = self;
state.with_mut(|state| {
if *state & PUSHED != 0 {
slot.with_mut(|slot| unsafe {
let value = &mut *slot;
value.as_mut_ptr().drop_in_place();
});
}
}
});
}
}