From b3e521d5f583e47d85d2cc66ea47a5c1455df107 Mon Sep 17 00:00:00 2001 From: b-naber Date: Mon, 8 Mar 2021 11:29:55 +0100 Subject: [PATCH 01/15] add OnceCell --- tokio/src/sync/mod.rs | 3 + tokio/src/sync/once_cell.rs | 313 ++++++++++++++++++++++++++++++++++++ 2 files changed, 316 insertions(+) create mode 100644 tokio/src/sync/once_cell.rs diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index e1d1a83a324..9028544e984 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -461,6 +461,9 @@ cfg_sync! { mod task; pub(crate) use task::AtomicWaker; + mod once_cell; + pub use self::once_cell::{OnceCell, NotInitializedError, AlreadyInitializedError}; + pub mod watch; } diff --git a/tokio/src/sync/once_cell.rs b/tokio/src/sync/once_cell.rs new file mode 100644 index 00000000000..87a40c83168 --- /dev/null +++ b/tokio/src/sync/once_cell.rs @@ -0,0 +1,313 @@ +use super::Semaphore; +use crate::loom::cell::UnsafeCell; +use std::error::Error; +use std::fmt; +use std::future::Future; +use std::mem::MaybeUninit; +use std::sync::atomic::{AtomicBool, Ordering}; + +/// A thread-safe cell which can be written to only once. +/// +/// Provides the functionality to either set the value, in case `OnceCell` +/// is uninitialized, or get the already initialized value by using an async +/// function via [`OnceCell::get_or_init_with`] or by using a Future via +/// [`OnceCell::get_or_init`] directly via [`OnceCell::get_or_init`]. +/// +/// [`OnceCell::get_or_init_with`]: crate::sync::OnceCell::get_or_init_with +/// [`OnceCell::get_or_init`]: crate::sync::OnceCell::get_or_init +/// +/// # Examples +/// ``` +/// use tokio::sync::OnceCell; +/// +/// async fn some_computation() -> u32 { +/// 1 + 1 +/// } +/// +/// static ONCE: OnceCell = OnceCell::new(); +/// +/// #[tokio::main] +/// async fn main() { +/// let result1 = tokio::spawn(async { +/// ONCE.get_or_init_with(some_computation).await +/// }).await.unwrap(); +/// assert_eq!(*result1, 2); +/// } +/// ``` +pub struct OnceCell { + value_set: AtomicBool, + value: UnsafeCell>, + semaphore: Semaphore, +} + +impl Default for OnceCell { + fn default() -> OnceCell { + OnceCell::new() + } +} + +impl fmt::Debug for OnceCell { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("OnceCell") + .field("value", &self.get()) + .finish() + } +} + +impl Clone for OnceCell { + fn clone(&self) -> OnceCell { + let new_cell = OnceCell::new(); + if let Ok(value) = self.get() { + match new_cell.set(value.clone()) { + Ok(()) => (), + Err(_) => unreachable!(), + } + } + new_cell + } +} + +impl PartialEq for OnceCell { + fn eq(&self, other: &OnceCell) -> bool { + self.get() == other.get() + } +} + +impl Eq for OnceCell {} + +impl OnceCell { + /// Creates a new uninitialized OnceCell instance. + #[cfg(all(feature = "parking_lot", not(all(loom, test)),))] + #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))] + pub const fn new() -> Self { + OnceCell { + value_set: AtomicBool::new(false), + value: UnsafeCell::new(MaybeUninit::uninit()), + semaphore: Semaphore::const_new(1), + } + } + + /// Whether the value of the OnceCell is set or not. + pub fn initialized(&self) -> bool { + self.value_set.load(Ordering::Acquire) + } + + // SAFETY: safe to call only once self.initialized() is true + unsafe fn get_unchecked(&self) -> &T { + &*self.value.with(|ptr| (*ptr).as_ptr()) + } + + // SAFETY: safe to call only once a permit on the semaphore has been + // acquired + unsafe fn set_value(&self, value: T) { + self.value.with_mut(|ptr| (*ptr).as_mut_ptr().write(value)); + self.value_set.store(true, Ordering::Release); + self.semaphore.close(); + } + + /// Tries to get a reference to the value of the OnceCell. + /// + /// Returns [`NotInitializedError`] if the value of the OnceCell + /// hasn't previously been initialized. + /// + /// [`NotInitializedError`]: crate::sync::NotInitializedError + pub fn get(&self) -> Result<&T, NotInitializedError> { + if self.initialized() { + Ok(unsafe { self.get_unchecked() }) + } else { + Err(NotInitializedError) + } + } + + /// Sets the value of the OnceCell to the argument value. + /// + /// If the value of the OnceCell was already set prior to this call + /// or some other set is currently initializing the cell, then + /// [`AlreadyInitializedError`] is returned. In order to wait + /// for an ongoing initialization to finish, call [`OnceCell::get_or_init`] + /// or [`OnceCell::get_or_init_with`] instead. + /// + /// [`AlreadyInitializedError`]: crate::sync::AlreadyInitializedError + /// ['OnceCell::get_or_init`]: crate::sync::OnceCell::get_or_init + /// ['OnceCell::get_or_init_with`]: crate::sync::OnceCell::get_or_init_with + pub fn set(&self, value: T) -> Result<(), AlreadyInitializedError> { + if !self.initialized() { + // After acquire().await we have either acquired a permit while self.value + // is still uninitialized, or another thread has intialized the value and + // closed the semaphore, in which case self.initialized is true and we + // don't set the value + match self.semaphore.try_acquire() { + Ok(_permit) => { + if !self.initialized() { + // SAFETY: There is only one permit on the semaphore, hence only one + // mutable reference is created + unsafe { self.set_value(value) }; + + return Ok(()); + } else { + unreachable!( + "acquired the permit after OnceCell value was already initialized." + ); + } + } + _ => { + if !self.initialized() { + panic!( + "couldn't acquire a permit even though OnceCell value is uninitialized." + ); + } + } + } + } + + Err(AlreadyInitializedError) + } + + /// Tries to initialize the value of the OnceCell using the async function `f`. + /// If the value of the OnceCell was already initialized prior to this call, + /// a reference to that initialized value is returned. If some other thread + /// initiated the initialization prior to this call and the initialization + /// hasn't completed, this call waits until the initialization is finished. + /// + /// This will deadlock if `f` tries to initialize the cell itself. + pub async fn get_or_init_with(&self, f: F) -> &T + where + F: FnOnce() -> Fut, + Fut: Future, + { + if self.initialized() { + // SAFETY: once the value is initialized, no mutable references are given out, so + // we can give out arbitrarily many immutable references + unsafe { self.get_unchecked() } + } else { + // After acquire().await we have either acquired a permit while self.value + // is still uninitialized, or current thread is awoken after another thread + // has intialized the value and closed the semaphore, in which case self.initialized + // is true and we don't set the value here + match self.semaphore.acquire().await { + Ok(_permit) => { + if !self.initialized() { + // If `f()` panics or `select!` is called, this `get_or_init_with` call + // is aborted and the semaphore permit is dropped. + let value = f().await; + + // SAFETY: There is only one permit on the semaphore, hence only one + // mutable reference is created + unsafe { self.set_value(value) }; + + // SAFETY: once the value is initialized, no mutable references are given out, so + // we can give out arbitrarily many immutable references + unsafe { self.get_unchecked() } + } else { + unreachable!("acquired semaphore after value was already initialized."); + } + } + Err(_) => { + if self.initialized() { + // SAFETY: once the value is initialized, no mutable references are given out, so + // we can give out arbitrarily many immutable references + unsafe { self.get_unchecked() } + } else { + unreachable!( + "Semaphore closed, but the OnceCell has not been initialized." + ); + } + } + } + } + } + + /// Tries to initialize the value of the `OnceCell` using the the Future `f`. + /// If the value of the `OnceCell` was already initialized prior to this call, + /// a reference to that initialized value is returned. If some other thread + /// initiated the initialization prior to this call and the initialization + /// hasn't completed, this call waits until the initialization is finished. + /// + /// This will deadlock if `f` internally tries to initialize the cell itself. + pub async fn get_or_init(&self, f: F) -> &T + where + F: Future, + { + if self.initialized() { + // SAFETY: once the value is initialized, no mutable references are given out, so + // we can give out arbitrarily many immutable references + return unsafe { self.get_unchecked() }; + } else { + // After acquire().await we have either acquired a permit while self.value + // is still uninitialized, or current thread is awoken after another thread + // has intialized the value and closed the semaphore, in which case self.initialized + // is true and we don't set the value here + match self.semaphore.acquire().await { + Ok(_permit) => { + if !self.initialized() { + // If `f` panics or `select!` is called, this `get_or_init` call + // is aborted and the semaphore permit is dropped. + let value = f.await; + + // SAFETY: There is only one permit on the semaphore, hence only one + // mutable reference is created + unsafe { self.set_value(value) }; + + // SAFETY: once the value is initialized, no mutable references are given out, so + // we can give out arbitrarily many immutable references + return unsafe { self.get_unchecked() }; + } else { + unreachable!("acquired semaphore after value was already initialized."); + } + } + Err(_) => { + if self.initialized() { + // SAFETY: once the value is initialized, no mutable references are given out, so + // we can give out arbitrarily many immutable references + return unsafe { self.get_unchecked() }; + } else { + unreachable!( + "Semaphore closed, but the OnceCell has not been initialized." + ); + } + } + } + } + } +} + +// Since `get` gives us access to immutable references of the +// OnceCell, OnceCell can only be Sync if T is Sync, otherwise +// OnceCell would allow sharing references of !Sync values across +// threads. We need T to be Send in order for OnceCell to by Sync +// because we can use `set` on `&OnceCell` to send +// values (of type T) across threads. +unsafe impl Sync for OnceCell {} + +// Access to OnceCell's value is guarded by the semaphore permit +// and atomic operations on `value_set`, so as long as T itself is Send +// it's safe to send it to another thread +unsafe impl Send for OnceCell {} + +/// Error returned from the [`OnceCell::set`] method +/// +/// [`OnceCell::set`]: crate::sync::OnceCell::set +#[derive(Debug, PartialEq)] +pub struct AlreadyInitializedError; + +impl fmt::Display for AlreadyInitializedError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "AlreadyInitializedError") + } +} + +impl Error for AlreadyInitializedError {} + +/// Error returned from the [`OnceCell::get`] method +/// +/// [`OnceCell::get`]: crate::sync::OnceCell::get +#[derive(Debug, PartialEq)] +pub struct NotInitializedError; + +impl fmt::Display for NotInitializedError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "NotInitializedError") + } +} + +impl Error for NotInitializedError {} From 440a5474548565029f13dbc8461bc3719c2c4e65 Mon Sep 17 00:00:00 2001 From: b-naber Date: Mon, 8 Mar 2021 11:32:53 +0100 Subject: [PATCH 02/15] add and update tests --- tokio/tests/async_send_sync.rs | 40 +++++++++++++++ tokio/tests/sync_once_cell.rs | 94 ++++++++++++++++++++++++++++++++++ 2 files changed, 134 insertions(+) create mode 100644 tokio/tests/sync_once_cell.rs diff --git a/tokio/tests/async_send_sync.rs b/tokio/tests/async_send_sync.rs index 671fa4a707f..a18357b2f95 100644 --- a/tokio/tests/async_send_sync.rs +++ b/tokio/tests/async_send_sync.rs @@ -265,6 +265,46 @@ async_assert_fn!(tokio::sync::watch::Sender::closed(_): Send & Sync); async_assert_fn!(tokio::sync::watch::Sender>::closed(_): !Send & !Sync); async_assert_fn!(tokio::sync::watch::Sender>::closed(_): !Send & !Sync); +async_assert_fn!(tokio::sync::OnceCell::get_or_init_with( + _, fn() -> Pin + Send + Sync>>): Send & Sync); +async_assert_fn!(tokio::sync::OnceCell::get_or_init_with( + _, fn() -> Pin + Send>>): Send & !Sync); +async_assert_fn!(tokio::sync::OnceCell::get_or_init_with( + _, fn() -> Pin>>): Send & !Sync); +async_assert_fn!(tokio::sync::OnceCell>::get_or_init_with( + _, fn() -> Pin> + Send + Sync>>): Send & Sync); +async_assert_fn!(tokio::sync::OnceCell>::get_or_init_with( + _, fn() -> Pin> + Send>>): Send & !Sync); +async_assert_fn!(tokio::sync::OnceCell>::get_or_init_with( + _, fn() -> Pin>>>): !Send & !Sync); +async_assert_fn!(tokio::sync::OnceCell>::get_or_init_with( + _, fn() -> Pin> + Send + Sync>>): Send & Sync); +async_assert_fn!(tokio::sync::OnceCell>::get_or_init_with( + _, fn() -> Pin> + Send>>): Send & !Sync); +async_assert_fn!(tokio::sync::OnceCell>::get_or_init_with( + _, fn() -> Pin>>>): !Send & !Sync); +async_assert_fn!(tokio::sync::OnceCell::get_or_init( + _, Pin + Send + Sync>>): Send & Sync); +async_assert_fn!(tokio::sync::OnceCell::get_or_init( + _, Pin + Send>>): Send & !Sync); +async_assert_fn!(tokio::sync::OnceCell::get_or_init( + _, Pin>>): Send & !Sync); +async_assert_fn!(tokio::sync::OnceCell>::get_or_init( + _, Pin> + Send + Sync>>): Send & Sync); +async_assert_fn!(tokio::sync::OnceCell>::get_or_init( + _, Pin> + Send>>): Send & !Sync); +async_assert_fn!(tokio::sync::OnceCell>::get_or_init( + _, Pin>>>): !Send & !Sync); +async_assert_fn!(tokio::sync::OnceCell>::get_or_init( + _, Pin> + Send + Sync>>): Send & Sync); +async_assert_fn!(tokio::sync::OnceCell>::get_or_init( + _, Pin> + Send>>): Send & !Sync); +async_assert_fn!(tokio::sync::OnceCell>::get_or_init( + _, Pin>>>): !Send & !Sync); +assert_value!(tokio::sync::OnceCell: Send & Sync); +assert_value!(tokio::sync::OnceCell>: Send & !Sync); +assert_value!(tokio::sync::OnceCell>: !Send & !Sync); + async_assert_fn!(tokio::task::LocalKey::scope(_, u32, BoxFutureSync<()>): Send & Sync); async_assert_fn!(tokio::task::LocalKey::scope(_, u32, BoxFutureSend<()>): Send & !Sync); async_assert_fn!(tokio::task::LocalKey::scope(_, u32, BoxFuture<()>): !Send & !Sync); diff --git a/tokio/tests/sync_once_cell.rs b/tokio/tests/sync_once_cell.rs new file mode 100644 index 00000000000..b88c7ab32f4 --- /dev/null +++ b/tokio/tests/sync_once_cell.rs @@ -0,0 +1,94 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use tokio::runtime::Runtime; +use tokio::sync::{AlreadyInitializedError, NotInitializedError, OnceCell}; +use tokio::time::{sleep, Duration}; + +async fn func1() -> u32 { + 5 +} + +async fn func2() -> u32 { + sleep(Duration::from_millis(10)).await; + 10 +} + +async fn func_panic() -> u32 { + sleep(Duration::from_secs(1)).await; + panic!(); +} + +#[test] +fn get_or_init_with() { + let rt = Runtime::new().unwrap(); + + static ONCE: OnceCell = OnceCell::new(); + + rt.block_on(async { + let result1 = rt + .spawn(async { ONCE.get_or_init_with(func1).await }) + .await + .unwrap(); + + let result2 = rt + .spawn(async { ONCE.get_or_init_with(func2).await }) + .await + .unwrap(); + + assert_eq!(*result1, 5); + assert_eq!(*result2, 5); + }); +} + +#[test] +fn get_or_init_panic() { + let rt = Runtime::new().unwrap(); + + static ONCE: OnceCell = OnceCell::new(); + + rt.block_on(async { + let result1 = rt + .spawn(async { ONCE.get_or_init_with(func1).await }) + .await + .unwrap(); + + let result2 = rt + .spawn(async { ONCE.get_or_init_with(func_panic).await }) + .await + .unwrap(); + + assert_eq!(*result1, 5); + assert_eq!(*result2, 5); + }); +} + +#[test] +fn set_and_get() { + let rt = Runtime::new().unwrap(); + + static ONCE: OnceCell = OnceCell::new(); + + rt.block_on(async { + let _ = rt.spawn(async { ONCE.set(5) }).await; + let value = ONCE.get().unwrap(); + assert_eq!(*value, 5); + }); +} + +#[test] +fn get_uninit() { + static ONCE: OnceCell = OnceCell::new(); + let uninit = ONCE.get(); + assert_eq!(uninit, Err(NotInitializedError)); +} + +#[test] +fn set_twice() { + static ONCE: OnceCell = OnceCell::new(); + + let first = ONCE.set(5); + assert_eq!(first, Ok(())); + let second = ONCE.set(6); + assert_eq!(second, Err(AlreadyInitializedError)); +} From fdadf94457761ef1befcfac4084fe68390c68c0e Mon Sep 17 00:00:00 2001 From: b-naber Date: Fri, 12 Mar 2021 22:41:18 +0100 Subject: [PATCH 03/15] address review --- tokio/src/sync/mod.rs | 2 +- tokio/src/sync/once_cell.rs | 191 ++++++++++++++++++++------------- tokio/tests/async_send_sync.rs | 30 ++---- tokio/tests/sync_once_cell.rs | 111 +++++++++++++------ 4 files changed, 206 insertions(+), 128 deletions(-) diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index 9028544e984..9d01d0f58ee 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -462,7 +462,7 @@ cfg_sync! { pub(crate) use task::AtomicWaker; mod once_cell; - pub use self::once_cell::{OnceCell, NotInitializedError, AlreadyInitializedError}; + pub use self::once_cell::{OnceCell, NotInitializedError, SetError}; pub mod watch; } diff --git a/tokio/src/sync/once_cell.rs b/tokio/src/sync/once_cell.rs index 87a40c83168..c6a448e5e42 100644 --- a/tokio/src/sync/once_cell.rs +++ b/tokio/src/sync/once_cell.rs @@ -4,17 +4,16 @@ use std::error::Error; use std::fmt; use std::future::Future; use std::mem::MaybeUninit; +use std::ptr; use std::sync::atomic::{AtomicBool, Ordering}; /// A thread-safe cell which can be written to only once. /// /// Provides the functionality to either set the value, in case `OnceCell` /// is uninitialized, or get the already initialized value by using an async -/// function via [`OnceCell::get_or_init_with`] or by using a Future via -/// [`OnceCell::get_or_init`] directly via [`OnceCell::get_or_init`]. +/// function via [`OnceCell::get_or_init_with`]. /// /// [`OnceCell::get_or_init_with`]: crate::sync::OnceCell::get_or_init_with -/// [`OnceCell::get_or_init`]: crate::sync::OnceCell::get_or_init /// /// # Examples /// ``` @@ -24,7 +23,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; /// 1 + 1 /// } /// -/// static ONCE: OnceCell = OnceCell::new(); +/// static ONCE: OnceCell = OnceCell::const_new(); /// /// #[tokio::main] /// async fn main() { @@ -76,10 +75,19 @@ impl PartialEq for OnceCell { impl Eq for OnceCell {} impl OnceCell { + /// Creates a new uninitialized OnceCell instance. + pub fn new() -> Self { + OnceCell { + value_set: AtomicBool::new(false), + value: UnsafeCell::new(MaybeUninit::uninit()), + semaphore: Semaphore::new(1), + } + } + /// Creates a new uninitialized OnceCell instance. #[cfg(all(feature = "parking_lot", not(all(loom, test)),))] #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))] - pub const fn new() -> Self { + pub const fn const_new() -> Self { OnceCell { value_set: AtomicBool::new(false), value: UnsafeCell::new(MaybeUninit::uninit()), @@ -97,6 +105,12 @@ impl OnceCell { &*self.value.with(|ptr| (*ptr).as_ptr()) } + // SAFETY: safe to call only once self.initialized() is true and since + // only one mutable reference can call this method + unsafe fn get_unchecked_mut(&mut self) -> &mut T { + &mut *self.value.with_mut(|ptr| (*ptr).as_mut_ptr()) + } + // SAFETY: safe to call only once a permit on the semaphore has been // acquired unsafe fn set_value(&self, value: T) { @@ -115,27 +129,38 @@ impl OnceCell { if self.initialized() { Ok(unsafe { self.get_unchecked() }) } else { - Err(NotInitializedError) + Err(NotInitializedError(())) + } + } + + /// If the cell is initialized, this method returns a mutable reference to its value, + /// otherwise returns [`NotInitializedError`]. + /// + /// [`NotInitializedError`]: crate::sync::NotInitializederror + pub fn get_mut(&mut self) -> Result<&mut T, NotInitializedError> { + if self.initialized() { + Ok(unsafe { self.get_unchecked_mut() }) + } else { + Err(NotInitializedError(())) } } /// Sets the value of the OnceCell to the argument value. /// /// If the value of the OnceCell was already set prior to this call - /// or some other set is currently initializing the cell, then - /// [`AlreadyInitializedError`] is returned. In order to wait - /// for an ongoing initialization to finish, call [`OnceCell::get_or_init`] - /// or [`OnceCell::get_or_init_with`] instead. + /// then [`AlreadyInitializedError`] is returned. If another thread + /// is initializing the cell while this method is called, + /// ['InitializingError`] is returned. In order to wait + /// for an ongoing initialization to finish, call + /// [`OnceCell::get_or_init_with`] instead. /// /// [`AlreadyInitializedError`]: crate::sync::AlreadyInitializedError - /// ['OnceCell::get_or_init`]: crate::sync::OnceCell::get_or_init + /// [`InitializingError`]: crate::sync::InitializingError /// ['OnceCell::get_or_init_with`]: crate::sync::OnceCell::get_or_init_with - pub fn set(&self, value: T) -> Result<(), AlreadyInitializedError> { + pub fn set(&self, value: T) -> Result<(), SetError> { if !self.initialized() { - // After acquire().await we have either acquired a permit while self.value - // is still uninitialized, or another thread has intialized the value and - // closed the semaphore, in which case self.initialized is true and we - // don't set the value + // Another thread might be initializing the cell, in which case `try_acquire` will + // return an error match self.semaphore.try_acquire() { Ok(_permit) => { if !self.initialized() { @@ -151,16 +176,30 @@ impl OnceCell { } } _ => { + // Couldn't acquire the permit, look if initializing process is already completed if !self.initialized() { - panic!( - "couldn't acquire a permit even though OnceCell value is uninitialized." - ); + return Err(SetError::InitializingError(())); } } } } - Err(AlreadyInitializedError) + Err(SetError::AlreadyInitializedError(())) + } + + /// Tries to set the value of the cell, overwriting the previously set value, in case one is + /// available. If no value was previously set, this method has the same functionality has + /// [`OnceCell::set`]. + /// + /// [`OnceCell::set`]: crate::sync::OnceCell::set + pub fn set_mut(&mut self, value: T) -> Result<(), SetError> { + if self.initialized() { + // SAFETY: Setting this value is safe because the mutable reference guarantees exclusivity + unsafe { self.set_value(value) }; + Ok(()) + } else { + self.set(value) + } } /// Tries to initialize the value of the OnceCell using the async function `f`. @@ -217,56 +256,25 @@ impl OnceCell { } } - /// Tries to initialize the value of the `OnceCell` using the the Future `f`. - /// If the value of the `OnceCell` was already initialized prior to this call, - /// a reference to that initialized value is returned. If some other thread - /// initiated the initialization prior to this call and the initialization - /// hasn't completed, this call waits until the initialization is finished. - /// - /// This will deadlock if `f` internally tries to initialize the cell itself. - pub async fn get_or_init(&self, f: F) -> &T - where - F: Future, - { + /// Moves the value out of the cell and drops the cell afterwards. + pub fn into_inner(self) -> Result { if self.initialized() { - // SAFETY: once the value is initialized, no mutable references are given out, so - // we can give out arbitrarily many immutable references - return unsafe { self.get_unchecked() }; + Ok(unsafe { self.value.with(|ptr| ptr::read(ptr).assume_init()) }) } else { - // After acquire().await we have either acquired a permit while self.value - // is still uninitialized, or current thread is awoken after another thread - // has intialized the value and closed the semaphore, in which case self.initialized - // is true and we don't set the value here - match self.semaphore.acquire().await { - Ok(_permit) => { - if !self.initialized() { - // If `f` panics or `select!` is called, this `get_or_init` call - // is aborted and the semaphore permit is dropped. - let value = f.await; - - // SAFETY: There is only one permit on the semaphore, hence only one - // mutable reference is created - unsafe { self.set_value(value) }; + Err(NotInitializedError(())) + } + } - // SAFETY: once the value is initialized, no mutable references are given out, so - // we can give out arbitrarily many immutable references - return unsafe { self.get_unchecked() }; - } else { - unreachable!("acquired semaphore after value was already initialized."); - } - } - Err(_) => { - if self.initialized() { - // SAFETY: once the value is initialized, no mutable references are given out, so - // we can give out arbitrarily many immutable references - return unsafe { self.get_unchecked() }; - } else { - unreachable!( - "Semaphore closed, but the OnceCell has not been initialized." - ); - } - } - } + /// Takes ownership of the current value, leaving the cell unitialized. + pub fn take(&mut self) -> Result { + if self.initialized() { + // Note: ptr::read does not move the value out of `self.value`. We need to use + // `self.initialized` to prevent incorrect accesses to value + let value = unsafe { self.value.with(|ptr| ptr::read(ptr).assume_init()) }; + self.value_set.store(false, Ordering::Release); + Ok(value) + } else { + Err(NotInitializedError(())) } } } @@ -284,25 +292,62 @@ unsafe impl Sync for OnceCell {} // it's safe to send it to another thread unsafe impl Send for OnceCell {} -/// Error returned from the [`OnceCell::set`] method +/// Errors that can be returned from [`OnceCell::set`] and +/// [`OnceCell::set_mut`]. /// /// [`OnceCell::set`]: crate::sync::OnceCell::set +/// [`OnceCell::set_mut`]: crate::sync::OnceCell::set_mut #[derive(Debug, PartialEq)] -pub struct AlreadyInitializedError; +pub enum SetError { + /// Error resulting from [`OnceCell::set`] or [`OnceCell::set_mut`] calls if + /// the cell was previously initialized. + /// + /// [`OnceCell::set`]: crate::sync::OnceCell::set + /// [`OnceCell::set_mut`]: crate::sync::OnceCell::set_mut + AlreadyInitializedError(()), -impl fmt::Display for AlreadyInitializedError { + /// Error resulting from [`OnceCell::set`] or [`OnceCell::set_mut`] calls when + /// the cell is currently being inintialized during the calls to those methods. + /// + /// [`OnceCell::set`]: crate::sync::OnceCell::set + /// [`OnceCell::set_mut`]: crate::sync::OnceCell::set_mut + InitializingError(()), +} + +impl fmt::Display for SetError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "AlreadyInitializedError") + match self { + SetError::AlreadyInitializedError(_) => write!(f, "AlreadyInitializedError"), + SetError::InitializingError(_) => write!(f, "InitializingError"), + } } } -impl Error for AlreadyInitializedError {} +impl Error for SetError {} + +impl SetError { + /// Whether `SetError` is `SetError::AlreadyInitializEderror` + pub fn is_already_init_err(&self) -> bool { + match self { + SetError::AlreadyInitializedError(_) => true, + SetError::InitializingError(_) => false, + } + } + + /// Whether `SetError` is `SetError::InitializingError` + pub fn is_initializing_err(&self) -> bool { + match self { + SetError::AlreadyInitializedError(_) => false, + SetError::InitializingError(_) => true, + } + } +} /// Error returned from the [`OnceCell::get`] method /// /// [`OnceCell::get`]: crate::sync::OnceCell::get #[derive(Debug, PartialEq)] -pub struct NotInitializedError; +pub struct NotInitializedError(()); impl fmt::Display for NotInitializedError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { diff --git a/tokio/tests/async_send_sync.rs b/tokio/tests/async_send_sync.rs index a18357b2f95..49d4a218048 100644 --- a/tokio/tests/async_send_sync.rs +++ b/tokio/tests/async_send_sync.rs @@ -2,8 +2,10 @@ #![cfg(feature = "full")] use std::cell::Cell; +use std::future::Future; use std::io::{Cursor, SeekFrom}; use std::net::SocketAddr; +use std::pin::Pin; use std::rc::Rc; use tokio::net::TcpStream; use tokio::time::{Duration, Instant}; @@ -270,37 +272,19 @@ async_assert_fn!(tokio::sync::OnceCell::get_or_init_with( async_assert_fn!(tokio::sync::OnceCell::get_or_init_with( _, fn() -> Pin + Send>>): Send & !Sync); async_assert_fn!(tokio::sync::OnceCell::get_or_init_with( - _, fn() -> Pin>>): Send & !Sync); + _, fn() -> Pin>>): !Send & !Sync); async_assert_fn!(tokio::sync::OnceCell>::get_or_init_with( - _, fn() -> Pin> + Send + Sync>>): Send & Sync); + _, fn() -> Pin> + Send + Sync>>): !Send & !Sync); async_assert_fn!(tokio::sync::OnceCell>::get_or_init_with( - _, fn() -> Pin> + Send>>): Send & !Sync); + _, fn() -> Pin> + Send>>): !Send & !Sync); async_assert_fn!(tokio::sync::OnceCell>::get_or_init_with( _, fn() -> Pin>>>): !Send & !Sync); async_assert_fn!(tokio::sync::OnceCell>::get_or_init_with( - _, fn() -> Pin> + Send + Sync>>): Send & Sync); + _, fn() -> Pin> + Send + Sync>>): !Send & !Sync); async_assert_fn!(tokio::sync::OnceCell>::get_or_init_with( - _, fn() -> Pin> + Send>>): Send & !Sync); + _, fn() -> Pin> + Send>>): !Send & !Sync); async_assert_fn!(tokio::sync::OnceCell>::get_or_init_with( _, fn() -> Pin>>>): !Send & !Sync); -async_assert_fn!(tokio::sync::OnceCell::get_or_init( - _, Pin + Send + Sync>>): Send & Sync); -async_assert_fn!(tokio::sync::OnceCell::get_or_init( - _, Pin + Send>>): Send & !Sync); -async_assert_fn!(tokio::sync::OnceCell::get_or_init( - _, Pin>>): Send & !Sync); -async_assert_fn!(tokio::sync::OnceCell>::get_or_init( - _, Pin> + Send + Sync>>): Send & Sync); -async_assert_fn!(tokio::sync::OnceCell>::get_or_init( - _, Pin> + Send>>): Send & !Sync); -async_assert_fn!(tokio::sync::OnceCell>::get_or_init( - _, Pin>>>): !Send & !Sync); -async_assert_fn!(tokio::sync::OnceCell>::get_or_init( - _, Pin> + Send + Sync>>): Send & Sync); -async_assert_fn!(tokio::sync::OnceCell>::get_or_init( - _, Pin> + Send>>): Send & !Sync); -async_assert_fn!(tokio::sync::OnceCell>::get_or_init( - _, Pin>>>): !Send & !Sync); assert_value!(tokio::sync::OnceCell: Send & Sync); assert_value!(tokio::sync::OnceCell>: Send & !Sync); assert_value!(tokio::sync::OnceCell>: !Send & !Sync); diff --git a/tokio/tests/sync_once_cell.rs b/tokio/tests/sync_once_cell.rs index b88c7ab32f4..cb4dc29181b 100644 --- a/tokio/tests/sync_once_cell.rs +++ b/tokio/tests/sync_once_cell.rs @@ -1,40 +1,58 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] -use tokio::runtime::Runtime; -use tokio::sync::{AlreadyInitializedError, NotInitializedError, OnceCell}; -use tokio::time::{sleep, Duration}; +use std::time::Duration; +use tokio::runtime; +use tokio::sync::{OnceCell, SetError}; +use tokio::time; async fn func1() -> u32 { 5 } async fn func2() -> u32 { - sleep(Duration::from_millis(10)).await; + time::sleep(Duration::from_millis(1)).await; 10 } async fn func_panic() -> u32 { - sleep(Duration::from_secs(1)).await; + time::sleep(Duration::from_millis(1)).await; panic!(); } +async fn sleep_and_set() -> u32 { + // Simulate sleep by pausing time and waiting for another thread to + // resume clock when calling `set`, then finding the cell being initialized + // by this call + time::sleep(Duration::from_millis(2)).await; + 5 +} + +async fn advance_time_and_set(cell: &'static OnceCell, v: u32) -> Result<(), SetError> { + time::advance(Duration::from_millis(1)).await; + cell.set(v) +} + #[test] fn get_or_init_with() { - let rt = Runtime::new().unwrap(); + let rt = runtime::Builder::new_current_thread() + .enable_time() + .build() + .unwrap(); - static ONCE: OnceCell = OnceCell::new(); + static ONCE: OnceCell = OnceCell::const_new(); rt.block_on(async { - let result1 = rt - .spawn(async { ONCE.get_or_init_with(func1).await }) - .await - .unwrap(); + time::pause(); + + let handle1 = rt.spawn(async { ONCE.get_or_init_with(func1).await }); + let handle2 = rt.spawn(async { ONCE.get_or_init_with(func2).await }); - let result2 = rt - .spawn(async { ONCE.get_or_init_with(func2).await }) - .await - .unwrap(); + time::advance(Duration::from_millis(1)).await; + time::resume(); + + let result1 = handle1.await.unwrap(); + let result2 = handle2.await.unwrap(); assert_eq!(*result1, 5); assert_eq!(*result2, 5); @@ -43,20 +61,23 @@ fn get_or_init_with() { #[test] fn get_or_init_panic() { - let rt = Runtime::new().unwrap(); + let rt = runtime::Builder::new_current_thread() + .enable_time() + .build() + .unwrap(); - static ONCE: OnceCell = OnceCell::new(); + static ONCE: OnceCell = OnceCell::const_new(); rt.block_on(async { - let result1 = rt - .spawn(async { ONCE.get_or_init_with(func1).await }) - .await - .unwrap(); + time::pause(); + + let handle1 = rt.spawn(async { ONCE.get_or_init_with(func1).await }); + let handle2 = rt.spawn(async { ONCE.get_or_init_with(func_panic).await }); + + time::advance(Duration::from_millis(1)).await; - let result2 = rt - .spawn(async { ONCE.get_or_init_with(func_panic).await }) - .await - .unwrap(); + let result1 = handle1.await.unwrap(); + let result2 = handle2.await.unwrap(); assert_eq!(*result1, 5); assert_eq!(*result2, 5); @@ -65,9 +86,12 @@ fn get_or_init_panic() { #[test] fn set_and_get() { - let rt = Runtime::new().unwrap(); + let rt = runtime::Builder::new_current_thread() + .enable_time() + .build() + .unwrap(); - static ONCE: OnceCell = OnceCell::new(); + static ONCE: OnceCell = OnceCell::const_new(); rt.block_on(async { let _ = rt.spawn(async { ONCE.set(5) }).await; @@ -78,17 +102,42 @@ fn set_and_get() { #[test] fn get_uninit() { - static ONCE: OnceCell = OnceCell::new(); + static ONCE: OnceCell = OnceCell::const_new(); let uninit = ONCE.get(); - assert_eq!(uninit, Err(NotInitializedError)); + assert!(uninit.is_err()); } #[test] fn set_twice() { - static ONCE: OnceCell = OnceCell::new(); + static ONCE: OnceCell = OnceCell::const_new(); let first = ONCE.set(5); assert_eq!(first, Ok(())); let second = ONCE.set(6); - assert_eq!(second, Err(AlreadyInitializedError)); + assert!(second.err().unwrap().is_already_init_err()); +} + +#[test] +fn set_while_initializing() { + let rt = runtime::Builder::new_current_thread() + .enable_time() + .build() + .unwrap(); + + static ONCE: OnceCell = OnceCell::const_new(); + + rt.block_on(async { + time::pause(); + + let handle1 = rt.spawn(async { ONCE.get_or_init_with(sleep_and_set).await }); + let handle2 = rt.spawn(async { advance_time_and_set(&ONCE, 10).await }); + + time::advance(Duration::from_millis(2)).await; + + let result1 = handle1.await.unwrap(); + let result2 = handle2.await.unwrap(); + + assert_eq!(*result1, 5); + assert!(result2.err().unwrap().is_initializing_err()); + }); } From 02dcffc7605dfe49bc0838067fd603eb61e34d59 Mon Sep 17 00:00:00 2001 From: b-naber Date: Wed, 17 Mar 2021 18:36:50 +0100 Subject: [PATCH 04/15] address review --- tokio/src/sync/once_cell.rs | 77 +++++++++++++--------------------- tokio/tests/async_send_sync.rs | 42 +++++++++++-------- tokio/tests/sync_once_cell.rs | 16 +++---- 3 files changed, 61 insertions(+), 74 deletions(-) diff --git a/tokio/src/sync/once_cell.rs b/tokio/src/sync/once_cell.rs index c6a448e5e42..347e8691d6b 100644 --- a/tokio/src/sync/once_cell.rs +++ b/tokio/src/sync/once_cell.rs @@ -11,9 +11,9 @@ use std::sync::atomic::{AtomicBool, Ordering}; /// /// Provides the functionality to either set the value, in case `OnceCell` /// is uninitialized, or get the already initialized value by using an async -/// function via [`OnceCell::get_or_init_with`]. +/// function via [`OnceCell::get_or_init`]. /// -/// [`OnceCell::get_or_init_with`]: crate::sync::OnceCell::get_or_init_with +/// [`OnceCell::get_or_init`]: crate::sync::OnceCell::get_or_init /// /// # Examples /// ``` @@ -28,7 +28,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; /// #[tokio::main] /// async fn main() { /// let result1 = tokio::spawn(async { -/// ONCE.get_or_init_with(some_computation).await +/// ONCE.get_or_init(some_computation).await /// }).await.unwrap(); /// assert_eq!(*result1, 2); /// } @@ -56,7 +56,7 @@ impl fmt::Debug for OnceCell { impl Clone for OnceCell { fn clone(&self) -> OnceCell { let new_cell = OnceCell::new(); - if let Ok(value) = self.get() { + if let Some(value) = self.get() { match new_cell.set(value.clone()) { Ok(()) => (), Err(_) => unreachable!(), @@ -105,8 +105,8 @@ impl OnceCell { &*self.value.with(|ptr| (*ptr).as_ptr()) } - // SAFETY: safe to call only once self.initialized() is true and since - // only one mutable reference can call this method + // SAFETY: safe to call only once self.initialized() is true. Safe because + // because of the mutable reference. unsafe fn get_unchecked_mut(&mut self) -> &mut T { &mut *self.value.with_mut(|ptr| (*ptr).as_mut_ptr()) } @@ -121,27 +121,23 @@ impl OnceCell { /// Tries to get a reference to the value of the OnceCell. /// - /// Returns [`NotInitializedError`] if the value of the OnceCell - /// hasn't previously been initialized. - /// - /// [`NotInitializedError`]: crate::sync::NotInitializedError - pub fn get(&self) -> Result<&T, NotInitializedError> { + /// Returns None if the value of the OnceCell hasn't previously been initialized. + pub fn get(&self) -> Option<&T> { if self.initialized() { - Ok(unsafe { self.get_unchecked() }) + Some(unsafe { self.get_unchecked() }) } else { - Err(NotInitializedError(())) + None } } - /// If the cell is initialized, this method returns a mutable reference to its value, - /// otherwise returns [`NotInitializedError`]. + /// Tries to return a mutable reference to the value of the cell. /// - /// [`NotInitializedError`]: crate::sync::NotInitializederror - pub fn get_mut(&mut self) -> Result<&mut T, NotInitializedError> { + /// Returns None if the cell hasn't previously been initialized. + pub fn get_mut(&mut self) -> Option<&mut T> { if self.initialized() { - Ok(unsafe { self.get_unchecked_mut() }) + Some(unsafe { self.get_unchecked_mut() }) } else { - Err(NotInitializedError(())) + None } } @@ -152,12 +148,12 @@ impl OnceCell { /// is initializing the cell while this method is called, /// ['InitializingError`] is returned. In order to wait /// for an ongoing initialization to finish, call - /// [`OnceCell::get_or_init_with`] instead. + /// [`OnceCell::get_or_init`] instead. /// /// [`AlreadyInitializedError`]: crate::sync::AlreadyInitializedError /// [`InitializingError`]: crate::sync::InitializingError - /// ['OnceCell::get_or_init_with`]: crate::sync::OnceCell::get_or_init_with - pub fn set(&self, value: T) -> Result<(), SetError> { + /// ['OnceCell::get_or_init`]: crate::sync::OnceCell::get_or_init + pub fn set(&self, value: T) -> Result<(), SetError> { if !self.initialized() { // Another thread might be initializing the cell, in which case `try_acquire` will // return an error @@ -178,28 +174,13 @@ impl OnceCell { _ => { // Couldn't acquire the permit, look if initializing process is already completed if !self.initialized() { - return Err(SetError::InitializingError(())); + return Err(SetError::InitializingError(value)); } } } } - Err(SetError::AlreadyInitializedError(())) - } - - /// Tries to set the value of the cell, overwriting the previously set value, in case one is - /// available. If no value was previously set, this method has the same functionality has - /// [`OnceCell::set`]. - /// - /// [`OnceCell::set`]: crate::sync::OnceCell::set - pub fn set_mut(&mut self, value: T) -> Result<(), SetError> { - if self.initialized() { - // SAFETY: Setting this value is safe because the mutable reference guarantees exclusivity - unsafe { self.set_value(value) }; - Ok(()) - } else { - self.set(value) - } + Err(SetError::AlreadyInitializedError(value)) } /// Tries to initialize the value of the OnceCell using the async function `f`. @@ -209,7 +190,7 @@ impl OnceCell { /// hasn't completed, this call waits until the initialization is finished. /// /// This will deadlock if `f` tries to initialize the cell itself. - pub async fn get_or_init_with(&self, f: F) -> &T + pub async fn get_or_init(&self, f: F) -> &T where F: FnOnce() -> Fut, Fut: Future, @@ -226,7 +207,7 @@ impl OnceCell { match self.semaphore.acquire().await { Ok(_permit) => { if !self.initialized() { - // If `f()` panics or `select!` is called, this `get_or_init_with` call + // If `f()` panics or `select!` is called, this `get_or_init` call // is aborted and the semaphore permit is dropped. let value = f().await; @@ -271,7 +252,7 @@ impl OnceCell { // Note: ptr::read does not move the value out of `self.value`. We need to use // `self.initialized` to prevent incorrect accesses to value let value = unsafe { self.value.with(|ptr| ptr::read(ptr).assume_init()) }; - self.value_set.store(false, Ordering::Release); + *self.value_set.get_mut() = false; Ok(value) } else { Err(NotInitializedError(())) @@ -298,23 +279,23 @@ unsafe impl Send for OnceCell {} /// [`OnceCell::set`]: crate::sync::OnceCell::set /// [`OnceCell::set_mut`]: crate::sync::OnceCell::set_mut #[derive(Debug, PartialEq)] -pub enum SetError { +pub enum SetError { /// Error resulting from [`OnceCell::set`] or [`OnceCell::set_mut`] calls if /// the cell was previously initialized. /// /// [`OnceCell::set`]: crate::sync::OnceCell::set /// [`OnceCell::set_mut`]: crate::sync::OnceCell::set_mut - AlreadyInitializedError(()), + AlreadyInitializedError(T), /// Error resulting from [`OnceCell::set`] or [`OnceCell::set_mut`] calls when /// the cell is currently being inintialized during the calls to those methods. /// /// [`OnceCell::set`]: crate::sync::OnceCell::set /// [`OnceCell::set_mut`]: crate::sync::OnceCell::set_mut - InitializingError(()), + InitializingError(T), } -impl fmt::Display for SetError { +impl fmt::Display for SetError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { SetError::AlreadyInitializedError(_) => write!(f, "AlreadyInitializedError"), @@ -323,9 +304,9 @@ impl fmt::Display for SetError { } } -impl Error for SetError {} +impl Error for SetError {} -impl SetError { +impl SetError { /// Whether `SetError` is `SetError::AlreadyInitializEderror` pub fn is_already_init_err(&self) -> bool { match self { diff --git a/tokio/tests/async_send_sync.rs b/tokio/tests/async_send_sync.rs index 49d4a218048..d373dbb208c 100644 --- a/tokio/tests/async_send_sync.rs +++ b/tokio/tests/async_send_sync.rs @@ -267,24 +267,30 @@ async_assert_fn!(tokio::sync::watch::Sender::closed(_): Send & Sync); async_assert_fn!(tokio::sync::watch::Sender>::closed(_): !Send & !Sync); async_assert_fn!(tokio::sync::watch::Sender>::closed(_): !Send & !Sync); -async_assert_fn!(tokio::sync::OnceCell::get_or_init_with( - _, fn() -> Pin + Send + Sync>>): Send & Sync); -async_assert_fn!(tokio::sync::OnceCell::get_or_init_with( - _, fn() -> Pin + Send>>): Send & !Sync); -async_assert_fn!(tokio::sync::OnceCell::get_or_init_with( - _, fn() -> Pin>>): !Send & !Sync); -async_assert_fn!(tokio::sync::OnceCell>::get_or_init_with( - _, fn() -> Pin> + Send + Sync>>): !Send & !Sync); -async_assert_fn!(tokio::sync::OnceCell>::get_or_init_with( - _, fn() -> Pin> + Send>>): !Send & !Sync); -async_assert_fn!(tokio::sync::OnceCell>::get_or_init_with( - _, fn() -> Pin>>>): !Send & !Sync); -async_assert_fn!(tokio::sync::OnceCell>::get_or_init_with( - _, fn() -> Pin> + Send + Sync>>): !Send & !Sync); -async_assert_fn!(tokio::sync::OnceCell>::get_or_init_with( - _, fn() -> Pin> + Send>>): !Send & !Sync); -async_assert_fn!(tokio::sync::OnceCell>::get_or_init_with( - _, fn() -> Pin>>>): !Send & !Sync); +type BoxedFutureSendSync = Box + Send + Sync>; +async_assert_fn!(tokio::sync::OnceCell::get_or_init(_, fn() -> Pin): Send & Sync); +type BoxedFutureSend = Box + Send>; +async_assert_fn!(tokio::sync::OnceCell::get_or_init(_, fn() -> Pin): Send & !Sync); +type BoxedFuture = Box>; +async_assert_fn!(tokio::sync::OnceCell::get_or_init(_, fn() -> Pin): !Send & !Sync); +type BoxedCellFutureSendSync = Box> + Send + Sync>; +async_assert_fn!(tokio::sync::OnceCell>::get_or_init( + _, fn() -> Pin): !Send & !Sync); +type BoxedCellFutureSend = Box> + Send>; +async_assert_fn!(tokio::sync::OnceCell>::get_or_init( + _, fn() -> Pin): !Send & !Sync); +type BoxedCellFuture = Box>>; +async_assert_fn!(tokio::sync::OnceCell>::get_or_init( + _, fn() -> Pin): !Send & !Sync); +type BoxedRcFutureSendSync = Box> + Send + Sync>; +async_assert_fn!(tokio::sync::OnceCell>::get_or_init( + _, fn() -> Pin): !Send & !Sync); +type BoxedRcFutureSend = Box> + Send>; +async_assert_fn!(tokio::sync::OnceCell>::get_or_init( + _, fn() -> Pin): !Send & !Sync); +type BoxedRcFuture = Box>>; +async_assert_fn!(tokio::sync::OnceCell>::get_or_init( + _, fn() -> Pin): !Send & !Sync); assert_value!(tokio::sync::OnceCell: Send & Sync); assert_value!(tokio::sync::OnceCell>: Send & !Sync); assert_value!(tokio::sync::OnceCell>: !Send & !Sync); diff --git a/tokio/tests/sync_once_cell.rs b/tokio/tests/sync_once_cell.rs index cb4dc29181b..ae62a73a5bc 100644 --- a/tokio/tests/sync_once_cell.rs +++ b/tokio/tests/sync_once_cell.rs @@ -28,13 +28,13 @@ async fn sleep_and_set() -> u32 { 5 } -async fn advance_time_and_set(cell: &'static OnceCell, v: u32) -> Result<(), SetError> { +async fn advance_time_and_set(cell: &'static OnceCell, v: u32) -> Result<(), SetError> { time::advance(Duration::from_millis(1)).await; cell.set(v) } #[test] -fn get_or_init_with() { +fn get_or_init() { let rt = runtime::Builder::new_current_thread() .enable_time() .build() @@ -45,8 +45,8 @@ fn get_or_init_with() { rt.block_on(async { time::pause(); - let handle1 = rt.spawn(async { ONCE.get_or_init_with(func1).await }); - let handle2 = rt.spawn(async { ONCE.get_or_init_with(func2).await }); + let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await }); + let handle2 = rt.spawn(async { ONCE.get_or_init(func2).await }); time::advance(Duration::from_millis(1)).await; time::resume(); @@ -71,8 +71,8 @@ fn get_or_init_panic() { rt.block_on(async { time::pause(); - let handle1 = rt.spawn(async { ONCE.get_or_init_with(func1).await }); - let handle2 = rt.spawn(async { ONCE.get_or_init_with(func_panic).await }); + let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await }); + let handle2 = rt.spawn(async { ONCE.get_or_init(func_panic).await }); time::advance(Duration::from_millis(1)).await; @@ -104,7 +104,7 @@ fn set_and_get() { fn get_uninit() { static ONCE: OnceCell = OnceCell::const_new(); let uninit = ONCE.get(); - assert!(uninit.is_err()); + assert!(uninit.is_none()); } #[test] @@ -129,7 +129,7 @@ fn set_while_initializing() { rt.block_on(async { time::pause(); - let handle1 = rt.spawn(async { ONCE.get_or_init_with(sleep_and_set).await }); + let handle1 = rt.spawn(async { ONCE.get_or_init(sleep_and_set).await }); let handle2 = rt.spawn(async { advance_time_and_set(&ONCE, 10).await }); time::advance(Duration::from_millis(2)).await; From c539d6810bc5cb6d85673b2eba0eba058067bf75 Mon Sep 17 00:00:00 2001 From: b-naber Date: Sun, 21 Mar 2021 21:49:15 +0100 Subject: [PATCH 05/15] fix clippy warnings --- tokio/tests/async_send_sync.rs | 31 +++++++++++++------------------ 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/tokio/tests/async_send_sync.rs b/tokio/tests/async_send_sync.rs index d373dbb208c..211c572cf2b 100644 --- a/tokio/tests/async_send_sync.rs +++ b/tokio/tests/async_send_sync.rs @@ -1,5 +1,6 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] +#![allow(clippy::type_complexity)] use std::cell::Cell; use std::future::Future; @@ -267,30 +268,24 @@ async_assert_fn!(tokio::sync::watch::Sender::closed(_): Send & Sync); async_assert_fn!(tokio::sync::watch::Sender>::closed(_): !Send & !Sync); async_assert_fn!(tokio::sync::watch::Sender>::closed(_): !Send & !Sync); -type BoxedFutureSendSync = Box + Send + Sync>; -async_assert_fn!(tokio::sync::OnceCell::get_or_init(_, fn() -> Pin): Send & Sync); -type BoxedFutureSend = Box + Send>; -async_assert_fn!(tokio::sync::OnceCell::get_or_init(_, fn() -> Pin): Send & !Sync); -type BoxedFuture = Box>; -async_assert_fn!(tokio::sync::OnceCell::get_or_init(_, fn() -> Pin): !Send & !Sync); -type BoxedCellFutureSendSync = Box> + Send + Sync>; +async_assert_fn!(tokio::sync::OnceCell::get_or_init( + _, fn() -> Pin + Send + Sync>>): Send & Sync); +async_assert_fn!(tokio::sync::OnceCell::get_or_init( + _, fn() -> Pin + Send>>): Send & !Sync); +async_assert_fn!(tokio::sync::OnceCell::get_or_init( + _, fn() -> Pin>>): !Send & !Sync); async_assert_fn!(tokio::sync::OnceCell>::get_or_init( - _, fn() -> Pin): !Send & !Sync); -type BoxedCellFutureSend = Box> + Send>; + _, fn() -> Pin> + Send + Sync>>): !Send & !Sync); async_assert_fn!(tokio::sync::OnceCell>::get_or_init( - _, fn() -> Pin): !Send & !Sync); -type BoxedCellFuture = Box>>; + _, fn() -> Pin> + Send>>): !Send & !Sync); async_assert_fn!(tokio::sync::OnceCell>::get_or_init( - _, fn() -> Pin): !Send & !Sync); -type BoxedRcFutureSendSync = Box> + Send + Sync>; + _, fn() -> Pin>>>): !Send & !Sync); async_assert_fn!(tokio::sync::OnceCell>::get_or_init( - _, fn() -> Pin): !Send & !Sync); -type BoxedRcFutureSend = Box> + Send>; + _, fn() -> Pin> + Send + Sync>>): !Send & !Sync); async_assert_fn!(tokio::sync::OnceCell>::get_or_init( - _, fn() -> Pin): !Send & !Sync); -type BoxedRcFuture = Box>>; + _, fn() -> Pin> + Send>>): !Send & !Sync); async_assert_fn!(tokio::sync::OnceCell>::get_or_init( - _, fn() -> Pin): !Send & !Sync); + _, fn() -> Pin>>>): !Send & !Sync); assert_value!(tokio::sync::OnceCell: Send & Sync); assert_value!(tokio::sync::OnceCell>: Send & !Sync); assert_value!(tokio::sync::OnceCell>: !Send & !Sync); From 9940f2ce5c01b3515e3f6a3edb8707a33195cea0 Mon Sep 17 00:00:00 2001 From: b-naber Date: Sun, 21 Mar 2021 23:04:27 +0100 Subject: [PATCH 06/15] fix docs errors --- tokio/src/sync/once_cell.rs | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/tokio/src/sync/once_cell.rs b/tokio/src/sync/once_cell.rs index 347e8691d6b..491c774d2ea 100644 --- a/tokio/src/sync/once_cell.rs +++ b/tokio/src/sync/once_cell.rs @@ -144,14 +144,14 @@ impl OnceCell { /// Sets the value of the OnceCell to the argument value. /// /// If the value of the OnceCell was already set prior to this call - /// then [`AlreadyInitializedError`] is returned. If another thread + /// then [`SetError::AlreadyInitializedError`] is returned. If another thread /// is initializing the cell while this method is called, - /// ['InitializingError`] is returned. In order to wait + /// ['SetError::InitializingError`] is returned. In order to wait /// for an ongoing initialization to finish, call /// [`OnceCell::get_or_init`] instead. /// - /// [`AlreadyInitializedError`]: crate::sync::AlreadyInitializedError - /// [`InitializingError`]: crate::sync::InitializingError + /// [`SetError::AlreadyInitializedError`]: crate::sync::SetError::AlreadyInitializedError + /// [`SetError::InitializingError`]: crate::sync::SetError::InitializingError /// ['OnceCell::get_or_init`]: crate::sync::OnceCell::get_or_init pub fn set(&self, value: T) -> Result<(), SetError> { if !self.initialized() { @@ -273,25 +273,20 @@ unsafe impl Sync for OnceCell {} // it's safe to send it to another thread unsafe impl Send for OnceCell {} -/// Errors that can be returned from [`OnceCell::set`] and -/// [`OnceCell::set_mut`]. +/// Errors that can be returned from [`OnceCell::set`] /// /// [`OnceCell::set`]: crate::sync::OnceCell::set -/// [`OnceCell::set_mut`]: crate::sync::OnceCell::set_mut #[derive(Debug, PartialEq)] pub enum SetError { - /// Error resulting from [`OnceCell::set`] or [`OnceCell::set_mut`] calls if - /// the cell was previously initialized. + /// Error resulting from [`OnceCell::set`] calls if the cell was previously initialized. /// /// [`OnceCell::set`]: crate::sync::OnceCell::set - /// [`OnceCell::set_mut`]: crate::sync::OnceCell::set_mut AlreadyInitializedError(T), - /// Error resulting from [`OnceCell::set`] or [`OnceCell::set_mut`] calls when - /// the cell is currently being inintialized during the calls to those methods. + /// Error resulting from [`OnceCell::set`] calls when the cell is currently being + /// inintialized during the calls to that method. /// /// [`OnceCell::set`]: crate::sync::OnceCell::set - /// [`OnceCell::set_mut`]: crate::sync::OnceCell::set_mut InitializingError(T), } From 3ca002f7dc330cf65ec14b89a39577b1638daad8 Mon Sep 17 00:00:00 2001 From: b-naber Date: Mon, 22 Mar 2021 12:46:23 +0100 Subject: [PATCH 07/15] address review --- tokio/src/sync/mod.rs | 2 +- tokio/src/sync/once_cell.rs | 66 ++++++++++++++++++------------------- 2 files changed, 34 insertions(+), 34 deletions(-) diff --git a/tokio/src/sync/mod.rs b/tokio/src/sync/mod.rs index 9d01d0f58ee..d89a9ddce3a 100644 --- a/tokio/src/sync/mod.rs +++ b/tokio/src/sync/mod.rs @@ -462,7 +462,7 @@ cfg_sync! { pub(crate) use task::AtomicWaker; mod once_cell; - pub use self::once_cell::{OnceCell, NotInitializedError, SetError}; + pub use self::once_cell::{OnceCell, SetError}; pub mod watch; } diff --git a/tokio/src/sync/once_cell.rs b/tokio/src/sync/once_cell.rs index 491c774d2ea..4ca12ef303f 100644 --- a/tokio/src/sync/once_cell.rs +++ b/tokio/src/sync/once_cell.rs @@ -55,14 +55,7 @@ impl fmt::Debug for OnceCell { impl Clone for OnceCell { fn clone(&self) -> OnceCell { - let new_cell = OnceCell::new(); - if let Some(value) = self.get() { - match new_cell.set(value.clone()) { - Ok(()) => (), - Err(_) => unreachable!(), - } - } - new_cell + OnceCell::new_with(self.get().map(|v| (*v).clone())) } } @@ -84,6 +77,26 @@ impl OnceCell { } } + /// Creates a new initialized OnceCell instance if `value` is `Some`, otherwise + /// has the same functionality as [`OnceCell::new`]. + /// + /// [`OnceCell::new`]: crate::sync::OnceCell::new + pub fn new_with(value: Option) -> Self { + let (value_set, value) = if let Some(v) = value { + (AtomicBool::new(true), UnsafeCell::new(MaybeUninit::new(v))) + } else { + ( + AtomicBool::new(false), + UnsafeCell::new(MaybeUninit::uninit()), + ) + }; + OnceCell { + value_set, + value, + semaphore: Semaphore::new(1), + } + } + /// Creates a new uninitialized OnceCell instance. #[cfg(all(feature = "parking_lot", not(all(loom, test)),))] #[cfg_attr(docsrs, doc(cfg(feature = "parking_lot")))] @@ -238,24 +251,25 @@ impl OnceCell { } /// Moves the value out of the cell and drops the cell afterwards. - pub fn into_inner(self) -> Result { + /// + /// Returns `None` if the cell is uninitialized. + pub fn into_inner(self) -> Option { if self.initialized() { - Ok(unsafe { self.value.with(|ptr| ptr::read(ptr).assume_init()) }) + Some(unsafe { self.value.with(|ptr| ptr::read(ptr).assume_init()) }) } else { - Err(NotInitializedError(())) + None } } - /// Takes ownership of the current value, leaving the cell unitialized. - pub fn take(&mut self) -> Result { + /// Takes ownership of the current value, leaving the cell uninitialized. + /// + /// Returns `None` if the cell is uninitialized. + pub fn take(&mut self) -> Option { if self.initialized() { - // Note: ptr::read does not move the value out of `self.value`. We need to use - // `self.initialized` to prevent incorrect accesses to value - let value = unsafe { self.value.with(|ptr| ptr::read(ptr).assume_init()) }; - *self.value_set.get_mut() = false; - Ok(value) + let old_me = std::mem::replace(self, OnceCell::new()); + old_me.into_inner() } else { - Err(NotInitializedError(())) + None } } } @@ -318,17 +332,3 @@ impl SetError { } } } - -/// Error returned from the [`OnceCell::get`] method -/// -/// [`OnceCell::get`]: crate::sync::OnceCell::get -#[derive(Debug, PartialEq)] -pub struct NotInitializedError(()); - -impl fmt::Display for NotInitializedError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "NotInitializedError") - } -} - -impl Error for NotInitializedError {} From b76ee23ff82b7cd2ff1d1025882b88890d62e3b6 Mon Sep 17 00:00:00 2001 From: b-naber Date: Tue, 23 Mar 2021 23:02:17 +0100 Subject: [PATCH 08/15] address review --- tokio/src/sync/once_cell.rs | 29 +++++++++-------------- tokio/tests/sync_once_cell.rs | 44 +++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 18 deletions(-) diff --git a/tokio/src/sync/once_cell.rs b/tokio/src/sync/once_cell.rs index 4ca12ef303f..415c49ea21a 100644 --- a/tokio/src/sync/once_cell.rs +++ b/tokio/src/sync/once_cell.rs @@ -55,7 +55,7 @@ impl fmt::Debug for OnceCell { impl Clone for OnceCell { fn clone(&self) -> OnceCell { - OnceCell::new_with(self.get().map(|v| (*v).clone())) + OnceCell::new_with(self.get().cloned()) } } @@ -82,18 +82,16 @@ impl OnceCell { /// /// [`OnceCell::new`]: crate::sync::OnceCell::new pub fn new_with(value: Option) -> Self { - let (value_set, value) = if let Some(v) = value { - (AtomicBool::new(true), UnsafeCell::new(MaybeUninit::new(v))) + if let Some(v) = value { + let semaphore = Semaphore::new(0); + semaphore.close(); + OnceCell { + value_set: AtomicBool::new(true), + value: UnsafeCell::new(MaybeUninit::new(v)), + semaphore, + } } else { - ( - AtomicBool::new(false), - UnsafeCell::new(MaybeUninit::uninit()), - ) - }; - OnceCell { - value_set, - value, - semaphore: Semaphore::new(1), + OnceCell::new() } } @@ -265,12 +263,7 @@ impl OnceCell { /// /// Returns `None` if the cell is uninitialized. pub fn take(&mut self) -> Option { - if self.initialized() { - let old_me = std::mem::replace(self, OnceCell::new()); - old_me.into_inner() - } else { - None - } + std::mem::take(self).into_inner() } } diff --git a/tokio/tests/sync_once_cell.rs b/tokio/tests/sync_once_cell.rs index ae62a73a5bc..c18f8f0f568 100644 --- a/tokio/tests/sync_once_cell.rs +++ b/tokio/tests/sync_once_cell.rs @@ -1,6 +1,8 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] +use std::ops::Drop; +use std::sync::atomic::{AtomicU32, Ordering}; use std::time::Duration; use tokio::runtime; use tokio::sync::{OnceCell, SetError}; @@ -141,3 +143,45 @@ fn set_while_initializing() { assert!(result2.err().unwrap().is_initializing_err()); }); } + +#[test] +fn drop_cell() { + static NUM_DROPS: AtomicU32 = AtomicU32::new(0); + + struct Foo {} + + let fooer = Foo {}; + + impl Drop for Foo { + fn drop(&mut self) { + NUM_DROPS.fetch_add(1, Ordering::Release); + } + } + + { + let once_cell = OnceCell::new(); + let _ = once_cell.set(fooer); + } + assert!(NUM_DROPS.load(Ordering::Acquire) == 1); +} + +#[test] +fn drop_into_inner() { + static NUM_DROPS: AtomicU32 = AtomicU32::new(0); + + struct Foo {} + + let fooer = Foo {}; + + impl Drop for Foo { + fn drop(&mut self) { + NUM_DROPS.fetch_add(1, Ordering::Release); + } + } + + let once_cell = OnceCell::new(); + let _ = once_cell.set(fooer); + let _ = once_cell.into_inner(); + let count = NUM_DROPS.load(Ordering::Acquire); + assert!(count == 0); +} From 1f8753a38c4bac516429ac9b6b7051a726edde42 Mon Sep 17 00:00:00 2001 From: b-naber Date: Wed, 24 Mar 2021 20:31:00 +0100 Subject: [PATCH 09/15] update --- tokio/tests/sync_once_cell.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/tests/sync_once_cell.rs b/tokio/tests/sync_once_cell.rs index c18f8f0f568..4d8ea5075be 100644 --- a/tokio/tests/sync_once_cell.rs +++ b/tokio/tests/sync_once_cell.rs @@ -181,7 +181,7 @@ fn drop_into_inner() { let once_cell = OnceCell::new(); let _ = once_cell.set(fooer); - let _ = once_cell.into_inner(); + let v = once_cell.into_inner(); let count = NUM_DROPS.load(Ordering::Acquire); assert!(count == 0); } From 7b1be1a758ad5f91ada09bd76ef75251bf1e3074 Mon Sep 17 00:00:00 2001 From: b-naber Date: Wed, 24 Mar 2021 23:42:07 +0100 Subject: [PATCH 10/15] add destructor --- tokio/src/sync/once_cell.rs | 14 ++++++++++++++ tokio/tests/sync_once_cell.rs | 2 +- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/tokio/src/sync/once_cell.rs b/tokio/src/sync/once_cell.rs index 415c49ea21a..858267b783d 100644 --- a/tokio/src/sync/once_cell.rs +++ b/tokio/src/sync/once_cell.rs @@ -4,6 +4,7 @@ use std::error::Error; use std::fmt; use std::future::Future; use std::mem::MaybeUninit; +use std::ops::Drop; use std::ptr; use std::sync::atomic::{AtomicBool, Ordering}; @@ -67,6 +68,17 @@ impl PartialEq for OnceCell { impl Eq for OnceCell {} +impl Drop for OnceCell { + fn drop(&mut self) { + if self.initialized() { + unsafe { + self.value + .with_mut(|ptr| ptr::drop_in_place((&mut *ptr).as_mut_ptr())); + }; + } + } +} + impl OnceCell { /// Creates a new uninitialized OnceCell instance. pub fn new() -> Self { @@ -253,6 +265,8 @@ impl OnceCell { /// Returns `None` if the cell is uninitialized. pub fn into_inner(self) -> Option { if self.initialized() { + // Set to uninitialized for the destructor of `OnceCell` to work properly + self.value_set.store(false, Ordering::Release); Some(unsafe { self.value.with(|ptr| ptr::read(ptr).assume_init()) }) } else { None diff --git a/tokio/tests/sync_once_cell.rs b/tokio/tests/sync_once_cell.rs index 4d8ea5075be..de950446609 100644 --- a/tokio/tests/sync_once_cell.rs +++ b/tokio/tests/sync_once_cell.rs @@ -181,7 +181,7 @@ fn drop_into_inner() { let once_cell = OnceCell::new(); let _ = once_cell.set(fooer); - let v = once_cell.into_inner(); + let _v = once_cell.into_inner(); let count = NUM_DROPS.load(Ordering::Acquire); assert!(count == 0); } From 741b508a1511ba9759c3cdecef4f2386aa7d8325 Mon Sep 17 00:00:00 2001 From: b-naber Date: Thu, 25 Mar 2021 15:32:05 +0100 Subject: [PATCH 11/15] suggestions --- tokio/src/sync/once_cell.rs | 4 ++-- tokio/tests/sync_once_cell.rs | 6 +++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/tokio/src/sync/once_cell.rs b/tokio/src/sync/once_cell.rs index 858267b783d..a8ae0a61763 100644 --- a/tokio/src/sync/once_cell.rs +++ b/tokio/src/sync/once_cell.rs @@ -263,10 +263,10 @@ impl OnceCell { /// Moves the value out of the cell and drops the cell afterwards. /// /// Returns `None` if the cell is uninitialized. - pub fn into_inner(self) -> Option { + pub fn into_inner(mut self) -> Option { if self.initialized() { // Set to uninitialized for the destructor of `OnceCell` to work properly - self.value_set.store(false, Ordering::Release); + *self.value_set.get_mut() = false; Some(unsafe { self.value.with(|ptr| ptr::read(ptr).assume_init()) }) } else { None diff --git a/tokio/tests/sync_once_cell.rs b/tokio/tests/sync_once_cell.rs index de950446609..dc7153557e3 100644 --- a/tokio/tests/sync_once_cell.rs +++ b/tokio/tests/sync_once_cell.rs @@ -1,6 +1,7 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] +use std::mem; use std::ops::Drop; use std::sync::atomic::{AtomicU32, Ordering}; use std::time::Duration; @@ -181,7 +182,10 @@ fn drop_into_inner() { let once_cell = OnceCell::new(); let _ = once_cell.set(fooer); - let _v = once_cell.into_inner(); + let fooer = once_cell.into_inner(); let count = NUM_DROPS.load(Ordering::Acquire); assert!(count == 0); + mem::drop(fooer); + let count = NUM_DROPS.load(Ordering::Acquire); + assert!(count == 1); } From cdc3edcd6d51e876cf316e447aa7a2d43136565e Mon Sep 17 00:00:00 2001 From: b-naber Date: Wed, 31 Mar 2021 22:35:58 +0200 Subject: [PATCH 12/15] suggestions --- tokio/src/sync/once_cell.rs | 8 +++--- tokio/tests/sync_once_cell.rs | 49 ++++++++++++++++++++++++++++++++--- 2 files changed, 49 insertions(+), 8 deletions(-) diff --git a/tokio/src/sync/once_cell.rs b/tokio/src/sync/once_cell.rs index a8ae0a61763..f4234d8a0af 100644 --- a/tokio/src/sync/once_cell.rs +++ b/tokio/src/sync/once_cell.rs @@ -28,9 +28,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; /// /// #[tokio::main] /// async fn main() { -/// let result1 = tokio::spawn(async { -/// ONCE.get_or_init(some_computation).await -/// }).await.unwrap(); +/// let result1 = ONCE.get_or_init(some_computation).await /// assert_eq!(*result1, 2); /// } /// ``` @@ -260,7 +258,7 @@ impl OnceCell { } } - /// Moves the value out of the cell and drops the cell afterwards. + /// Moves the value out of the cell, destroying the cell in the process. /// /// Returns `None` if the cell is uninitialized. pub fn into_inner(mut self) -> Option { @@ -323,7 +321,7 @@ impl fmt::Display for SetError { impl Error for SetError {} impl SetError { - /// Whether `SetError` is `SetError::AlreadyInitializEderror` + /// Whether `SetError` is `SetError::AlreadyInitializedError`. pub fn is_already_init_err(&self) -> bool { match self { SetError::AlreadyInitializedError(_) => true, diff --git a/tokio/tests/sync_once_cell.rs b/tokio/tests/sync_once_cell.rs index dc7153557e3..0fae73bc66d 100644 --- a/tokio/tests/sync_once_cell.rs +++ b/tokio/tests/sync_once_cell.rs @@ -40,14 +40,13 @@ async fn advance_time_and_set(cell: &'static OnceCell, v: u32) -> Result<() fn get_or_init() { let rt = runtime::Builder::new_current_thread() .enable_time() + .start_paused(true) .build() .unwrap(); static ONCE: OnceCell = OnceCell::const_new(); rt.block_on(async { - time::pause(); - let handle1 = rt.spawn(async { ONCE.get_or_init(func1).await }); let handle2 = rt.spawn(async { ONCE.get_or_init(func2).await }); @@ -161,7 +160,28 @@ fn drop_cell() { { let once_cell = OnceCell::new(); - let _ = once_cell.set(fooer); + let prev = once_cell.set(fooer); + assert!(prev == ()) + } + assert!(NUM_DROPS.load(Ordering::Acquire) == 1); +} + +#[test] +fn drop_cell_new_with() { + static NUM_DROPS: AtomicU32 = AtomicU32::new(0); + + struct Foo {} + + let fooer = Foo {}; + + impl Drop for Foo { + fn drop(&mut self) { + NUM_DROPS.fetch_add(1, Ordering::Release); + } + } + + { + let once_cell = OnceCell::new_with(Some(fooer)); } assert!(NUM_DROPS.load(Ordering::Acquire) == 1); } @@ -189,3 +209,26 @@ fn drop_into_inner() { let count = NUM_DROPS.load(Ordering::Acquire); assert!(count == 1); } + +#[test] +fn drop_into_inner_new_with() { + static NUM_DROPS: AtomicU32 = AtomicU32::new(0); + + struct Foo {} + + let fooer = Foo {}; + + impl Drop for Foo { + fn drop(&mut self) { + NUM_DROPS.fetch_add(1, Ordering::Release); + } + } + + let once_cell = OnceCell::new_with(Some(fooer)); + let fooer = once_cell.into_inner(); + let count = NUM_DROPS.load(Ordering::Acquire); + assert!(count == 0); + mem::drop(fooer); + let count = NUM_DROPS.load(Ordering::Acquire); + assert!(count == 1); +} \ No newline at end of file From a92ce09cb7bad57e60bdaebd67c2c7b9815b0f10 Mon Sep 17 00:00:00 2001 From: b-naber Date: Wed, 31 Mar 2021 23:58:34 +0200 Subject: [PATCH 13/15] add get_or_try_init --- tokio/src/sync/once_cell.rs | 65 +++++++++++++++++++++++++++++++++-- tokio/tests/sync_once_cell.rs | 36 ++++++++++++++++++- 2 files changed, 98 insertions(+), 3 deletions(-) diff --git a/tokio/src/sync/once_cell.rs b/tokio/src/sync/once_cell.rs index f4234d8a0af..1dbaa957907 100644 --- a/tokio/src/sync/once_cell.rs +++ b/tokio/src/sync/once_cell.rs @@ -28,7 +28,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; /// /// #[tokio::main] /// async fn main() { -/// let result1 = ONCE.get_or_init(some_computation).await +/// let result1 = ONCE.get_or_init(some_computation).await; /// assert_eq!(*result1, 2); /// } /// ``` @@ -222,7 +222,7 @@ impl OnceCell { unsafe { self.get_unchecked() } } else { // After acquire().await we have either acquired a permit while self.value - // is still uninitialized, or current thread is awoken after another thread + // is still uninitialized, or the current thread is awoken after another thread // has intialized the value and closed the semaphore, in which case self.initialized // is true and we don't set the value here match self.semaphore.acquire().await { @@ -258,6 +258,67 @@ impl OnceCell { } } + /// Tries to initialize the value of the OnceCell using the async function `f`. + /// If the value of the OnceCell was already initialized prior to this call, + /// a reference to that initialized value is returned. If some other thread + /// initiated the initialization prior to this call and the initialization + /// hasn't completed, this call waits until the initialization is finished. + /// If the function argument `f` returns an error, `get_or_try_init` + /// returns that error, otherwise the result of `f` will be stored in the cell. + /// + /// This will deadlock if `f` tries to initialize the cell itself. + pub async fn get_or_try_init(&self, f: F) -> Result<&T, E> + where + F: FnOnce() -> Fut, + Fut: Future>, + { + if self.initialized() { + // SAFETY: once the value is initialized, no mutable references are given out, so + // we can give out arbitrarily many immutable references + unsafe { Ok(self.get_unchecked()) } + } else { + // After acquire().await we have either acquired a permit while self.value + // is still uninitialized, or the current thread is awoken after another thread + // has intialized the value and closed the semaphore, in which case self.initialized + // is true and we don't set the value here + match self.semaphore.acquire().await { + Ok(_permit) => { + if !self.initialized() { + // If `f()` panics or `select!` is called, this `get_or_try_init` call + // is aborted and the semaphore permit is dropped. + let value = f().await; + + match value { + Ok(value) => { + // SAFETY: There is only one permit on the semaphore, hence only one + // mutable reference is created + unsafe { self.set_value(value) }; + + // SAFETY: once the value is initialized, no mutable references are given out, so + // we can give out arbitrarily many immutable references + unsafe { Ok(self.get_unchecked()) } + }, + Err(e) => Err(e), + } + } else { + unreachable!("acquired semaphore after value was already initialized."); + } + } + Err(_) => { + if self.initialized() { + // SAFETY: once the value is initialized, no mutable references are given out, so + // we can give out arbitrarily many immutable references + unsafe { Ok(self.get_unchecked()) } + } else { + unreachable!( + "Semaphore closed, but the OnceCell has not been initialized." + ); + } + } + } + } + } + /// Moves the value out of the cell, destroying the cell in the process. /// /// Returns `None` if the cell is uninitialized. diff --git a/tokio/tests/sync_once_cell.rs b/tokio/tests/sync_once_cell.rs index 0fae73bc66d..b943afc873c 100644 --- a/tokio/tests/sync_once_cell.rs +++ b/tokio/tests/sync_once_cell.rs @@ -18,6 +18,14 @@ async fn func2() -> u32 { 10 } +async fn func_err() -> Result { + Err(()) +} + +async fn func_ok() -> Result { + Ok(10) +} + async fn func_panic() -> u32 { time::sleep(Duration::from_millis(1)).await; panic!(); @@ -144,6 +152,31 @@ fn set_while_initializing() { }); } +#[test] +fn get_or_try_init() { + let rt = runtime::Builder::new_current_thread() + .enable_time() + .start_paused(true) + .build() + .unwrap(); + + static ONCE: OnceCell = OnceCell::const_new(); + + rt.block_on(async { + let handle1 = rt.spawn(async { ONCE.get_or_try_init(func_err).await }); + let handle2 = rt.spawn(async { ONCE.get_or_try_init(func_ok).await }); + + time::advance(Duration::from_millis(1)).await; + time::resume(); + + let result1 = handle1.await.unwrap(); + assert!(result1.is_err()); + + let result2 = handle2.await.unwrap(); + assert_eq!(*result2.unwrap(), 10); + }); +} + #[test] fn drop_cell() { static NUM_DROPS: AtomicU32 = AtomicU32::new(0); @@ -161,7 +194,7 @@ fn drop_cell() { { let once_cell = OnceCell::new(); let prev = once_cell.set(fooer); - assert!(prev == ()) + assert!(prev.is_ok()) } assert!(NUM_DROPS.load(Ordering::Acquire) == 1); } @@ -182,6 +215,7 @@ fn drop_cell_new_with() { { let once_cell = OnceCell::new_with(Some(fooer)); + assert!(once_cell.initialized()); } assert!(NUM_DROPS.load(Ordering::Acquire) == 1); } From b58dda5e1ab1959d74e1aed2c2294778b4b0997d Mon Sep 17 00:00:00 2001 From: b-naber Date: Thu, 1 Apr 2021 23:23:36 +0200 Subject: [PATCH 14/15] suggestions --- tokio/src/sync/once_cell.rs | 2 +- tokio/tests/sync_once_cell.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tokio/src/sync/once_cell.rs b/tokio/src/sync/once_cell.rs index 1dbaa957907..77d2b648d36 100644 --- a/tokio/src/sync/once_cell.rs +++ b/tokio/src/sync/once_cell.rs @@ -297,7 +297,7 @@ impl OnceCell { // SAFETY: once the value is initialized, no mutable references are given out, so // we can give out arbitrarily many immutable references unsafe { Ok(self.get_unchecked()) } - }, + } Err(e) => Err(e), } } else { diff --git a/tokio/tests/sync_once_cell.rs b/tokio/tests/sync_once_cell.rs index b943afc873c..60f50d214e1 100644 --- a/tokio/tests/sync_once_cell.rs +++ b/tokio/tests/sync_once_cell.rs @@ -235,11 +235,11 @@ fn drop_into_inner() { } let once_cell = OnceCell::new(); - let _ = once_cell.set(fooer); + assert!(once_cell.set(fooer).is_ok()); let fooer = once_cell.into_inner(); let count = NUM_DROPS.load(Ordering::Acquire); assert!(count == 0); - mem::drop(fooer); + drop(fooer); let count = NUM_DROPS.load(Ordering::Acquire); assert!(count == 1); } @@ -265,4 +265,4 @@ fn drop_into_inner_new_with() { mem::drop(fooer); let count = NUM_DROPS.load(Ordering::Acquire); assert!(count == 1); -} \ No newline at end of file +} From 201cbf14d1a89f8bf532ddc1e3dcba39445fd6fe Mon Sep 17 00:00:00 2001 From: b-naber Date: Mon, 5 Apr 2021 20:57:15 +0200 Subject: [PATCH 15/15] fix docs --- tokio/src/sync/once_cell.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/sync/once_cell.rs b/tokio/src/sync/once_cell.rs index 77d2b648d36..fa9b1f19f54 100644 --- a/tokio/src/sync/once_cell.rs +++ b/tokio/src/sync/once_cell.rs @@ -167,7 +167,7 @@ impl OnceCell { /// If the value of the OnceCell was already set prior to this call /// then [`SetError::AlreadyInitializedError`] is returned. If another thread /// is initializing the cell while this method is called, - /// ['SetError::InitializingError`] is returned. In order to wait + /// [`SetError::InitializingError`] is returned. In order to wait /// for an ongoing initialization to finish, call /// [`OnceCell::get_or_init`] instead. ///