From f5174d23caee9a1115f0f582c329880568276fc6 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Thu, 8 Jul 2021 14:13:00 +0200 Subject: [PATCH 1/4] sync: clean up OnceCell implementation --- tokio/src/sync/once_cell.rs | 365 ++++++++++++++++++++---------------- 1 file changed, 208 insertions(+), 157 deletions(-) diff --git a/tokio/src/sync/once_cell.rs b/tokio/src/sync/once_cell.rs index ce55d9e3546..14f5207c60f 100644 --- a/tokio/src/sync/once_cell.rs +++ b/tokio/src/sync/once_cell.rs @@ -1,4 +1,4 @@ -use super::Semaphore; +use super::{Semaphore, SemaphorePermit, TryAcquireError}; use crate::loom::cell::UnsafeCell; use std::error::Error; use std::fmt; @@ -8,15 +8,30 @@ use std::ops::Drop; use std::ptr; use std::sync::atomic::{AtomicBool, Ordering}; -/// A thread-safe cell which can be written to only once. +// This file contains an implementation of an OnceCell. The principle +// behind the safety the of the cell is that any thread with an `&OnceCell` may +// access the `value` field according the following rules: +// +// 1. When `value_set` is false, the `value` field may be modified by the +// thread holding the permit on the semaphore. +// 2. When `value_set` is true, the `value` field may be accessed immutably by +// any thread. +// +// It is an invariant that if the semaphore is closed, then `value_set` is true. +// The reverse does not necessarily hold — but if not, the semaphore may not +// have any available permits. +// +// A thread with a `&mut OnceCell` may modify the value in any way it wants as +// long as the invariants are upheld. + +/// A thread-safe cell that can be written to only once. /// -/// Provides the functionality to either set the value, in case `OnceCell` -/// is uninitialized, or get the already initialized value by using an async -/// function via [`OnceCell::get_or_init`]. -/// -/// [`OnceCell::get_or_init`]: crate::sync::OnceCell::get_or_init +/// A `OnceCell` is typically used for global variables that need to be +/// initialized once on first use, but need no further changes. The `OnceCell` +/// in Tokio allows the initialization procedure to be asynchronous. /// /// # Examples +/// /// ``` /// use tokio::sync::OnceCell; /// @@ -28,8 +43,28 @@ use std::sync::atomic::{AtomicBool, Ordering}; /// /// #[tokio::main] /// async fn main() { -/// let result1 = ONCE.get_or_init(some_computation).await; -/// assert_eq!(*result1, 2); +/// let result = ONCE.get_or_init(some_computation).await; +/// assert_eq!(*result, 2); +/// } +/// ``` +/// +/// It is often useful to write a wrapper method for accessing the value. +/// +/// ``` +/// use tokio::sync::OnceCell; +/// +/// static ONCE: OnceCell = OnceCell::const_new(); +/// +/// async fn get_global_integer() -> &'static u32 { +/// ONCE.get_or_init(|| async { +/// 1 + 1 +/// }).await +/// } +/// +/// #[tokio::main] +/// async fn main() { +/// let result = get_global_integer(); +/// assert_eq!(*result, 2); /// } /// ``` pub struct OnceCell { @@ -68,7 +103,7 @@ impl Eq for OnceCell {} impl Drop for OnceCell { fn drop(&mut self) { - if self.initialized() { + if self.initialized_mut() { unsafe { self.value .with_mut(|ptr| ptr::drop_in_place((&mut *ptr).as_mut_ptr())); @@ -90,7 +125,7 @@ impl From for OnceCell { } impl OnceCell { - /// Creates a new uninitialized OnceCell instance. + /// Creates a new empty `OnceCell` instance. pub fn new() -> Self { OnceCell { value_set: AtomicBool::new(false), @@ -99,8 +134,9 @@ impl OnceCell { } } - /// Creates a new initialized OnceCell instance if `value` is `Some`, otherwise - /// has the same functionality as [`OnceCell::new`]. + /// Creates a new `OnceCell` that contains the provided value, if any. + /// + /// If the `Option` is `None`, this is equivalent to `OnceCell::new`. /// /// [`OnceCell::new`]: crate::sync::OnceCell::new pub fn new_with(value: Option) -> Self { @@ -111,8 +147,31 @@ impl OnceCell { } } - /// Creates a new uninitialized OnceCell instance. - #[cfg(all(feature = "parking_lot", not(all(loom, test)),))] + /// Creates a new empty `OnceCell` instance. + /// + /// Equivalent to `OnceCell::new`, except that it can be used in static + /// variables. + /// + /// # Example + /// + /// ``` + /// use tokio::sync::OnceCell; + /// + /// static ONCE: OnceCell = OnceCell::const_new(); + /// + /// async fn get_global_integer() -> &'static u32 { + /// ONCE.get_or_init(|| async { + /// 1 + 1 + /// }).await + /// } + /// + /// #[tokio::main] + /// async fn main() { + /// let result = get_global_integer(); + /// assert_eq!(*result, 2); + /// } + /// ``` + #[cfg(all(feature = "parking_lot", not(all(loom, test))))] #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))] pub const fn const_new() -> Self { OnceCell { @@ -122,33 +181,48 @@ impl OnceCell { } } - /// Whether the value of the OnceCell is set or not. + /// Returns `true` if the `OnceCell` currently contains a value, and `false` + /// otherwise. pub fn initialized(&self) -> bool { + // Using acquire ordering so any threads that read a true from this + // atomic is able to read the value. self.value_set.load(Ordering::Acquire) } - // SAFETY: safe to call only once self.initialized() is true + /// Returns `true` if the `OnceCell` currently contains a value, and `false` + /// otherwise. + fn initialized_mut(&mut self) -> bool { + *self.value_set.get_mut() + } + + // SAFETY: The OnceCell must not be empty. unsafe fn get_unchecked(&self) -> &T { &*self.value.with(|ptr| (*ptr).as_ptr()) } - // SAFETY: safe to call only once self.initialized() is true. Safe because - // because of the mutable reference. + // SAFETY: The OnceCell must not be empty. unsafe fn get_unchecked_mut(&mut self) -> &mut T { &mut *self.value.with_mut(|ptr| (*ptr).as_mut_ptr()) } - // SAFETY: safe to call only once a permit on the semaphore has been - // acquired - unsafe fn set_value(&self, value: T) { - self.value.with_mut(|ptr| (*ptr).as_mut_ptr().write(value)); + fn set_value(&self, value: T, permit: SemaphorePermit<'_>) -> &T { + // SAFETY: We are holding the only permit on the semaphore. + unsafe { + self.value.with_mut(|ptr| (*ptr).as_mut_ptr().write(value)); + } + + // Using release ordering so any threads that read a true from this + // atomic is able to read the value we just stored. self.value_set.store(true, Ordering::Release); self.semaphore.close(); + permit.forget(); + + // SAFETY: We just initialized the cell. + unsafe { self.get_unchecked() } } - /// Tries to get a reference to the value of the OnceCell. - /// - /// Returns None if the value of the OnceCell hasn't previously been initialized. + /// Returns a reference to the value currently stored in the `OnceCell`, or + /// `None` if the `OnceCell` is empty. pub fn get(&self) -> Option<&T> { if self.initialized() { Some(unsafe { self.get_unchecked() }) @@ -157,179 +231,161 @@ impl OnceCell { } } - /// Tries to return a mutable reference to the value of the cell. + /// Returns a mutable reference to the value currently stored in the + /// `OnceCell`, or `None` if the `OnceCell` is empty. /// - /// Returns None if the cell hasn't previously been initialized. + /// Since this call borrows the `OnceCell` mutably, it is safe to mutate the + /// value inside the `OnceCell` — the mutable borrow statically guarantees + /// no other references exist. pub fn get_mut(&mut self) -> Option<&mut T> { - if self.initialized() { + if self.initialized_mut() { Some(unsafe { self.get_unchecked_mut() }) } else { None } } - /// Sets the value of the OnceCell to the argument value. + /// Set the value of the `OnceCell` to the given value if the `OnceCell` is + /// empty. + /// + /// If the `OnceCell` already has a value, this call will fail with an + /// [`SetError::AlreadyInitializedError`]. /// - /// If the value of the OnceCell was already set prior to this call - /// then [`SetError::AlreadyInitializedError`] is returned. If another thread - /// is initializing the cell while this method is called, - /// [`SetError::InitializingError`] is returned. In order to wait - /// for an ongoing initialization to finish, call - /// [`OnceCell::get_or_init`] instead. + /// If the `OnceCell` is empty, but some other task is currently trying to + /// set the value, this call will fail with [`SetError::InitializingError`]. /// /// [`SetError::AlreadyInitializedError`]: crate::sync::SetError::AlreadyInitializedError /// [`SetError::InitializingError`]: crate::sync::SetError::InitializingError - /// ['OnceCell::get_or_init`]: crate::sync::OnceCell::get_or_init pub fn set(&self, value: T) -> Result<(), SetError> { if !self.initialized() { - // Another thread might be initializing the cell, in which case `try_acquire` will - // return an error + // Another task might be initializing the cell, in which case + // `try_acquire` will return an error. If we succeed to acquire the + // permit, then we can set the value. match self.semaphore.try_acquire() { - Ok(_permit) => { - if !self.initialized() { - // SAFETY: There is only one permit on the semaphore, hence only one - // mutable reference is created - unsafe { self.set_value(value) }; - - return Ok(()); - } else { - unreachable!( - "acquired the permit after OnceCell value was already initialized." - ); - } + Ok(permit) => { + debug_assert!(!self.initialized()); + self.set_value(value, permit); + Ok(()) } - _ => { - // Couldn't acquire the permit, look if initializing process is already completed - if !self.initialized() { - return Err(SetError::InitializingError(value)); - } + Err(TryAcquireError::NoPermits) => { + // Some other task is holding the permit. That task is + // currently trying to initialize the value. + Err(SetError::InitializingError(value)) + } + Err(TryAcquireError::Closed) => { + // The semaphore was closed. Some other task has initialized + // the value. + Err(SetError::AlreadyInitializedError(value)) } } + } else { + Err(SetError::AlreadyInitializedError(value)) } - - Err(SetError::AlreadyInitializedError(value)) } - /// Tries to initialize the value of the OnceCell using the async function `f`. - /// If the value of the OnceCell was already initialized prior to this call, - /// a reference to that initialized value is returned. If some other thread - /// initiated the initialization prior to this call and the initialization - /// hasn't completed, this call waits until the initialization is finished. + /// Get the value currently in the `OnceCell`, or initialize it with the + /// given asynchronous operation. + /// + /// If some other task is currently working on initializing the `OnceCell`, + /// this call will wait for that other task to finish, then return the value + /// that the other task produced. + /// + /// If the provided operation is cancelled or panics, the initialization + /// attempt is cancelled. If there are other tasks waiting for the value to + /// be initialized, one of them will start another attempt at initializing + /// the value. /// - /// This will deadlock if `f` tries to initialize the cell itself. + /// This will deadlock if `f` tries to initialize the cell recursively. pub async fn get_or_init(&self, f: F) -> &T where F: FnOnce() -> Fut, Fut: Future, { if self.initialized() { - // SAFETY: once the value is initialized, no mutable references are given out, so - // we can give out arbitrarily many immutable references + // SAFETY: The OnceCell has been fully initialized. unsafe { self.get_unchecked() } } else { - // After acquire().await we have either acquired a permit while self.value - // is still uninitialized, or the current thread is awoken after another thread - // has initialized the value and closed the semaphore, in which case self.initialized - // is true and we don't set the value here + // Here we try to acquire the semaphore permit. Holding the permit + // will allow us to set the value of the OnceCell, and prevents + // other tasks from initializing the OnceCell while we are holding + // it. match self.semaphore.acquire().await { - Ok(_permit) => { - if !self.initialized() { - // If `f()` panics or `select!` is called, this `get_or_init` call - // is aborted and the semaphore permit is dropped. - let value = f().await; - - // SAFETY: There is only one permit on the semaphore, hence only one - // mutable reference is created - unsafe { self.set_value(value) }; - - // SAFETY: once the value is initialized, no mutable references are given out, so - // we can give out arbitrarily many immutable references - unsafe { self.get_unchecked() } - } else { - unreachable!("acquired semaphore after value was already initialized."); - } + Ok(permit) => { + debug_assert!(!self.initialized()); + + // If `f()` panics or `select!` is called, this + // `get_or_init` call is aborted and the semaphore permit is + // dropped. + let value = f().await; + + self.set_value(value, permit) } Err(_) => { - if self.initialized() { - // SAFETY: once the value is initialized, no mutable references are given out, so - // we can give out arbitrarily many immutable references - unsafe { self.get_unchecked() } - } else { - unreachable!( - "Semaphore closed, but the OnceCell has not been initialized." - ); - } + debug_assert!(self.initialized()); + + // SAFETY: The semaphore has been closed. This only happens + // when the OnceCell is fully initialized. + unsafe { self.get_unchecked() } } } } } - /// Tries to initialize the value of the OnceCell using the async function `f`. - /// If the value of the OnceCell was already initialized prior to this call, - /// a reference to that initialized value is returned. If some other thread - /// initiated the initialization prior to this call and the initialization - /// hasn't completed, this call waits until the initialization is finished. - /// If the function argument `f` returns an error, `get_or_try_init` - /// returns that error, otherwise the result of `f` will be stored in the cell. + /// Get the value currently in the `OnceCell`, or initialize it with the + /// given asynchronous operation. + /// + /// If some other task is currently working on initializing the `OnceCell`, + /// this call will wait for that other task to finish, then return the value + /// that the other task produced. + /// + /// If the provided operation returns an error, is cancelled or panics, the + /// initialization attempt is cancelled. If there are other tasks waiting + /// for the value to be initialized, one of them will start another attempt + /// at initializing the value. /// - /// This will deadlock if `f` tries to initialize the cell itself. + /// This will deadlock if `f` tries to initialize the cell recursively. pub async fn get_or_try_init(&self, f: F) -> Result<&T, E> where F: FnOnce() -> Fut, Fut: Future>, { if self.initialized() { - // SAFETY: once the value is initialized, no mutable references are given out, so - // we can give out arbitrarily many immutable references + // SAFETY: The OnceCell has been fully initialized. unsafe { Ok(self.get_unchecked()) } } else { - // After acquire().await we have either acquired a permit while self.value - // is still uninitialized, or the current thread is awoken after another thread - // has initialized the value and closed the semaphore, in which case self.initialized - // is true and we don't set the value here + // Here we try to acquire the semaphore permit. Holding the permit + // will allow us to set the value of the OnceCell, and prevents + // other tasks from initializing the OnceCell while we are holding + // it. match self.semaphore.acquire().await { - Ok(_permit) => { - if !self.initialized() { - // If `f()` panics or `select!` is called, this `get_or_try_init` call - // is aborted and the semaphore permit is dropped. - let value = f().await; - - match value { - Ok(value) => { - // SAFETY: There is only one permit on the semaphore, hence only one - // mutable reference is created - unsafe { self.set_value(value) }; - - // SAFETY: once the value is initialized, no mutable references are given out, so - // we can give out arbitrarily many immutable references - unsafe { Ok(self.get_unchecked()) } - } - Err(e) => Err(e), - } - } else { - unreachable!("acquired semaphore after value was already initialized."); + Ok(permit) => { + debug_assert!(!self.initialized()); + + // If `f()` panics or `select!` is called, this + // `get_or_try_init` call is aborted and the semaphore + // permit is dropped. + let value = f().await; + + match value { + Ok(value) => Ok(self.set_value(value, permit)), + Err(e) => Err(e), } } Err(_) => { - if self.initialized() { - // SAFETY: once the value is initialized, no mutable references are given out, so - // we can give out arbitrarily many immutable references - unsafe { Ok(self.get_unchecked()) } - } else { - unreachable!( - "Semaphore closed, but the OnceCell has not been initialized." - ); - } + debug_assert!(self.initialized()); + + // SAFETY: The semaphore has been closed. This only happens + // when the OnceCell is fully initialized. + unsafe { Ok(self.get_unchecked()) } } } } } - /// Moves the value out of the cell, destroying the cell in the process. - /// - /// Returns `None` if the cell is uninitialized. + /// Take the value from the cell, destroying the cell in the process. + /// Returns `None` if the cell is empty. pub fn into_inner(mut self) -> Option { - if self.initialized() { + if self.initialized_mut() { // Set to uninitialized for the destructor of `OnceCell` to work properly *self.value_set.get_mut() = false; Some(unsafe { self.value.with(|ptr| ptr::read(ptr).assume_init()) }) @@ -338,20 +394,18 @@ impl OnceCell { } } - /// Takes ownership of the current value, leaving the cell uninitialized. - /// - /// Returns `None` if the cell is uninitialized. + /// Takes ownership of the current value, leaving the cell empty. Returns + /// `None` if the cell is empty. pub fn take(&mut self) -> Option { std::mem::take(self).into_inner() } } -// Since `get` gives us access to immutable references of the -// OnceCell, OnceCell can only be Sync if T is Sync, otherwise -// OnceCell would allow sharing references of !Sync values across -// threads. We need T to be Send in order for OnceCell to by Sync -// because we can use `set` on `&OnceCell` to send -// values (of type T) across threads. +// Since `get` gives us access to immutable references of the OnceCell, OnceCell +// can only be Sync if T is Sync, otherwise OnceCell would allow sharing +// references of !Sync values across threads. We need T to be Send in order for +// OnceCell to by Sync because we can use `set` on `&OnceCell` to send values +// (of type T) across threads. unsafe impl Sync for OnceCell {} // Access to OnceCell's value is guarded by the semaphore permit @@ -359,20 +413,17 @@ unsafe impl Sync for OnceCell {} // it's safe to send it to another thread unsafe impl Send for OnceCell {} -/// Errors that can be returned from [`OnceCell::set`] +/// Errors that can be returned from [`OnceCell::set`]. /// /// [`OnceCell::set`]: crate::sync::OnceCell::set #[derive(Debug, PartialEq)] pub enum SetError { - /// Error resulting from [`OnceCell::set`] calls if the cell was previously initialized. + /// The cell was already initialized when [`OnceCell::set`] was called. /// /// [`OnceCell::set`]: crate::sync::OnceCell::set AlreadyInitializedError(T), - /// Error resulting from [`OnceCell::set`] calls when the cell is currently being - /// initialized during the calls to that method. - /// - /// [`OnceCell::set`]: crate::sync::OnceCell::set + /// The cell is currently being initialized. InitializingError(T), } From 57a42e91eeee9143ac18523d89acf5283412e05b Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Thu, 8 Jul 2021 15:29:05 +0200 Subject: [PATCH 2/4] Add missing .await --- tokio/src/sync/once_cell.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/once_cell.rs b/tokio/src/sync/once_cell.rs index 14f5207c60f..5690b90cdc2 100644 --- a/tokio/src/sync/once_cell.rs +++ b/tokio/src/sync/once_cell.rs @@ -63,7 +63,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; /// /// #[tokio::main] /// async fn main() { -/// let result = get_global_integer(); +/// let result = get_global_integer().await; /// assert_eq!(*result, 2); /// } /// ``` From 696b2c98d371c0608ebbfeef0aa506f53298a77a Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Thu, 8 Jul 2021 16:21:01 +0200 Subject: [PATCH 3/4] Add missing .await --- tokio/src/sync/once_cell.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/once_cell.rs b/tokio/src/sync/once_cell.rs index 5690b90cdc2..fed84ed86a6 100644 --- a/tokio/src/sync/once_cell.rs +++ b/tokio/src/sync/once_cell.rs @@ -167,7 +167,7 @@ impl OnceCell { /// /// #[tokio::main] /// async fn main() { - /// let result = get_global_integer(); + /// let result = get_global_integer().await; /// assert_eq!(*result, 2); /// } /// ``` From b4f8463460d644038ccbbd20c7a3bb1936d0ad58 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Mon, 12 Jul 2021 10:39:18 +0200 Subject: [PATCH 4/4] Reformat set --- tokio/src/sync/once_cell.rs | 44 ++++++++++++++++++------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/tokio/src/sync/once_cell.rs b/tokio/src/sync/once_cell.rs index fed84ed86a6..91705a55899 100644 --- a/tokio/src/sync/once_cell.rs +++ b/tokio/src/sync/once_cell.rs @@ -257,29 +257,29 @@ impl OnceCell { /// [`SetError::AlreadyInitializedError`]: crate::sync::SetError::AlreadyInitializedError /// [`SetError::InitializingError`]: crate::sync::SetError::InitializingError pub fn set(&self, value: T) -> Result<(), SetError> { - if !self.initialized() { - // Another task might be initializing the cell, in which case - // `try_acquire` will return an error. If we succeed to acquire the - // permit, then we can set the value. - match self.semaphore.try_acquire() { - Ok(permit) => { - debug_assert!(!self.initialized()); - self.set_value(value, permit); - Ok(()) - } - Err(TryAcquireError::NoPermits) => { - // Some other task is holding the permit. That task is - // currently trying to initialize the value. - Err(SetError::InitializingError(value)) - } - Err(TryAcquireError::Closed) => { - // The semaphore was closed. Some other task has initialized - // the value. - Err(SetError::AlreadyInitializedError(value)) - } + if self.initialized() { + return Err(SetError::AlreadyInitializedError(value)); + } + + // Another task might be initializing the cell, in which case + // `try_acquire` will return an error. If we succeed to acquire the + // permit, then we can set the value. + match self.semaphore.try_acquire() { + Ok(permit) => { + debug_assert!(!self.initialized()); + self.set_value(value, permit); + Ok(()) + } + Err(TryAcquireError::NoPermits) => { + // Some other task is holding the permit. That task is + // currently trying to initialize the value. + Err(SetError::InitializingError(value)) + } + Err(TryAcquireError::Closed) => { + // The semaphore was closed. Some other task has initialized + // the value. + Err(SetError::AlreadyInitializedError(value)) } - } else { - Err(SetError::AlreadyInitializedError(value)) } }