Skip to content

Commit

Permalink
Remove dependency on unsound atomic-option crate
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Jun 17, 2022
1 parent 2a77cdb commit 365c3f4
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 5 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Expand Up @@ -18,7 +18,6 @@ codecov = { repository = "jonhoo/bus", branch = "master", service = "github" }
maintenance = { status = "passively-maintained" }

[dependencies]
atomic-option = "0.1"
num_cpus = "1.6.2"
parking_lot_core = "0.9"
crossbeam-channel = "0.5"
Expand Down
54 changes: 50 additions & 4 deletions src/lib.rs
Expand Up @@ -100,12 +100,13 @@
#![deny(missing_docs)]
#![warn(rust_2018_idioms)]

use atomic_option::AtomicOption;
use crossbeam_channel as mpsc;
use parking_lot_core::SpinWait;

use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::ops::Deref;
use std::ptr;
use std::sync::atomic;
use std::sync::mpsc as std_mpsc;
use std::sync::Arc;
Expand Down Expand Up @@ -190,7 +191,7 @@ impl<T: Clone + Sync> Seat<T> {
// we're the last reader, so we may need to notify the writer there's space in the buf.
// can be relaxed, since the acquire at the top already guarantees that we'll see
// updates.
waiting = self.waiting.take(atomic::Ordering::Relaxed);
waiting = self.waiting.take();

// since we're the last reader, no-one else will be cloning this value, so we can
// safely take a mutable reference, and just take the val instead of cloning it.
Expand Down Expand Up @@ -381,7 +382,7 @@ impl<T> Bus<T> {
// no, so block by parking and telling readers to notify on last read
self.state.ring[fence]
.waiting
.replace(Some(Box::new(thread::current())), atomic::Ordering::Relaxed);
.swap(Some(Box::new(thread::current())));

// need the atomic fetch_add to ensure reader threads will see the new .waiting
self.state.ring[fence]
Expand Down Expand Up @@ -419,7 +420,7 @@ impl<T> Bus<T> {
let state = unsafe { &mut *next.state.get() };
state.max = readers;
state.val = Some(val);
next.waiting.replace(None, atomic::Ordering::Relaxed);
next.waiting.take();
next.read.store(0, atomic::Ordering::Release);
}
self.rleft[tail] = 0;
Expand Down Expand Up @@ -820,3 +821,48 @@ impl<T: Clone + Sync> Iterator for BusIntoIter<T> {
self.0.recv().ok()
}
}

struct AtomicOption<T> {
ptr: atomic::AtomicPtr<T>,
_marker: PhantomData<Option<Box<T>>>,
}

unsafe impl<T: Send> Send for AtomicOption<T> {}
unsafe impl<T: Send> Sync for AtomicOption<T> {}

impl<T> AtomicOption<T> {
fn empty() -> Self {
Self {
ptr: atomic::AtomicPtr::new(ptr::null_mut()),
_marker: PhantomData,
}
}

fn swap(&self, val: Option<Box<T>>) -> Option<Box<T>> {
let old = match val {
Some(val) => self.ptr.swap(Box::into_raw(val), atomic::Ordering::AcqRel),
// Acquire is needed to synchronize with the store of a non-null ptr, but since a null ptr
// will never be dereferenced, there is no need to synchronize the store of a null ptr.
None => self.ptr.swap(ptr::null_mut(), atomic::Ordering::Acquire),
};
if old.is_null() {
None
} else {
// SAFETY:
// - AcqRel/Acquire ensures that it does not read a pointer to potentially invalid memory.
// - We've checked that old is not null.
// - We do not store invalid pointers other than null in self.ptr.
Some(unsafe { Box::from_raw(old) })
}
}

fn take(&self) -> Option<Box<T>> {
self.swap(None)
}
}

impl<T> Drop for AtomicOption<T> {
fn drop(&mut self) {
drop(self.take());
}
}

0 comments on commit 365c3f4

Please sign in to comment.