Replace AtomicQueue by crossbeam::ArrayQueue #124

Merged
merged 1 commit on Apr 14, 2022
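For context, the change hinges on `ArrayQueue::force_push`: it pushes a value and, when the queue is full, displaces and returns the oldest element, which matches the contract of the hand-rolled `push_or_swap` being removed below. A minimal sketch of that behaviour (assuming crossbeam-queue 0.3.2 or later, where `force_push` is available):

use crossbeam_queue::ArrayQueue;

fn main() {
    let q = ArrayQueue::new(2);

    // While there is spare capacity, force_push behaves like a plain push.
    assert_eq!(q.force_push(1), None);
    assert_eq!(q.force_push(2), None);

    // Once full, the oldest element is displaced and handed back,
    // mirroring what push_or_swap returned.
    assert_eq!(q.force_push(3), Some(1));

    assert_eq!(q.pop(), Some(2));
    assert_eq!(q.pop(), Some(3));
    assert_eq!(q.pop(), None);
}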
3 changes: 2 additions & 1 deletion Cargo.toml
@@ -16,13 +16,14 @@ all-features = true

[features]
default = ["std", "futures_api"]
std = ["crossbeam-utils/std", "futures/std", "slotmap/std", "spin/std"]
std = ["crossbeam-queue/std", "crossbeam-utils/std", "futures/std", "slotmap/std", "spin/std"]
futures_api = ["futures", "slotmap", "spin"]

[badges]
codecov = { repository = "brunocodutra/ring-channel" }

[dependencies]
crossbeam-queue = { version = "0.3.4", default-features = false, features = ["alloc"] }
crossbeam-utils = { version = "0.8.7", default-features = false }
derivative = { version = "2.2.0", default-features = false, features = ["use_core"] }
futures = { version = "0.3.21", default-features = false, features = ["executor"], optional = true }
204 changes: 5 additions & 199 deletions src/buffer.rs
@@ -1,207 +1,13 @@
use crate::atomic::AtomicOption;
use alloc::boxed::Box;
use core::sync::atomic::{self, AtomicUsize, Ordering};
use core::{cell::UnsafeCell, mem::MaybeUninit};
use crossbeam_utils::{Backoff, CachePadded};
use crossbeam_queue::ArrayQueue;
use derivative::Derivative;

#[derive(Derivative)]
#[derivative(Debug(bound = ""))]
struct Slot<T> {
// If the stamp equals the tail, this node will be next written to.
// If it equals head + 1, this node will be next read from.
stamp: AtomicUsize,
value: UnsafeCell<MaybeUninit<T>>,
}

impl<T> Slot<T> {
fn new(stamp: usize) -> Self {
Slot {
stamp: AtomicUsize::new(stamp),
value: UnsafeCell::new(MaybeUninit::uninit()),
}
}
}

#[derive(Derivative)]
#[derivative(Debug(bound = ""))]
pub struct AtomicQueue<T> {
head: CachePadded<AtomicUsize>,
tail: CachePadded<AtomicUsize>,
buffer: Box<[CachePadded<Slot<T>>]>,
lap: usize,
}

unsafe impl<T: Send> Sync for AtomicQueue<T> {}
unsafe impl<T: Send> Send for AtomicQueue<T> {}

impl<T> AtomicQueue<T> {
fn new(capacity: usize) -> AtomicQueue<T> {
AtomicQueue {
buffer: (0..capacity).map(Slot::new).map(CachePadded::new).collect(),
head: Default::default(),
tail: Default::default(),
lap: (capacity + 1).next_power_of_two(),
}
}

#[inline]
fn capacity(&self) -> usize {
self.buffer.len()
}

#[inline]
fn get(&self, cursor: usize) -> &Slot<T> {
let index = cursor & (self.lap - 1);
debug_assert!(index < self.capacity());
unsafe { self.buffer.get_unchecked(index) }
}

fn advance(&self, cursor: usize) -> usize {
let index = cursor & (self.lap - 1);
let stamp = cursor & !(self.lap - 1);

if index + 1 < self.capacity() {
// Same lap, incremented index.
// Set to `{ stamp: stamp, index: index + 1 }`.
cursor + 1
} else {
// One lap forward, index wraps around to zero.
// Set to `{ stamp: stamp.wrapping_add(1), index: 0 }`.
stamp.wrapping_add(self.lap)
}
}

fn push_or_swap(&self, value: T) -> Option<T> {
let backoff = Backoff::new();
let mut tail = self.tail.load(Ordering::Relaxed);

loop {
let new_tail = self.advance(tail);
let slot = self.get(tail);
let stamp = slot.stamp.load(Ordering::Acquire);

// If the stamp matches the tail, we may attempt to push.
if stamp == tail {
// Try advancing the tail.
match self.tail.compare_exchange_weak(
tail,
new_tail,
Ordering::SeqCst,
Ordering::Relaxed,
) {
Ok(_) => {
// Write the value into the slot.
unsafe { slot.value.get().write(MaybeUninit::new(value)) };
slot.stamp.store(tail + 1, Ordering::Release);
return None;
}

Err(t) => {
tail = t;
backoff.spin();
continue;
}
}
// If the stamp lags one lap behind the tail, we may attempt to swap.
} else if stamp.wrapping_add(self.lap) == tail + 1 {
atomic::fence(Ordering::SeqCst);

// Try advancing the head, if it lags one lap behind the tail as well.
if self
.head
.compare_exchange_weak(
tail.wrapping_sub(self.lap),
new_tail.wrapping_sub(self.lap),
Ordering::SeqCst,
Ordering::Relaxed,
)
.is_ok()
{
// Advance the tail.
debug_assert_eq!(self.tail.load(Ordering::SeqCst), tail);
self.tail.store(new_tail, Ordering::SeqCst);

// Replace the value in the slot.
let new = MaybeUninit::new(value);
let old = unsafe { slot.value.get().replace(new).assume_init() };
slot.stamp.store(tail + 1, Ordering::Release);
return Some(old);
}
}

backoff.snooze();
tail = self.tail.load(Ordering::Relaxed);
}
}

fn pop(&self) -> Option<T> {
let backoff = Backoff::new();
let mut head = self.head.load(Ordering::Relaxed);

loop {
let slot = self.get(head);
let stamp = slot.stamp.load(Ordering::Acquire);

// If the stamp is ahead of the head by 1, we may attempt to pop.
if stamp == head + 1 {
// Try advancing the head.
match self.head.compare_exchange_weak(
head,
self.advance(head),
Ordering::SeqCst,
Ordering::Relaxed,
) {
Ok(_) => {
// Read the value from the slot.
let msg = unsafe { slot.value.get().read().assume_init() };
slot.stamp
.store(head.wrapping_add(self.lap), Ordering::Release);
return Some(msg);
}

Err(h) => {
head = h;
backoff.spin();
continue;
}
}
// If the stamp matches the head, the queue may be empty.
} else if stamp == head {
atomic::fence(Ordering::SeqCst);

// If the tail matches the head as well, the queue is empty.
if self.tail.load(Ordering::Relaxed) == head {
return None;
}
}

backoff.snooze();
head = self.head.load(Ordering::Relaxed);
}
}
}

impl<T> Drop for AtomicQueue<T> {
fn drop(&mut self) {
let mut cursor = self.head.load(Ordering::Relaxed);
let end = self.tail.load(Ordering::Relaxed);

// Loop over all slots that hold a message and drop them.
while cursor != end {
let slot = self.get(cursor);
unsafe { (&mut *slot.value.get()).as_mut_ptr().drop_in_place() };
cursor = self.advance(cursor);
}
}
}

#[derive(Derivative)]
#[derivative(Debug(bound = ""))]
#[allow(clippy::large_enum_variant)]
pub(super) enum RingBuffer<T> {
Atomic(AtomicOption<T>),
Queue(AtomicQueue<T>),
Queue(ArrayQueue<T>),
}

impl<T> RingBuffer<T> {
@@ -211,7 +17,7 @@ impl<T> RingBuffer<T> {
if capacity == 1 {
RingBuffer::Atomic(Default::default())
} else {
RingBuffer::Queue(AtomicQueue::new(capacity))
RingBuffer::Queue(ArrayQueue::new(capacity))
}
}
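Note that the `capacity == 1` branch above still uses the crate's own `AtomicOption` rather than an `ArrayQueue` of length one: a single overwriting slot only needs a swap. Conceptually (a sketch only, with a `Mutex` standing in for the actual lock-free type):

use std::sync::Mutex;

// Conceptual stand-in for AtomicOption: a single slot where writing
// always succeeds and returns whatever value it displaces, if any.
struct SingleSlot<T>(Mutex<Option<T>>);

impl<T> SingleSlot<T> {
    fn swap(&self, value: T) -> Option<T> {
        self.0.lock().unwrap().replace(value)
    }

    fn take(&self) -> Option<T> {
        self.0.lock().unwrap().take()
    }
}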

@@ -228,7 +34,7 @@ impl<T> RingBuffer<T> {
pub(super) fn push(&self, value: T) -> Option<T> {
match self {
RingBuffer::Atomic(c) => c.swap(value),
RingBuffer::Queue(q) => q.push_or_swap(value),
RingBuffer::Queue(q) => q.force_push(value),
}
}

@@ -267,7 +73,7 @@ mod tests {

assert_eq!(
discriminant(&RingBuffer::<Void>::new(2)),
discriminant(&RingBuffer::Queue(AtomicQueue::new(2)))
discriminant(&RingBuffer::Queue(ArrayQueue::new(2)))
);
}
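The test above only checks which variant `RingBuffer::new` selects: `core::mem::discriminant` compares enum variants while ignoring their payloads, so the assertion holds regardless of the queues' contents. A small illustration with a hypothetical `Buffer` enum (not part of the crate):

use core::mem::discriminant;

enum Buffer {
    Single(u8),
    Queue(Vec<u8>),
}

fn main() {
    // Same variant, different payloads: the discriminants are equal.
    assert_eq!(
        discriminant(&Buffer::Queue(vec![1])),
        discriminant(&Buffer::Queue(vec![2, 3]))
    );

    // Different variants: the discriminants differ.
    assert_ne!(
        discriminant(&Buffer::Single(0)),
        discriminant(&Buffer::Queue(Vec::new()))
    );
}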
