Skip to content

Commit

Permalink
address review
Browse files Browse the repository at this point in the history
  • Loading branch information
b-naber committed Mar 12, 2021
1 parent c34c8d8 commit 8d131c8
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 128 deletions.
2 changes: 1 addition & 1 deletion tokio/src/sync/mod.rs
Expand Up @@ -459,7 +459,7 @@ cfg_sync! {
pub(crate) use task::AtomicWaker;

mod once_cell;
pub use self::once_cell::{OnceCell, NotInitializedError, AlreadyInitializedError};
pub use self::once_cell::{OnceCell, NotInitializedError, SetError};

pub mod watch;
}
Expand Down
191 changes: 118 additions & 73 deletions tokio/src/sync/once_cell.rs
Expand Up @@ -4,17 +4,16 @@ use std::error::Error;
use std::fmt;
use std::future::Future;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering};

/// A thread-safe cell which can be written to only once.
///
/// Provides the functionality to either set the value, in case `OnceCell`
/// is uninitialized, or get the already initialized value by using an async
/// function via [`OnceCell::get_or_init_with`] or by using a Future via
/// [`OnceCell::get_or_init`] directly via [`OnceCell::get_or_init`].
/// function via [`OnceCell::get_or_init_with`].
///
/// [`OnceCell::get_or_init_with`]: crate::sync::OnceCell::get_or_init_with
/// [`OnceCell::get_or_init`]: crate::sync::OnceCell::get_or_init
///
/// # Examples
/// ```
Expand All @@ -24,7 +23,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
/// 1 + 1
/// }
///
/// static ONCE: OnceCell<u32> = OnceCell::new();
/// static ONCE: OnceCell<u32> = OnceCell::const_new();
///
/// #[tokio::main]
/// async fn main() {
Expand Down Expand Up @@ -76,10 +75,19 @@ impl<T: PartialEq> PartialEq for OnceCell<T> {
impl<T: Eq> Eq for OnceCell<T> {}

impl<T> OnceCell<T> {
/// Creates a new uninitialized OnceCell instance.
pub fn new() -> Self {
OnceCell {
value_set: AtomicBool::new(false),
value: UnsafeCell::new(MaybeUninit::uninit()),
semaphore: Semaphore::new(1),
}
}

/// Creates a new uninitialized OnceCell instance.
#[cfg(all(feature = "parking_lot", not(all(loom, test)),))]
#[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))]
pub const fn new() -> Self {
pub const fn const_new() -> Self {
OnceCell {
value_set: AtomicBool::new(false),
value: UnsafeCell::new(MaybeUninit::uninit()),
Expand All @@ -97,6 +105,12 @@ impl<T> OnceCell<T> {
&*self.value.with(|ptr| (*ptr).as_ptr())
}

// SAFETY: safe to call only once self.initialized() is true and since
// only one mutable reference can call this method
unsafe fn get_unchecked_mut(&mut self) -> &mut T {
&mut *self.value.with_mut(|ptr| (*ptr).as_mut_ptr())
}

// SAFETY: safe to call only once a permit on the semaphore has been
// acquired
unsafe fn set_value(&self, value: T) {
Expand All @@ -115,27 +129,38 @@ impl<T> OnceCell<T> {
if self.initialized() {
Ok(unsafe { self.get_unchecked() })
} else {
Err(NotInitializedError)
Err(NotInitializedError(()))
}
}

/// If the cell is initialized, this method returns a mutable reference to its value,
/// otherwise returns [`NotInitializedError`].
///
/// [`NotInitializedError`]: crate::sync::NotInitializederror
pub fn get_mut(&mut self) -> Result<&mut T, NotInitializedError> {
if self.initialized() {
Ok(unsafe { self.get_unchecked_mut() })
} else {
Err(NotInitializedError(()))
}
}

/// Sets the value of the OnceCell to the argument value.
///
/// If the value of the OnceCell was already set prior to this call
/// or some other set is currently initializing the cell, then
/// [`AlreadyInitializedError`] is returned. In order to wait
/// for an ongoing initialization to finish, call [`OnceCell::get_or_init`]
/// or [`OnceCell::get_or_init_with`] instead.
/// then [`AlreadyInitializedError`] is returned. If another thread
/// is initializing the cell while this method is called,
/// ['InitializingError`] is returned. In order to wait
/// for an ongoing initialization to finish, call
/// [`OnceCell::get_or_init_with`] instead.
///
/// [`AlreadyInitializedError`]: crate::sync::AlreadyInitializedError
/// ['OnceCell::get_or_init`]: crate::sync::OnceCell::get_or_init
/// [`InitializingError`]: crate::sync::InitializingError
/// ['OnceCell::get_or_init_with`]: crate::sync::OnceCell::get_or_init_with
pub fn set(&self, value: T) -> Result<(), AlreadyInitializedError> {
pub fn set(&self, value: T) -> Result<(), SetError> {
if !self.initialized() {
// After acquire().await we have either acquired a permit while self.value
// is still uninitialized, or another thread has intialized the value and
// closed the semaphore, in which case self.initialized is true and we
// don't set the value
// Another thread might be initializing the cell, in which case `try_acquire` will
// return an error
match self.semaphore.try_acquire() {
Ok(_permit) => {
if !self.initialized() {
Expand All @@ -151,16 +176,30 @@ impl<T> OnceCell<T> {
}
}
_ => {
// Couldn't acquire the permit, look if initializing process is already completed
if !self.initialized() {
panic!(
"couldn't acquire a permit even though OnceCell value is uninitialized."
);
return Err(SetError::InitializingError(()));
}
}
}
}

Err(AlreadyInitializedError)
Err(SetError::AlreadyInitializedError(()))
}

/// Tries to set the value of the cell, overwriting the previously set value, in case one is
/// available. If no value was previously set, this method has the same functionality has
/// [`OnceCell::set`].
///
/// [`OnceCell::set`]: crate::sync::OnceCell::set
pub fn set_mut(&mut self, value: T) -> Result<(), SetError> {
if self.initialized() {
// SAFETY: Setting this value is safe because the mutable reference guarantees exclusivity
unsafe { self.set_value(value) };
Ok(())
} else {
self.set(value)
}
}

/// Tries to initialize the value of the OnceCell using the async function `f`.
Expand Down Expand Up @@ -217,56 +256,25 @@ impl<T> OnceCell<T> {
}
}

/// Tries to initialize the value of the `OnceCell` using the the Future `f`.
/// If the value of the `OnceCell` was already initialized prior to this call,
/// a reference to that initialized value is returned. If some other thread
/// initiated the initialization prior to this call and the initialization
/// hasn't completed, this call waits until the initialization is finished.
///
/// This will deadlock if `f` internally tries to initialize the cell itself.
pub async fn get_or_init<F>(&self, f: F) -> &T
where
F: Future<Output = T>,
{
/// Moves the value out of the cell and drops the cell afterwards.
pub fn into_inner(self) -> Result<T, NotInitializedError> {
if self.initialized() {
// SAFETY: once the value is initialized, no mutable references are given out, so
// we can give out arbitrarily many immutable references
return unsafe { self.get_unchecked() };
Ok(unsafe { self.value.with(|ptr| ptr::read(ptr).assume_init()) })
} else {
// After acquire().await we have either acquired a permit while self.value
// is still uninitialized, or current thread is awoken after another thread
// has intialized the value and closed the semaphore, in which case self.initialized
// is true and we don't set the value here
match self.semaphore.acquire().await {
Ok(_permit) => {
if !self.initialized() {
// If `f` panics or `select!` is called, this `get_or_init` call
// is aborted and the semaphore permit is dropped.
let value = f.await;

// SAFETY: There is only one permit on the semaphore, hence only one
// mutable reference is created
unsafe { self.set_value(value) };
Err(NotInitializedError(()))
}
}

// SAFETY: once the value is initialized, no mutable references are given out, so
// we can give out arbitrarily many immutable references
return unsafe { self.get_unchecked() };
} else {
unreachable!("acquired semaphore after value was already initialized.");
}
}
Err(_) => {
if self.initialized() {
// SAFETY: once the value is initialized, no mutable references are given out, so
// we can give out arbitrarily many immutable references
return unsafe { self.get_unchecked() };
} else {
unreachable!(
"Semaphore closed, but the OnceCell has not been initialized."
);
}
}
}
/// Takes ownership of the current value, leaving the cell unitialized.
pub fn take(&mut self) -> Result<T, NotInitializedError> {
if self.initialized() {
// Note: ptr::read does not move the value out of `self.value`. We need to use
// `self.initialized` to prevent incorrect accesses to value
let value = unsafe { self.value.with(|ptr| ptr::read(ptr).assume_init()) };
self.value_set.store(false, Ordering::Release);
Ok(value)
} else {
Err(NotInitializedError(()))
}
}
}
Expand All @@ -284,25 +292,62 @@ unsafe impl<T: Sync + Send> Sync for OnceCell<T> {}
// it's safe to send it to another thread
unsafe impl<T: Send> Send for OnceCell<T> {}

/// Error returned from the [`OnceCell::set`] method
/// Errors that can be returned from [`OnceCell::set`] and
/// [`OnceCell::set_mut`].
///
/// [`OnceCell::set`]: crate::sync::OnceCell::set
/// [`OnceCell::set_mut`]: crate::sync::OnceCell::set_mut
#[derive(Debug, PartialEq)]
pub struct AlreadyInitializedError;
pub enum SetError {
/// Error resulting from [`OnceCell::set`] or [`OnceCell::set_mut`] calls if
/// the cell was previously initialized.
///
/// [`OnceCell::set`]: crate::sync::OnceCell::set
/// [`OnceCell::set_mut`]: crate::sync::OnceCell::set_mut
AlreadyInitializedError(()),

impl fmt::Display for AlreadyInitializedError {
/// Error resulting from [`OnceCell::set`] or [`OnceCell::set_mut`] calls when
/// the cell is currently being inintialized during the calls to those methods.
///
/// [`OnceCell::set`]: crate::sync::OnceCell::set
/// [`OnceCell::set_mut`]: crate::sync::OnceCell::set_mut
InitializingError(()),
}

impl fmt::Display for SetError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "AlreadyInitializedError")
match self {
SetError::AlreadyInitializedError(_) => write!(f, "AlreadyInitializedError"),
SetError::InitializingError(_) => write!(f, "InitializingError"),
}
}
}

impl Error for AlreadyInitializedError {}
impl Error for SetError {}

impl SetError {
/// Whether `SetError` is `SetError::AlreadyInitializEderror`
pub fn is_already_init_err(&self) -> bool {
match self {
SetError::AlreadyInitializedError(_) => true,
SetError::InitializingError(_) => false,
}
}

/// Whether `SetError` is `SetError::InitializingError`
pub fn is_initializing_err(&self) -> bool {
match self {
SetError::AlreadyInitializedError(_) => false,
SetError::InitializingError(_) => true,
}
}
}

/// Error returned from the [`OnceCell::get`] method
///
/// [`OnceCell::get`]: crate::sync::OnceCell::get
#[derive(Debug, PartialEq)]
pub struct NotInitializedError;
pub struct NotInitializedError(());

impl fmt::Display for NotInitializedError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand Down
30 changes: 7 additions & 23 deletions tokio/tests/async_send_sync.rs
Expand Up @@ -2,8 +2,10 @@
#![cfg(feature = "full")]

use std::cell::Cell;
use std::future::Future;
use std::io::{Cursor, SeekFrom};
use std::net::SocketAddr;
use std::pin::Pin;
use std::rc::Rc;
use tokio::net::TcpStream;
use tokio::time::{Duration, Instant};
Expand Down Expand Up @@ -270,37 +272,19 @@ async_assert_fn!(tokio::sync::OnceCell<u8>::get_or_init_with(
async_assert_fn!(tokio::sync::OnceCell<u8>::get_or_init_with(
_, fn() -> Pin<Box<dyn Future<Output = u8> + Send>>): Send & !Sync);
async_assert_fn!(tokio::sync::OnceCell<u8>::get_or_init_with(
_, fn() -> Pin<Box<dyn Future<Output = u8>>>): Send & !Sync);
_, fn() -> Pin<Box<dyn Future<Output = u8>>>): !Send & !Sync);
async_assert_fn!(tokio::sync::OnceCell<Cell<u8>>::get_or_init_with(
_, fn() -> Pin<Box<dyn Future<Output = Cell<u8>> + Send + Sync>>): Send & Sync);
_, fn() -> Pin<Box<dyn Future<Output = Cell<u8>> + Send + Sync>>): !Send & !Sync);
async_assert_fn!(tokio::sync::OnceCell<Cell<u8>>::get_or_init_with(
_, fn() -> Pin<Box<dyn Future<Output = Cell<u8>> + Send>>): Send & !Sync);
_, fn() -> Pin<Box<dyn Future<Output = Cell<u8>> + Send>>): !Send & !Sync);
async_assert_fn!(tokio::sync::OnceCell<Cell<u8>>::get_or_init_with(
_, fn() -> Pin<Box<dyn Future<Output = Cell<u8>>>>): !Send & !Sync);
async_assert_fn!(tokio::sync::OnceCell<Rc<u8>>::get_or_init_with(
_, fn() -> Pin<Box<dyn Future<Output = Rc<u8>> + Send + Sync>>): Send & Sync);
_, fn() -> Pin<Box<dyn Future<Output = Rc<u8>> + Send + Sync>>): !Send & !Sync);
async_assert_fn!(tokio::sync::OnceCell<Rc<u8>>::get_or_init_with(
_, fn() -> Pin<Box<dyn Future<Output = Rc<u8>> + Send>>): Send & !Sync);
_, fn() -> Pin<Box<dyn Future<Output = Rc<u8>> + Send>>): !Send & !Sync);
async_assert_fn!(tokio::sync::OnceCell<Rc<u8>>::get_or_init_with(
_, fn() -> Pin<Box<dyn Future<Output = Rc<u8>>>>): !Send & !Sync);
async_assert_fn!(tokio::sync::OnceCell<u8>::get_or_init(
_, Pin<Box<dyn Future<Output = u8> + Send + Sync>>): Send & Sync);
async_assert_fn!(tokio::sync::OnceCell<u8>::get_or_init(
_, Pin<Box<dyn Future<Output = u8> + Send>>): Send & !Sync);
async_assert_fn!(tokio::sync::OnceCell<u8>::get_or_init(
_, Pin<Box<dyn Future<Output = u8>>>): Send & !Sync);
async_assert_fn!(tokio::sync::OnceCell<Cell<u8>>::get_or_init(
_, Pin<Box<dyn Future<Output = Cell<u8>> + Send + Sync>>): Send & Sync);
async_assert_fn!(tokio::sync::OnceCell<Cell<u8>>::get_or_init(
_, Pin<Box<dyn Future<Output = Cell<u8>> + Send>>): Send & !Sync);
async_assert_fn!(tokio::sync::OnceCell<Cell<u8>>::get_or_init(
_, Pin<Box<dyn Future<Output = Cell<u8>>>>): !Send & !Sync);
async_assert_fn!(tokio::sync::OnceCell<Rc<u8>>::get_or_init(
_, Pin<Box<dyn Future<Output = Rc<u8>> + Send + Sync>>): Send & Sync);
async_assert_fn!(tokio::sync::OnceCell<Rc<u8>>::get_or_init(
_, Pin<Box<dyn Future<Output = Rc<u8>> + Send>>): Send & !Sync);
async_assert_fn!(tokio::sync::OnceCell<Rc<u8>>::get_or_init(
_, Pin<Box<dyn Future<Output = Rc<u8>>>>): !Send & !Sync);
assert_value!(tokio::sync::OnceCell<u8>: Send & Sync);
assert_value!(tokio::sync::OnceCell<Cell<u8>>: Send & !Sync);
assert_value!(tokio::sync::OnceCell<Rc<u8>>: !Send & !Sync);
Expand Down

0 comments on commit 8d131c8

Please sign in to comment.