From 69c42d5018b279fa747f87f01057044782b9781c Mon Sep 17 00:00:00 2001 From: Bruno Dutra Date: Tue, 12 Apr 2022 23:27:11 +0200 Subject: [PATCH] replace AtomicQueue by crossbeam::ArrayQueue this is possible now that crossbeam-rs/crossbeam#789 has been released with v0.3.5. --- Cargo.toml | 3 +- src/buffer.rs | 204 ++------------------------------------------------ 2 files changed, 7 insertions(+), 200 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 22c7e94..0a0f2b0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,13 +16,14 @@ all-features = true [features] default = ["std", "futures_api"] -std = ["crossbeam-utils/std", "futures/std", "slotmap/std", "spin/std"] +std = ["crossbeam-queue/std", "crossbeam-utils/std", "futures/std", "slotmap/std", "spin/std"] futures_api = ["futures", "slotmap", "spin"] [badges] codecov = { repository = "brunocodutra/ring-channel" } [dependencies] +crossbeam-queue = { version = "0.3.4", default-features = false, features = ["alloc"] } crossbeam-utils = { version = "0.8.7", default-features = false } derivative = { version = "2.2.0", default-features = false, features = ["use_core"] } futures = { version = "0.3.21", default-features = false, features = ["executor"], optional = true } diff --git a/src/buffer.rs b/src/buffer.rs index 37121d8..d08255e 100644 --- a/src/buffer.rs +++ b/src/buffer.rs @@ -1,207 +1,13 @@ use crate::atomic::AtomicOption; -use alloc::boxed::Box; -use core::sync::atomic::{self, AtomicUsize, Ordering}; -use core::{cell::UnsafeCell, mem::MaybeUninit}; -use crossbeam_utils::{Backoff, CachePadded}; +use crossbeam_queue::ArrayQueue; use derivative::Derivative; -#[derive(Derivative)] -#[derivative(Debug(bound = ""))] -struct Slot { - // If the stamp equals the tail, this node will be next written to. - // If it equals head + 1, this node will be next read from. - stamp: AtomicUsize, - value: UnsafeCell>, -} - -impl Slot { - fn new(stamp: usize) -> Self { - Slot { - stamp: AtomicUsize::new(stamp), - value: UnsafeCell::new(MaybeUninit::uninit()), - } - } -} - -#[derive(Derivative)] -#[derivative(Debug(bound = ""))] -pub struct AtomicQueue { - head: CachePadded, - tail: CachePadded, - buffer: Box<[CachePadded>]>, - lap: usize, -} - -unsafe impl Sync for AtomicQueue {} -unsafe impl Send for AtomicQueue {} - -impl AtomicQueue { - fn new(capacity: usize) -> AtomicQueue { - AtomicQueue { - buffer: (0..capacity).map(Slot::new).map(CachePadded::new).collect(), - head: Default::default(), - tail: Default::default(), - lap: (capacity + 1).next_power_of_two(), - } - } - - #[inline] - fn capacity(&self) -> usize { - self.buffer.len() - } - - #[inline] - fn get(&self, cursor: usize) -> &Slot { - let index = cursor & (self.lap - 1); - debug_assert!(index < self.capacity()); - unsafe { self.buffer.get_unchecked(index) } - } - - fn advance(&self, cursor: usize) -> usize { - let index = cursor & (self.lap - 1); - let stamp = cursor & !(self.lap - 1); - - if index + 1 < self.capacity() { - // Same lap, incremented index. - // Set to `{ stamp: stamp, index: index + 1 }`. - cursor + 1 - } else { - // One lap forward, index wraps around to zero. - // Set to `{ stamp: stamp.wrapping_add(1), index: 0 }`. - stamp.wrapping_add(self.lap) - } - } - - fn push_or_swap(&self, value: T) -> Option { - let backoff = Backoff::new(); - let mut tail = self.tail.load(Ordering::Relaxed); - - loop { - let new_tail = self.advance(tail); - let slot = self.get(tail); - let stamp = slot.stamp.load(Ordering::Acquire); - - // If the stamp matches the tail, we may attempt to push. - if stamp == tail { - // Try advancing the tail. - match self.tail.compare_exchange_weak( - tail, - new_tail, - Ordering::SeqCst, - Ordering::Relaxed, - ) { - Ok(_) => { - // Write the value into the slot. - unsafe { slot.value.get().write(MaybeUninit::new(value)) }; - slot.stamp.store(tail + 1, Ordering::Release); - return None; - } - - Err(t) => { - tail = t; - backoff.spin(); - continue; - } - } - // If the stamp lags one lap behind the tail, we may attempt to swap. - } else if stamp.wrapping_add(self.lap) == tail + 1 { - atomic::fence(Ordering::SeqCst); - - // Try advancing the head, if it lags one lap behind the tail as well. - if self - .head - .compare_exchange_weak( - tail.wrapping_sub(self.lap), - new_tail.wrapping_sub(self.lap), - Ordering::SeqCst, - Ordering::Relaxed, - ) - .is_ok() - { - // Advance the tail. - debug_assert_eq!(self.tail.load(Ordering::SeqCst), tail); - self.tail.store(new_tail, Ordering::SeqCst); - - // Replace the value in the slot. - let new = MaybeUninit::new(value); - let old = unsafe { slot.value.get().replace(new).assume_init() }; - slot.stamp.store(tail + 1, Ordering::Release); - return Some(old); - } - } - - backoff.snooze(); - tail = self.tail.load(Ordering::Relaxed); - } - } - - fn pop(&self) -> Option { - let backoff = Backoff::new(); - let mut head = self.head.load(Ordering::Relaxed); - - loop { - let slot = self.get(head); - let stamp = slot.stamp.load(Ordering::Acquire); - - // If the the stamp is ahead of the head by 1, we may attempt to pop. - if stamp == head + 1 { - // Try advancing the head. - match self.head.compare_exchange_weak( - head, - self.advance(head), - Ordering::SeqCst, - Ordering::Relaxed, - ) { - Ok(_) => { - // Read the value from the slot. - let msg = unsafe { slot.value.get().read().assume_init() }; - slot.stamp - .store(head.wrapping_add(self.lap), Ordering::Release); - return Some(msg); - } - - Err(h) => { - head = h; - backoff.spin(); - continue; - } - } - // If the stamp matches the head, the queue may be empty. - } else if stamp == head { - atomic::fence(Ordering::SeqCst); - - // If the tail matches the head as well, the queue is empty. - if self.tail.load(Ordering::Relaxed) == head { - return None; - } - } - - backoff.snooze(); - head = self.head.load(Ordering::Relaxed); - } - } -} - -impl Drop for AtomicQueue { - fn drop(&mut self) { - let mut cursor = self.head.load(Ordering::Relaxed); - let end = self.tail.load(Ordering::Relaxed); - - // Loop over all slots that hold a message and drop them. - while cursor != end { - let slot = self.get(cursor); - unsafe { (&mut *slot.value.get()).as_mut_ptr().drop_in_place() }; - cursor = self.advance(cursor); - } - } -} - #[derive(Derivative)] #[derivative(Debug(bound = ""))] #[allow(clippy::large_enum_variant)] pub(super) enum RingBuffer { Atomic(AtomicOption), - Queue(AtomicQueue), + Queue(ArrayQueue), } impl RingBuffer { @@ -211,7 +17,7 @@ impl RingBuffer { if capacity == 1 { RingBuffer::Atomic(Default::default()) } else { - RingBuffer::Queue(AtomicQueue::new(capacity)) + RingBuffer::Queue(ArrayQueue::new(capacity)) } } @@ -228,7 +34,7 @@ impl RingBuffer { pub(super) fn push(&self, value: T) -> Option { match self { RingBuffer::Atomic(c) => c.swap(value), - RingBuffer::Queue(q) => q.push_or_swap(value), + RingBuffer::Queue(q) => q.force_push(value), } } @@ -267,7 +73,7 @@ mod tests { assert_eq!( discriminant(&RingBuffer::::new(2)), - discriminant(&RingBuffer::Queue(AtomicQueue::new(2))) + discriminant(&RingBuffer::Queue(ArrayQueue::new(2))) ); }