Add support for the critical-section crate
This allows running in a no_std environment.
danieldg committed Feb 22, 2023
1 parent 9d748b9 commit 1eda4f5
Showing 3 changed files with 138 additions and 55 deletions.
7 changes: 6 additions & 1 deletion Cargo.toml
@@ -14,7 +14,12 @@ keywords = ["lazy", "static", "async"]
 categories = ["rust-patterns", "memory-management"]
 
 [features]
-unpin = []
+unpin = ['std']
+critical-section = ['dep:critical-section']
+std = []
+
+[dependencies]
+critical-section = { version = "1", optional = true }
 
 [package.metadata.docs.rs]
 all-features = true
185 changes: 131 additions & 54 deletions src/lib.rs
@@ -1,9 +1,10 @@
 //! A collection of lazy initialized values that are created by `Future`s.
 //!
 //! [OnceCell]'s API should be familiar to anyone who has used the
-//! [`once_cell`](https://crates.io/crates/once_cell) crate or the proposed `std::lazy` module. It
-//! provides an async version of a cell that can only be initialized once, permitting tasks to wait
-//! on the initialization if it is already running instead of racing multiple initialization tasks.
+//! [`once_cell`](https://crates.io/crates/once_cell) crate or the proposed `std::cell::OnceCell`.
+//! It provides an async version of a cell that can only be initialized once, permitting tasks to
+//! wait on the initialization if it is already running instead of racing multiple initialization
+//! tasks.
 //!
 //! Unlike threads, tasks can be cancelled at any point where they block. [OnceCell] deals with
 //! this by allowing another initializer to run if the task currently initializing the cell is
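
Editor's note: to make the API described in this doc comment concrete, here is a hedged sketch of typical usage. It assumes the crate's async `get_or_init` entry point, which is not shown in this diff, plus a caller-supplied `load()`:

    use async_once_cell::OnceCell;

    static CONFIG: OnceCell<String> = OnceCell::new();

    async fn config() -> &'static str {
        // Every caller awaits the same initialization; losers of the race
        // wait for the winner instead of re-running `load`.
        CONFIG.get_or_init(async { load().await }).await
    }

    async fn load() -> String {
        "example".to_owned()
    }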
@@ -19,19 +20,93 @@
 //!
 //! Both cells use two `usize`s to store state and do not retain any allocations after
 //! initialization is complete. [OnceCell] and [Lazy] only allocate if there is contention.
+//!
+//! # Features
+//!
+//! ## The `critical-section` feature
+//!
+//! If this feature is enabled, the [`critical-section`](https://crates.io/crates/critical-section)
+//! crate is used instead of an `std` mutex. You must depend on that crate and select a locking
+//! implementation; see [its documentation](https://docs.rs/critical-section/) for details.
+//!
+//! ## The `unpin` feature
+//!
+//! This feature enables the `unpin` module, which contains an alternative API for [Lazy] that does
+//! not rely on pinning the object during initialization, even for futures that are not [Unpin].
+//! In general, prefer the types in the crate root and, if needed, box futures to make them unpin.
+//!
+//! ## The `std` feature
+//!
+//! This is currently a no-op, but it might in the future be used to expose APIs that depend on
+//! types only available in `std`. It does *not* control the locking implementation.
 
-use std::{
+#![cfg_attr(not(feature = "critical-section"), no_std)]
+extern crate alloc;
+
+#[cfg(any(not(feature = "critical-section"), feature = "std"))]
+extern crate std;
+
+use alloc::{boxed::Box, vec, vec::Vec};
+
+use core::{
     cell::UnsafeCell,
     convert::Infallible,
     future::Future,
     panic::{RefUnwindSafe, UnwindSafe},
     pin::Pin,
     ptr,
     sync::atomic::{AtomicPtr, AtomicUsize, Ordering},
-    sync::Mutex,
     task,
 };
 
+#[cfg(feature = "critical-section")]
+struct Mutex<T> {
+    data: UnsafeCell<T>,
+    locked: core::sync::atomic::AtomicBool,
+}
+
+#[cfg(feature = "critical-section")]
+impl<T> Mutex<T> {
+    const fn new(data: T) -> Self {
+        Mutex { data: UnsafeCell::new(data), locked: core::sync::atomic::AtomicBool::new(false) }
+    }
+}
+
+#[cfg(not(feature = "critical-section"))]
+use std::sync::Mutex;
+
+#[cfg(feature = "critical-section")]
+fn with_lock<T, R>(mutex: &Mutex<T>, f: impl FnOnce(&mut T) -> R) -> R {
+    struct Guard<'a, T>(&'a Mutex<T>);
+    impl<'a, T> Drop for Guard<'a, T> {
+        fn drop(&mut self) {
+            self.0.locked.store(false, Ordering::Relaxed);
+        }
+    }
+    critical_section::with(|_| {
+        if mutex.locked.swap(true, Ordering::Relaxed) {
+            // Note: this can in theory happen if the delegated Clone impl on a Waker provided in
+            // an initialization context turns around and tries to initialize the same cell. This
+            // is an absurd thing to do, but it's safe, so we can't assume nobody will ever do it.
+            panic!("Attempted reentrant locking");
+        }
+        let guard = Guard(mutex);
+        // Safety: we just checked that we were the one to set `locked` to true, and the data in
+        // this Mutex will only be accessed while the lock is true. We use Relaxed memory ordering
+        // instead of Acquire/Release because critical_section::with itself must provide an
+        // Acquire/Release barrier around its closure, and also guarantees that there will not be
+        // more than one such closure executing at a time.
+        let rv = unsafe { f(&mut *mutex.data.get()) };
+        drop(guard);
+        rv
+    })
+}
+
+#[cfg(not(feature = "critical-section"))]
+fn with_lock<T, R>(mutex: &Mutex<T>, f: impl FnOnce(&mut T) -> R) -> R {
+    f(&mut *mutex.lock().unwrap())
+}
 
 /// Types that do not rely on pinning during initialization.
 ///
 /// This module is only built if the `unpin` crate feature is enabled.
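
Editor's note: the new feature docs above say a `critical-section` user must select a locking implementation. A minimal sketch of doing that by hand, assuming the `critical_section::set_impl!` macro and `Impl` trait from that crate's 1.x API; `disable_interrupts`/`restore_interrupts` are hypothetical stand-ins for target intrinsics, and hosted targets would more likely just enable the `critical-section` crate's own `std` feature:

    struct SingleCoreCs;
    critical_section::set_impl!(SingleCoreCs);

    unsafe impl critical_section::Impl for SingleCoreCs {
        unsafe fn acquire() -> critical_section::RawRestoreState {
            // Stand-in: disable interrupts and return the prior state.
            // (RawRestoreState is `()` unless a restore-state feature is enabled.)
            disable_interrupts()
        }

        unsafe fn release(prior: critical_section::RawRestoreState) {
            // Stand-in: restore the interrupt state saved by acquire().
            restore_interrupts(prior)
        }
    }

    // Hypothetical stubs for the target-specific intrinsics.
    unsafe fn disable_interrupts() {}
    unsafe fn restore_interrupts(_prior: ()) {}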
@@ -256,7 +331,7 @@ impl<'a> Drop for QuickInitGuard<'a> {
             if !queue.is_null() {
                 // Synchronize with both the fetch_sub that lowered the refcount and the
                 // queue initialization.
-                std::sync::atomic::fence(Ordering::Acquire);
+                core::sync::atomic::fence(Ordering::Acquire);
                 // Safety: we observed no active QueueRefs, and queue is only used by
                 // guard-holders. Due to the swap, we are the only one who is freeing this
                 // particular queue.
@@ -289,15 +364,15 @@ impl<'a> Drop for QuickInitGuard<'a> {
             // Safety: the guard holds a place on the waiter list and we just checked that the
             // queue is non-null. It will remain valid until guard is dropped.
             let queue = unsafe { &*guard.queue };
-            let mut lock = queue.wakers.lock().unwrap();
 
-            // Ensure that nobody else can grab the QueueHead between when we release QINIT_BIT and
-            // when our QueueHead is dropped.
-            lock.get_or_insert_with(Vec::new);
-            // Allow someone else to take the head position once we drop it. Ordering is handled
-            // by the Mutex.
-            self.0.state.fetch_and(!QINIT_BIT, Ordering::Relaxed);
-            drop(lock);
+            with_lock(&queue.wakers, |lock| {
+                // Ensure that nobody else can grab the QueueHead between when we release QINIT_BIT
+                // and when our QueueHead is dropped.
+                lock.get_or_insert_with(Vec::new);
+                // Allow someone else to take the head position once we drop it. Ordering is
+                // handled by the Mutex.
+                self.0.state.fetch_and(!QINIT_BIT, Ordering::Relaxed);
+            });
 
             // Safety: we just took the head position, and we were the QuickInitGuard
             drop(QueueHead { guard })
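
Editor's note: call sites like the one in this hunk are the payoff of the closure-based helper: the same caller code compiles against either lock implementation. A tiny illustrative caller written against the `with_lock` helper added above, hypothetical rather than from this commit:

    fn bump(counter: &Mutex<Option<u32>>) -> u32 {
        with_lock(counter, |slot| {
            let v = slot.get_or_insert(0);
            *v += 1;
            *v
        })
    }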
@@ -334,46 +409,52 @@ impl<'a> Future for QueueWaiter<'a> {
         // Safety: the guard holds a place on the waiter list and we just checked that the state is
         // not ready, so the queue is non-null and will remain valid until guard is dropped.
         let queue = unsafe { &*guard.queue };
-        let mut lock = queue.wakers.lock().unwrap();
-
-        // Another task might have called set_ready() and dropped its QueueHead between our
-        // optimistic lock-free check and our lock acquisition. Don't return a QueueHead unless we
-        // know for sure that we are allowed to initialize.
-        let state = guard.inner.state.load(Ordering::Acquire);
-        if state & READY_BIT != 0 {
-            return task::Poll::Ready(None);
-        }
-
-        match lock.as_mut() {
-            None if state & QINIT_BIT == 0 => {
-                // take the head position and start a waker queue
-                *lock = Some(Vec::new());
-                drop(lock);
-
-                // Safety: we know that nobody else has a QuickInitGuard because we are holding a
-                // QueueRef that prevents state from being 0 (which is required to create a
-                // new QuickInitGuard), and we just checked that one wasn't created before we
-                // created our QueueRef.
-                task::Poll::Ready(Some(QueueHead { guard: self.guard.take().unwrap() }))
-            }
-            None => {
-                // Someone else has a QuickInitGuard; they will wake us when they finish.
-                let waker = cx.waker().clone();
-                *lock = Some(vec![waker]);
-                task::Poll::Pending
-            }
-            Some(wakers) => {
-                // Wait for the QueueHead to be dropped
-                let my_waker = cx.waker();
-                for waker in wakers.iter() {
-                    if waker.will_wake(my_waker) {
-                        return task::Poll::Pending;
-                    }
-                }
-                wakers.push(my_waker.clone());
-                task::Poll::Pending
-            }
-        }
+        let rv = with_lock(&queue.wakers, |lock| {
+            // Another task might have called set_ready() and dropped its QueueHead between our
+            // optimistic lock-free check and our lock acquisition. Don't return a QueueHead unless we
+            // know for sure that we are allowed to initialize.
+            let state = guard.inner.state.load(Ordering::Acquire);
+            if state & READY_BIT != 0 {
+                return task::Poll::Ready(None);
+            }
+
+            match lock.as_mut() {
+                None if state & QINIT_BIT == 0 => {
+                    // take the head position and start a waker queue
+                    *lock = Some(Vec::new());
+
+                    task::Poll::Ready(Some(()))
+                }
+                None => {
+                    // Someone else has a QuickInitGuard; they will wake us when they finish.
+                    let waker = cx.waker().clone();
+                    *lock = Some(vec![waker]);
+                    task::Poll::Pending
+                }
+                Some(wakers) => {
+                    // Wait for the QueueHead to be dropped
+                    let my_waker = cx.waker();
+                    for waker in wakers.iter() {
+                        if waker.will_wake(my_waker) {
+                            return task::Poll::Pending;
+                        }
+                    }
+                    wakers.push(my_waker.clone());
+                    task::Poll::Pending
+                }
+            }
+        });
+
+        // Safety: If rv is Ready/Some, we know:
+        // - we are holding a QueueRef (in guard) that prevents state from being 0
+        // - creating a new QuickInitGuard requires the state to be 0
+        // - we just checked QINIT_BIT and saw there isn't a QuickInitGuard active
+        // - the queue was None, meaning there are no current QueueHeads
+        // - we just set the queue to Some, claiming the head
+        //
+        // If rv is Ready/None, this is due to READY_BIT being set.
+        // If rv is Pending, we have a waker in the queue.
+        rv.map(|o| o.map(|()| QueueHead { guard: self.guard.take().unwrap() }))
     }
 }
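
Editor's note on this hunk: the closure cannot construct the `QueueHead` itself, since that requires `self.guard.take()` while the closure still borrows `guard`, so the locked region returns a bare `Some(())` token and the guard object is assembled only after the lock is released. A distilled, hypothetical sketch of that shape:

    // Claim a slot under the lock, but let the caller build the real
    // guard object afterwards, outside the lock.
    fn try_claim<T>(lock: &Mutex<Option<Vec<T>>>) -> Option<()> {
        with_lock(lock, |slot| {
            if slot.is_none() {
                *slot = Some(Vec::new()); // claim the head position
                Some(()) // token: caller may now construct the guard
            } else {
                None
            }
        })
    }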

@@ -382,11 +463,7 @@ impl<'a> Drop for QueueHead<'a> {
         // Safety: if queue is not null, then it is valid as long as the guard is alive
         if let Some(queue) = unsafe { self.guard.queue.as_ref() } {
             // Take the waker queue so the next QueueWaiter can make a new one
-            let wakers = queue
-                .wakers
-                .lock()
-                .expect("Lock poisoned")
-                .take()
+            let wakers = with_lock(&queue.wakers, |w| w.take())
                 .expect("QueueHead dropped without a waker list");
             for waker in wakers {
                 waker.wake();
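
Editor's note: the reworked `Drop` keeps a detail worth calling out: the waker list is moved out while the lock is held, and `wake()` runs only after the lock is released, so a waker whose implementation synchronously re-enters the cell cannot hit the reentrant-locking panic described earlier. A hypothetical distillation of that take-then-wake shape:

    use core::task::Waker;

    fn finish(queue: &Mutex<Option<Vec<Waker>>>) {
        let wakers = with_lock(queue, |w| w.take()).unwrap_or_default();
        for waker in wakers {
            waker.wake(); // runs outside the lock
        }
    }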
1 change: 1 addition & 0 deletions src/unpin.rs
@@ -1,3 +1,4 @@
+use alloc::{boxed::Box, vec::Vec};
 use std::{
     cell::UnsafeCell,
     convert::Infallible,
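
Editor's note: for the `unpin` advice in the new feature docs (prefer the crate-root types and box futures that are not Unpin), a minimal sketch of the standard conversion; nothing here is specific to this commit:

    use std::future::Future;
    use std::pin::Pin;

    // Pin<Box<F>> implements Future and is always Unpin, regardless of F,
    // so boxing is enough to use a !Unpin future with Unpin-based APIs.
    fn into_unpin<F: Future>(fut: F) -> Pin<Box<F>> {
        Box::pin(fut)
    }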
