Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make concurrent-queue loom-aware #5

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,6 @@ cache-padded = "1.1.1"
[dev-dependencies]
easy-parallel = "3.1.0"
fastrand = "1.3.3"

[target.'cfg(loom)'.dependencies]
loom = { version = "0.3.5" }
14 changes: 6 additions & 8 deletions src/bounded.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::cell::UnsafeCell;
use crate::facade::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use crate::facade::sync::atomic::{AtomicUsize, Ordering};
use crate::facade::{thread, cell};

use cache_padded::CachePadded;

Expand Down Expand Up @@ -118,9 +118,7 @@ impl<T> Bounded<T> {
) {
Ok(_) => {
// Write the value into the slot and update the stamp.
unsafe {
slot.value.get().write(MaybeUninit::new(value));
}
unsafe { cell::write(&slot.value, MaybeUninit::new(value)); }
slot.stamp.store(tail + 1, Ordering::Release);
return Ok(());
}
Expand Down Expand Up @@ -181,7 +179,7 @@ impl<T> Bounded<T> {
) {
Ok(_) => {
// Read the value from the slot and update the stamp.
let value = unsafe { slot.value.get().read().assume_init() };
let value = unsafe { cell::read(&slot.value).assume_init() };
slot.stamp
.store(head.wrapping_add(self.one_lap), Ordering::Release);
return Ok(value);
Expand Down Expand Up @@ -298,7 +296,7 @@ impl<T> Drop for Bounded<T> {
// Drop the value in the slot.
let slot = &self.buffer[index];
unsafe {
let value = slot.value.get().read().assume_init();
let value = cell::read(&slot.value).assume_init();
drop(value);
}
}
Expand Down
65 changes: 64 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,75 @@
use std::error;
use std::fmt;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::sync::atomic::{self, AtomicUsize, Ordering};
use facade::sync::atomic::{self, AtomicUsize, Ordering};

use crate::bounded::Bounded;
use crate::single::Single;
use crate::unbounded::Unbounded;

/// An internal facade abstracting over loom and std types
mod facade {
pub mod sync {
#[cfg(loom)]
pub use loom::sync::*;
#[cfg(not(loom))]
pub use std::sync::*;

pub mod atomic {
#[cfg(loom)]
pub use loom::sync::atomic::*;

#[cfg(not(loom))]
pub use std::sync::atomic::*;

// TODO(loom): loom may add get_mut to &mut AtomicUsize in the future
pub fn load_unique(atomic: &mut AtomicUsize) -> usize {
#[cfg(loom)]
let v = unsafe { atomic.unsync_load() }; // SAFETY: we have &mut

#[cfg(not(loom))]
let v = *atomic.get_mut();

v
}
}
}

pub mod cell {
#[cfg(loom)]
pub use loom::cell::*;
#[cfg(not(loom))]
pub use std::cell::*;

// TODO(loom): loom may add get() to &mut AtomicUsize in the future
pub unsafe fn write<T>(cell: &UnsafeCell<T>, value: T) {
#[cfg(loom)]
cell.with_mut(|ptr| ptr.write(value));

#[cfg(not(loom))]
cell.get().write(value);
}

// TODO(loom): loom may add get() to &mut AtomicUsize in the future
pub unsafe fn read<T>(cell: &UnsafeCell<T>) -> T {
#[cfg(loom)]
let v = cell.with(|ptr| ptr.read());

#[cfg(not(loom))]
let v = cell.get().read();

v
}
}

pub mod thread {
#[cfg(loom)]
pub use loom::thread::*;
#[cfg(not(loom))]
pub use std::thread::*;
}
}

mod bounded;
mod single;
mod unbounded;
Expand Down
15 changes: 8 additions & 7 deletions src/single.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::cell::UnsafeCell;
use crate::facade::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use crate::facade::sync::atomic::{AtomicUsize, Ordering};
use crate::facade::{thread, cell};

use crate::{PopError, PushError};
use crate::facade::sync::atomic;

const LOCKED: usize = 1 << 0;
const PUSHED: usize = 1 << 1;
Expand Down Expand Up @@ -33,7 +34,7 @@ impl<T> Single<T> {

if state == 0 {
// Write the value and unlock.
unsafe { self.slot.get().write(MaybeUninit::new(value)) }
unsafe { cell::write(&self.slot, MaybeUninit::new(value)) }
self.state.fetch_and(!LOCKED, Ordering::Release);
Ok(())
} else if state & CLOSED != 0 {
Expand All @@ -54,7 +55,7 @@ impl<T> Single<T> {

if prev == state {
// Read the value and unlock.
let value = unsafe { self.slot.get().read().assume_init() };
let value = unsafe { cell::read(&self.slot).assume_init() };
self.state.fetch_and(!LOCKED, Ordering::Release);
return Ok(value);
}
Expand Down Expand Up @@ -112,8 +113,8 @@ impl<T> Single<T> {
impl<T> Drop for Single<T> {
fn drop(&mut self) {
// Drop the value in the slot.
if *self.state.get_mut() & PUSHED != 0 {
let value = unsafe { self.slot.get().read().assume_init() };
if atomic::load_unique(&mut self.state) & PUSHED != 0 {
let value = unsafe { cell::read(&self.slot).assume_init() };
drop(value);
}
}
Expand Down
38 changes: 31 additions & 7 deletions src/unbounded.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::cell::UnsafeCell;
use crate::facade::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use std::thread;
use crate::facade::sync::atomic::{AtomicPtr, AtomicUsize, Ordering};
use crate::facade::{thread, cell};

use cache_padded::CachePadded;

Expand Down Expand Up @@ -37,6 +37,7 @@ struct Slot<T> {
}

impl<T> Slot<T> {
#[cfg(not(loom))]
const UNINIT: Slot<T> = Slot {
value: UnsafeCell::new(MaybeUninit::uninit()),
state: AtomicUsize::new(0),
Expand Down Expand Up @@ -64,9 +65,32 @@ struct Block<T> {
impl<T> Block<T> {
/// Creates an empty block.
fn new() -> Block<T> {
#[cfg(not(loom))]
let slots = [Slot::UNINIT; BLOCK_CAP];

#[cfg(loom)]
let slots = {
use std::convert::TryFrom;

let mut vec = Vec::with_capacity(BLOCK_CAP);
for _ in 0..BLOCK_CAP {
vec.push(Slot {
value: UnsafeCell::new(MaybeUninit::uninit()),
state: AtomicUsize::new(0),
});
}

let res = Box::<[Slot<T>; BLOCK_CAP]>::try_from(vec.into_boxed_slice());

match res {
Ok(boxed_array) => *boxed_array,
Err(_) => unreachable!(),
}
};

Block {
next: AtomicPtr::new(ptr::null_mut()),
slots: [Slot::UNINIT; BLOCK_CAP],
slots,
}
}

Expand Down Expand Up @@ -205,7 +229,7 @@ impl<T> Unbounded<T> {

// Write the value into the slot.
let slot = (*block).slots.get_unchecked(offset);
slot.value.get().write(MaybeUninit::new(value));
cell::write(&slot.value, MaybeUninit::new(value));
slot.state.fetch_or(WRITE, Ordering::Release);
return Ok(());
},
Expand Down Expand Up @@ -287,7 +311,7 @@ impl<T> Unbounded<T> {
// Read the value.
let slot = (*block).slots.get_unchecked(offset);
slot.wait_write();
let value = slot.value.get().read().assume_init();
let value = cell::read(&slot.value).assume_init();

// Destroy the block if we've reached the end, or if another thread wanted to
// destroy but couldn't because we were busy reading from the slot.
Expand Down Expand Up @@ -387,7 +411,7 @@ impl<T> Drop for Unbounded<T> {
if offset < BLOCK_CAP {
// Drop the value in the slot.
let slot = (*block).slots.get_unchecked(offset);
let value = slot.value.get().read().assume_init();
let value = cell::read(&slot.value).assume_init();
drop(value);
} else {
// Deallocate the block and move to the next one.
Expand Down