From 01725f99542be00f84199b3016273b56182d8eee Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Fri, 12 Aug 2022 18:42:56 +0200 Subject: [PATCH 01/21] rt: add rng_seed option to `runtime::Builder` The `tokio::select!` macro polls branches in a random order. While this is desirable in production, for testing purposes a more deterministic approach can be useul. This change adds an additional parameter to the runtime `Builder` to set the random number generator seed. This value is then used to reset the seed on all threads associated with the runtime being built. This guarantees that calls to the `tokio::select!` macro which are performed in the same order on the same thread will poll branches in the same order. --- tokio/src/runtime/blocking/pool.rs | 9 ++++++ tokio/src/runtime/builder.rs | 38 ++++++++++++++++++++++ tokio/src/util/mod.rs | 2 ++ tokio/src/util/rand.rs | 33 ++++++++++++++----- tokio/tests/macros_select.rs | 51 ++++++++++++++++++++++++++++++ tokio/tests/rt_basic.rs | 16 ++++++++++ tokio/tests/rt_threaded.rs | 21 ++++++++++++ 7 files changed, 162 insertions(+), 8 deletions(-) diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 19315388563..45a9e04fd91 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -8,6 +8,7 @@ use crate::runtime::builder::ThreadNameFn; use crate::runtime::context; use crate::runtime::task::{self, JoinHandle}; use crate::runtime::{Builder, Callback, ToHandle}; +use crate::util::reset_thread_rng; use std::collections::{HashMap, VecDeque}; use std::fmt; @@ -48,6 +49,9 @@ struct Inner { // Customizable wait timeout. keep_alive: Duration, + + // Random number seed + rng_seed: Option, } struct Shared { @@ -182,6 +186,7 @@ impl BlockingPool { before_stop: builder.before_stop.clone(), thread_cap, keep_alive, + rng_seed: builder.rng_seed, }), }, shutdown_rx, @@ -336,6 +341,10 @@ impl Inner { f() } + if let Some(rng_seed) = self.rng_seed { + reset_thread_rng(rng_seed); + } + let mut shared = self.shared.lock(); let mut join_on_thread = None; diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 2f678aa72e2..3e9b0b6249a 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1,5 +1,6 @@ use crate::runtime::handle::Handle; use crate::runtime::{blocking, driver, Callback, Runtime, Spawner}; +use crate::util::reset_thread_rng; use std::fmt; use std::io; @@ -85,6 +86,9 @@ pub struct Builder { /// How many ticks before yielding to the driver for timer and I/O events? pub(super) event_interval: u32, + /// Specify a random number generator seed to provide deterministic results + pub(super) rng_seed: Option, + #[cfg(tokio_unstable)] pub(super) unhandled_panic: UnhandledPanic, } @@ -250,6 +254,8 @@ impl Builder { global_queue_interval, event_interval, + rng_seed: None, + #[cfg(tokio_unstable)] unhandled_panic: UnhandledPanic::Ignore, } @@ -722,6 +728,30 @@ impl Builder { self } + /// Specifies the random number generation seed to use within all threads associated + /// with the runtime being built. + /// + /// This option is intended to make certain parts of the runtime deterministic. + /// Specifically, this will ensure that the order that branches are polled by the + /// [`tokio::select!`] macro is deterministic. + /// + /// # Examples + /// + /// ``` + /// # use tokio::runtime; + /// # pub fn main() { + /// let rt = runtime::Builder::new_current_thread() + /// .rng_seed(42) + /// .build(); + /// # } + /// ``` + /// + /// [`tokio::select!`]: crate::select + pub fn rng_seed(&mut self, seed: u64) -> &mut Self { + self.rng_seed = Some(seed); + self + } + cfg_unstable! { /// Configure how the runtime responds to an unhandled panic on a /// spawned task. @@ -789,6 +819,10 @@ impl Builder { let (driver, resources) = driver::Driver::new(self.get_cfg())?; + if let Some(seed) = self.rng_seed { + reset_thread_rng(seed); + } + // Blocking pool let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads); let blocking_spawner = blocking_pool.spawner().clone(); @@ -909,6 +943,10 @@ cfg_rt_multi_thread! { let (driver, resources) = driver::Driver::new(self.get_cfg())?; + if let Some(seed) = self.rng_seed { + reset_thread_rng(seed); + } + // Create the blocking pool let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads); diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index 618f5543802..640904e287e 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -73,6 +73,8 @@ pub(crate) mod trace; #[cfg_attr(not(feature = "macros"), allow(unreachable_pub))] pub use self::rand::thread_rng_n; +pub(crate) use self::rand::reset_thread_rng; + #[cfg(any( feature = "rt", feature = "time", diff --git a/tokio/src/util/rand.rs b/tokio/src/util/rand.rs index 6b19c8be957..6d138772647 100644 --- a/tokio/src/util/rand.rs +++ b/tokio/src/util/rand.rs @@ -16,6 +16,15 @@ pub(crate) struct FastRand { impl FastRand { /// Initializes a new, thread-local, fast random number generator. pub(crate) fn new(seed: u64) -> FastRand { + let (one, two) = FastRand::split_seed(seed); + + FastRand { + one: Cell::new(one), + two: Cell::new(two), + } + } + + fn split_seed(seed: u64) -> (u32, u32) { let one = (seed >> 32) as u32; let mut two = seed as u32; @@ -24,10 +33,14 @@ impl FastRand { two = 1; } - FastRand { - one: Cell::new(one), - two: Cell::new(two), - } + (one, two) + } + + pub(crate) fn reset_seed(&self, seed: u64) { + let (one, two) = FastRand::split_seed(seed); + + self.one.replace(one); + self.two.replace(two); } pub(crate) fn fastrand_n(&self, n: u32) -> u32 { @@ -51,14 +64,18 @@ impl FastRand { } } +thread_local! { + static THREAD_RNG: FastRand = FastRand::new(crate::loom::rand::seed()); +} + +pub(crate) fn reset_thread_rng(seed: u64) { + THREAD_RNG.with(|rng| rng.reset_seed(seed)); +} + // Used by the select macro and `StreamMap` #[cfg(any(feature = "macros"))] #[doc(hidden)] #[cfg_attr(not(feature = "macros"), allow(unreachable_pub))] pub fn thread_rng_n(n: u32) -> u32 { - thread_local! { - static THREAD_RNG: FastRand = FastRand::new(crate::loom::rand::seed()); - } - THREAD_RNG.with(|rng| rng.fastrand_n(n)) } diff --git a/tokio/tests/macros_select.rs b/tokio/tests/macros_select.rs index 23162a1a918..d9038146126 100644 --- a/tokio/tests/macros_select.rs +++ b/tokio/tests/macros_select.rs @@ -599,3 +599,54 @@ async fn mut_ref_patterns() { }, }; } + +#[test] +#[cfg(feature = "rt-multi-thread")] +fn deterministic_select_multi_thread() { + let rt = tokio::runtime::Builder::new_multi_thread() + .rng_seed(4_318_314_286_557_880_373) + .build() + .unwrap(); + + rt.block_on(async { + let _ = tokio::spawn(async { + let num = select_0_to_9().await; + assert_eq!(num, 4); + + let num = select_0_to_9().await; + assert_eq!(num, 1); + }) + .await; + }); +} + +#[test] +fn deterministic_select_current_thread() { + let rt = tokio::runtime::Builder::new_current_thread() + .rng_seed(4_318_314_286_557_880_373) + .build() + .unwrap(); + + rt.block_on(async { + let num = select_0_to_9().await; + assert_eq!(num, 4); + + let num = select_0_to_9().await; + assert_eq!(num, 1); + }); +} + +async fn select_0_to_9() -> u32 { + tokio::select!( + x = async { 0 } => x, + x = async { 1 } => x, + x = async { 2 } => x, + x = async { 3 } => x, + x = async { 4 } => x, + x = async { 5 } => x, + x = async { 6 } => x, + x = async { 7 } => x, + x = async { 8 } => x, + x = async { 9 } => x, + ) +} diff --git a/tokio/tests/rt_basic.rs b/tokio/tests/rt_basic.rs index 0cb92487af3..a4c0595ecae 100644 --- a/tokio/tests/rt_basic.rs +++ b/tokio/tests/rt_basic.rs @@ -291,6 +291,22 @@ fn timeout_panics_when_no_time_handle() { }); } +#[test] +fn rng_seed() { + let rt = tokio::runtime::Builder::new_current_thread() + .rng_seed(4_318_314_286_557_880_373) + .build() + .unwrap(); + + rt.block_on(async { + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 44); + + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 15); + }); +} + #[cfg(tokio_unstable)] mod unstable { use tokio::runtime::{Builder, UnhandledPanic}; diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index dd137fa9b18..4f5887b7bbc 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -539,6 +539,27 @@ async fn test_block_in_place4() { tokio::task::block_in_place(|| {}); } +#[test] +fn rng_seed() { + let seed: u64 = 4_318_314_286_557_880_373; + + let rt = tokio::runtime::Builder::new_multi_thread() + .rng_seed(seed) + .build() + .unwrap(); + + rt.block_on(async { + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 44); + + let _ = tokio::spawn(async { + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 44); + }) + .await; + }); +} + fn rt() -> runtime::Runtime { runtime::Runtime::new().unwrap() } From f95a0eece4ec641e8d7726c9f6857e64b180a6bc Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Tue, 16 Aug 2022 19:08:04 +0200 Subject: [PATCH 02/21] builder accepts RngSeed opaque struct NOTE: This change doesn't correct the handling of the RngSeed on the thread the runtime is started from, so it's all likely to change. Just pushing for safety. Instead of exposing the width of the seed we're currently using, expose an opaque struct which can generate a seed from a byte slice. Additionally, each thread is given with a unique seed based on its `id` (`worker_thread_index`). This should enable more deterministic behavior, while ensuring that each thread does not begin with the same seed. --- tokio/src/runtime/blocking/pool.rs | 10 ++++----- tokio/src/runtime/builder.rs | 9 ++++---- tokio/src/runtime/mod.rs | 1 + tokio/src/util/mod.rs | 1 + tokio/src/util/rand.rs | 33 +++++++++++++++++++++++++++--- tokio/tests/macros_select.rs | 19 +++++++++++------ tokio/tests/rt_basic.rs | 10 +++++---- tokio/tests/rt_threaded.rs | 12 +++++++---- 8 files changed, 69 insertions(+), 26 deletions(-) diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 45a9e04fd91..b15edd1e016 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -8,7 +8,7 @@ use crate::runtime::builder::ThreadNameFn; use crate::runtime::context; use crate::runtime::task::{self, JoinHandle}; use crate::runtime::{Builder, Callback, ToHandle}; -use crate::util::reset_thread_rng; +use crate::util::{reset_thread_rng, RngSeed}; use std::collections::{HashMap, VecDeque}; use std::fmt; @@ -51,7 +51,7 @@ struct Inner { keep_alive: Duration, // Random number seed - rng_seed: Option, + rng_seed: Option, } struct Shared { @@ -186,7 +186,7 @@ impl BlockingPool { before_stop: builder.before_stop.clone(), thread_cap, keep_alive, - rng_seed: builder.rng_seed, + rng_seed: builder.rng_seed.clone(), }), }, shutdown_rx, @@ -341,8 +341,8 @@ impl Inner { f() } - if let Some(rng_seed) = self.rng_seed { - reset_thread_rng(rng_seed); + if let Some(rng_seed) = &self.rng_seed { + reset_thread_rng(&rng_seed.seed_with_index(worker_thread_id)); } let mut shared = self.shared.lock(); diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 3e9b0b6249a..226385d8fb8 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1,6 +1,7 @@ use crate::runtime::handle::Handle; use crate::runtime::{blocking, driver, Callback, Runtime, Spawner}; use crate::util::reset_thread_rng; +pub use crate::util::RngSeed; use std::fmt; use std::io; @@ -87,7 +88,7 @@ pub struct Builder { pub(super) event_interval: u32, /// Specify a random number generator seed to provide deterministic results - pub(super) rng_seed: Option, + pub(super) rng_seed: Option, #[cfg(tokio_unstable)] pub(super) unhandled_panic: UnhandledPanic, @@ -747,7 +748,7 @@ impl Builder { /// ``` /// /// [`tokio::select!`]: crate::select - pub fn rng_seed(&mut self, seed: u64) -> &mut Self { + pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self { self.rng_seed = Some(seed); self } @@ -819,7 +820,7 @@ impl Builder { let (driver, resources) = driver::Driver::new(self.get_cfg())?; - if let Some(seed) = self.rng_seed { + if let Some(seed) = &self.rng_seed { reset_thread_rng(seed); } @@ -943,7 +944,7 @@ cfg_rt_multi_thread! { let (driver, resources) = driver::Driver::new(self.get_cfg())?; - if let Some(seed) = self.rng_seed { + if let Some(seed) = &self.rng_seed { reset_thread_rng(seed); } diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index ff06b70eb2d..26707969add 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -217,6 +217,7 @@ cfg_rt! { mod builder; pub use self::builder::Builder; + pub use self::builder::RngSeed; cfg_unstable! { pub use self::builder::UnhandledPanic; } diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index 640904e287e..f200aa35789 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -73,6 +73,7 @@ pub(crate) mod trace; #[cfg_attr(not(feature = "macros"), allow(unreachable_pub))] pub use self::rand::thread_rng_n; +pub use self::rand::RngSeed; pub(crate) use self::rand::reset_thread_rng; #[cfg(any( diff --git a/tokio/src/util/rand.rs b/tokio/src/util/rand.rs index 6d138772647..77b72d0b5aa 100644 --- a/tokio/src/util/rand.rs +++ b/tokio/src/util/rand.rs @@ -1,4 +1,31 @@ -use std::cell::Cell; +use std::{cell::Cell, collections::hash_map::DefaultHasher, hash::Hasher}; + +/// TODO(hds): Add public docs +#[derive(Clone, Debug)] +pub struct RngSeed { + pub(crate) seed: u64, +} + +impl RngSeed { + /// TODO(hds): Add public docs + pub fn from_bytes(bytes: &[u8]) -> Self { + let mut hasher = DefaultHasher::default(); + hasher.write(bytes); + Self { + seed: hasher.finish() + } + } + + pub(crate) fn seed_with_index(&self, idx: usize) -> Self { + let mut hasher = DefaultHasher::default(); + hasher.write(&self.seed.to_le_bytes()); + hasher.write(&idx.to_le_bytes()); + + Self { + seed: hasher.finish() + } + } +} /// Fast random number generate. /// @@ -68,8 +95,8 @@ thread_local! { static THREAD_RNG: FastRand = FastRand::new(crate::loom::rand::seed()); } -pub(crate) fn reset_thread_rng(seed: u64) { - THREAD_RNG.with(|rng| rng.reset_seed(seed)); +pub(crate) fn reset_thread_rng(rng_seed: &RngSeed) { + THREAD_RNG.with(|rng| rng.reset_seed(rng_seed.seed)); } // Used by the select macro and `StreamMap` diff --git a/tokio/tests/macros_select.rs b/tokio/tests/macros_select.rs index d9038146126..f4f1a81f338 100644 --- a/tokio/tests/macros_select.rs +++ b/tokio/tests/macros_select.rs @@ -603,18 +603,22 @@ async fn mut_ref_patterns() { #[test] #[cfg(feature = "rt-multi-thread")] fn deterministic_select_multi_thread() { + use tokio::runtime::RngSeed; + let seed = 4_318_314_286_557_880_373_u64.to_le_bytes(); + let rt = tokio::runtime::Builder::new_multi_thread() - .rng_seed(4_318_314_286_557_880_373) + .worker_threads(1) + .rng_seed(RngSeed::from_bytes(&seed)) .build() .unwrap(); rt.block_on(async { let _ = tokio::spawn(async { let num = select_0_to_9().await; - assert_eq!(num, 4); + assert_eq!(num, 6); let num = select_0_to_9().await; - assert_eq!(num, 1); + assert_eq!(num, 3); }) .await; }); @@ -622,17 +626,20 @@ fn deterministic_select_multi_thread() { #[test] fn deterministic_select_current_thread() { + use tokio::runtime::RngSeed; + let seed = 4_318_314_286_557_880_373_u64.to_le_bytes(); + let rt = tokio::runtime::Builder::new_current_thread() - .rng_seed(4_318_314_286_557_880_373) + .rng_seed(RngSeed::from_bytes(&seed)) .build() .unwrap(); rt.block_on(async { let num = select_0_to_9().await; - assert_eq!(num, 4); + assert_eq!(num, 8); let num = select_0_to_9().await; - assert_eq!(num, 1); + assert_eq!(num, 4); }); } diff --git a/tokio/tests/rt_basic.rs b/tokio/tests/rt_basic.rs index a4c0595ecae..8aa98d1385b 100644 --- a/tokio/tests/rt_basic.rs +++ b/tokio/tests/rt_basic.rs @@ -1,7 +1,7 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] -use tokio::runtime::Runtime; +use tokio::runtime::{Runtime, RngSeed}; use tokio::sync::oneshot; use tokio::time::{timeout, Duration}; use tokio_test::{assert_err, assert_ok}; @@ -293,17 +293,19 @@ fn timeout_panics_when_no_time_handle() { #[test] fn rng_seed() { + let seed = 4_318_314_286_557_880_373_u64.to_le_bytes(); + let rt = tokio::runtime::Builder::new_current_thread() - .rng_seed(4_318_314_286_557_880_373) + .rng_seed(RngSeed::from_bytes(&seed)) .build() .unwrap(); rt.block_on(async { let random = tokio::macros::support::thread_rng_n(100); - assert_eq!(random, 44); + assert_eq!(random, 89); let random = tokio::macros::support::thread_rng_n(100); - assert_eq!(random, 15); + assert_eq!(random, 47); }); } diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index 4f5887b7bbc..7a0aa4c5b5b 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -541,20 +541,24 @@ async fn test_block_in_place4() { #[test] fn rng_seed() { - let seed: u64 = 4_318_314_286_557_880_373; + use tokio::runtime::RngSeed; + let seed = 4_318_314_286_557_880_373_u64.to_le_bytes(); let rt = tokio::runtime::Builder::new_multi_thread() - .rng_seed(seed) + .worker_threads(1) + .rng_seed(RngSeed::from_bytes(&seed)) .build() .unwrap(); rt.block_on(async { let random = tokio::macros::support::thread_rng_n(100); - assert_eq!(random, 44); + assert_eq!(random, 89); let _ = tokio::spawn(async { + // Because we only have a single worker thread, the + // RNG will be deterministic here as well. let random = tokio::macros::support::thread_rng_n(100); - assert_eq!(random, 44); + assert_eq!(random, 60); }) .await; }); From dc53f3b558d33fdfab070514983548c6d1305577 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Thu, 18 Aug 2022 19:03:03 +0200 Subject: [PATCH 03/21] replace thread local RNG on enter guard drop In order to properly clean up after ourselves, we set a specific seed into the thread local RNG when entering into a runtime context. The previous seed (RNG state) is stored in the `EnterGuard` together with the previous context (runtime handle). Upon dropping the guard, the previously stored seed is returned to the thread local RNG. To achieve this in a deterministic, but fair way, we now store a seed generator in the runtime handle, and another in the blocking thread spawner. These seed generators are thread safe (as the one in the handle may be passed across thread boundries) and will produce a deterministic series of seeds when the initial seed provided to the seed generator is the same. --- tokio/src/runtime/blocking/pool.rs | 12 +-- tokio/src/runtime/builder.rs | 24 ++--- tokio/src/runtime/context.rs | 29 ++++-- tokio/src/runtime/handle.rs | 4 + tokio/src/runtime/thread_pool/worker.rs | 5 +- tokio/src/util/mod.rs | 3 +- tokio/src/util/rand.rs | 132 +++++++++++++++++------- tokio/tests/macros_select.rs | 45 ++++---- tokio/tests/rt_basic.rs | 34 +++++- tokio/tests/rt_threaded.rs | 9 +- 10 files changed, 193 insertions(+), 104 deletions(-) diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index b15edd1e016..ba087338419 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -8,7 +8,7 @@ use crate::runtime::builder::ThreadNameFn; use crate::runtime::context; use crate::runtime::task::{self, JoinHandle}; use crate::runtime::{Builder, Callback, ToHandle}; -use crate::util::{reset_thread_rng, RngSeed}; +use crate::util::{replace_thread_rng, RngSeedGenerator}; use std::collections::{HashMap, VecDeque}; use std::fmt; @@ -51,7 +51,7 @@ struct Inner { keep_alive: Duration, // Random number seed - rng_seed: Option, + seed_generator: RngSeedGenerator, } struct Shared { @@ -186,7 +186,7 @@ impl BlockingPool { before_stop: builder.before_stop.clone(), thread_cap, keep_alive, - rng_seed: builder.rng_seed.clone(), + seed_generator: builder.seed_generator.next_generator(), }), }, shutdown_rx, @@ -340,10 +340,8 @@ impl Inner { if let Some(f) = &self.after_start { f() } - - if let Some(rng_seed) = &self.rng_seed { - reset_thread_rng(&rng_seed.seed_with_index(worker_thread_id)); - } + // We own this thread so thee is no need to replace the RngSeed once we're done. + let _ = replace_thread_rng(self.seed_generator.next_seed()); let mut shared = self.shared.lock(); let mut join_on_thread = None; diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 226385d8fb8..b48f9ab619c 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1,6 +1,7 @@ use crate::runtime::handle::Handle; use crate::runtime::{blocking, driver, Callback, Runtime, Spawner}; -use crate::util::reset_thread_rng; +use crate::util::RngSeedGenerator; + pub use crate::util::RngSeed; use std::fmt; @@ -88,7 +89,7 @@ pub struct Builder { pub(super) event_interval: u32, /// Specify a random number generator seed to provide deterministic results - pub(super) rng_seed: Option, + pub(super) seed_generator: RngSeedGenerator, #[cfg(tokio_unstable)] pub(super) unhandled_panic: UnhandledPanic, @@ -255,7 +256,7 @@ impl Builder { global_queue_interval, event_interval, - rng_seed: None, + seed_generator: RngSeedGenerator::new(RngSeed::new()), #[cfg(tokio_unstable)] unhandled_panic: UnhandledPanic::Ignore, @@ -739,17 +740,18 @@ impl Builder { /// # Examples /// /// ``` - /// # use tokio::runtime; + /// # use tokio::runtime::{self, RngSeed}; /// # pub fn main() { + /// let seed = RngSeed::from_bytes(b"place your seed here"); /// let rt = runtime::Builder::new_current_thread() - /// .rng_seed(42) + /// .rng_seed(seed) /// .build(); /// # } /// ``` /// /// [`tokio::select!`]: crate::select pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self { - self.rng_seed = Some(seed); + self.seed_generator = RngSeedGenerator::new(seed); self } @@ -820,10 +822,6 @@ impl Builder { let (driver, resources) = driver::Driver::new(self.get_cfg())?; - if let Some(seed) = &self.rng_seed { - reset_thread_rng(seed); - } - // Blocking pool let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads); let blocking_spawner = blocking_pool.spawner().clone(); @@ -834,6 +832,7 @@ impl Builder { signal_handle: resources.signal_handle, clock: resources.clock, blocking_spawner, + seed_generator: self.seed_generator.next_generator(), }; // And now put a single-threaded scheduler on top of the timer. When @@ -944,10 +943,6 @@ cfg_rt_multi_thread! { let (driver, resources) = driver::Driver::new(self.get_cfg())?; - if let Some(seed) = &self.rng_seed { - reset_thread_rng(seed); - } - // Create the blocking pool let blocking_pool = blocking::create_blocking_pool(self, self.max_blocking_threads + core_threads); @@ -959,6 +954,7 @@ cfg_rt_multi_thread! { signal_handle: resources.signal_handle, clock: resources.clock, blocking_spawner, + seed_generator: self.seed_generator.next_generator(), }; let (scheduler, launch) = ThreadPool::new( diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index 4215124fc83..7ff5c6ccf18 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -1,5 +1,8 @@ //! Thread local runtime context -use crate::runtime::{Handle, TryCurrentError}; +use crate::{ + runtime::{Handle, TryCurrentError}, + util::{replace_thread_rng, RngSeed}, +}; use std::cell::RefCell; @@ -93,21 +96,29 @@ pub(crate) fn enter(new: Handle) -> EnterGuard { /// /// [`Handle`]: Handle pub(crate) fn try_enter(new: Handle) -> Option { - CONTEXT - .try_with(|ctx| { - let old = ctx.borrow_mut().replace(new); - EnterGuard(old) - }) - .ok() + let rng_seed = new.as_inner().seed_generator.next_seed(); + let old_handle = CONTEXT.try_with(|ctx| ctx.borrow_mut().replace(new)).ok()?; + + let old_seed = replace_thread_rng(rng_seed); + + Some(EnterGuard { + old_handle, + old_seed, + }) } #[derive(Debug)] -pub(crate) struct EnterGuard(Option); +pub(crate) struct EnterGuard { + old_handle: Option, + old_seed: RngSeed, +} impl Drop for EnterGuard { fn drop(&mut self) { CONTEXT.with(|ctx| { - *ctx.borrow_mut() = self.0.take(); + *ctx.borrow_mut() = self.old_handle.take(); }); + // We discard the RngSeed associated with this guard + let _ = replace_thread_rng(self.old_seed.clone()); } } diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 075792a3077..e5a9c6def94 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -2,6 +2,7 @@ use crate::runtime::blocking::{BlockingTask, NoopSchedule}; use crate::runtime::task::{self, JoinHandle}; use crate::runtime::{blocking, context, driver, Spawner}; use crate::util::error::{CONTEXT_MISSING_ERROR, THREAD_LOCAL_DESTROYED_ERROR}; +use crate::util::RngSeedGenerator; use std::future::Future; use std::marker::PhantomData; @@ -53,6 +54,9 @@ pub(crate) struct HandleInner { /// Blocking pool spawner pub(super) blocking_spawner: blocking::Spawner, + + /// Current random number generator seed (when no) + pub(super) seed_generator: RngSeedGenerator, } /// Create a new runtime handle. diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index b01c5bc4749..7856d803d29 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -58,7 +58,6 @@ use crate::coop; use crate::future::Future; -use crate::loom::rand::seed; use crate::loom::sync::{Arc, Mutex}; use crate::park::{Park, Unpark}; use crate::runtime; @@ -67,7 +66,7 @@ use crate::runtime::task::{Inject, JoinHandle, OwnedTasks}; use crate::runtime::thread_pool::{queue, Idle, Parker, Unparker}; use crate::runtime::{task, Callback, HandleInner, MetricsBatch, SchedulerMetrics, WorkerMetrics}; use crate::util::atomic_cell::AtomicCell; -use crate::util::FastRand; +use crate::util::{FastRand, RngSeed}; use std::cell::RefCell; use std::time::Duration; @@ -226,7 +225,7 @@ pub(super) fn create( is_shutdown: false, park: Some(park), metrics: MetricsBatch::new(), - rand: FastRand::new(seed()), + rand: FastRand::new(RngSeed::new()), global_queue_interval, event_interval, })); diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index f200aa35789..3bc507150fd 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -73,8 +73,9 @@ pub(crate) mod trace; #[cfg_attr(not(feature = "macros"), allow(unreachable_pub))] pub use self::rand::thread_rng_n; +pub(crate) use self::rand::replace_thread_rng; pub use self::rand::RngSeed; -pub(crate) use self::rand::reset_thread_rng; +pub(crate) use self::rand::RngSeedGenerator; #[cfg(any( feature = "rt", diff --git a/tokio/src/util/rand.rs b/tokio/src/util/rand.rs index 77b72d0b5aa..c64920400dc 100644 --- a/tokio/src/util/rand.rs +++ b/tokio/src/util/rand.rs @@ -1,29 +1,91 @@ -use std::{cell::Cell, collections::hash_map::DefaultHasher, hash::Hasher}; +use std::{cell::Cell, collections::hash_map::DefaultHasher, hash::Hasher, sync::Mutex}; -/// TODO(hds): Add public docs +/// A deterministic generator for seeds (and other generators). +/// +/// Given the same initial seed, the generator will output the same sequence of seeds. +/// +/// Since the seed generator will be kept in a runtime handle, we need to wrap `FastRand` +/// in a Mutex to make it thread safe. Different to the `FastRand` that we keep in a +/// thread local store, the expectation is that seed generation will not need to happen +/// very frequently, so the cost of the mutex should be minimal. +#[derive(Debug)] +pub(crate) struct RngSeedGenerator { + /// Internal state for the seed generator. We keep it in a Mutex so that we can safely + /// use it across multiple threads. + state: Mutex, +} + +impl RngSeedGenerator { + /// Returns a new generator from the provided seed. + pub(crate) fn new(seed: RngSeed) -> Self { + Self { + state: Mutex::new(FastRand::new(seed)), + } + } + + /// Returns the next seed in the sequence. + pub(crate) fn next_seed(&self) -> RngSeed { + let rng = self + .state + .lock() + .expect("RNG seed generator is internally corrupt"); + + let s = rng.fastrand(); + let r = rng.fastrand(); + + RngSeed::from_pair(s, r) + } + + /// Directly creates a generator using the next seed. + pub(crate) fn next_generator(&self) -> Self { + RngSeedGenerator::new(self.next_seed()) + } +} + +/// A seed for random numnber generation. +/// +/// In order to make certain functions within a runtime deterministic, a seed +/// can be specified at the time of creation. #[derive(Clone, Debug)] pub struct RngSeed { - pub(crate) seed: u64, + s: u32, + r: u32, } impl RngSeed { - /// TODO(hds): Add public docs + /// Creates a random seed using loom internally. + pub(crate) fn new() -> Self { + Self::from_u64(crate::loom::rand::seed()) + } + + /// Generates a seed from the provided byte slice. + /// + /// # Example + /// + /// ``` + /// # use tokio::runtime::RngSeed; + /// let seed = RngSeed::from_bytes(b"make me a seed"); + /// ``` pub fn from_bytes(bytes: &[u8]) -> Self { let mut hasher = DefaultHasher::default(); hasher.write(bytes); - Self { - seed: hasher.finish() - } + Self::from_u64(hasher.finish()) } - pub(crate) fn seed_with_index(&self, idx: usize) -> Self { - let mut hasher = DefaultHasher::default(); - hasher.write(&self.seed.to_le_bytes()); - hasher.write(&idx.to_le_bytes()); + fn from_u64(seed: u64) -> Self { + let one = (seed >> 32) as u32; + let mut two = seed as u32; - Self { - seed: hasher.finish() + if two == 0 { + // This value cannot be zero + two = 1; } + + Self::from_pair(one, two) + } + + fn from_pair(s: u32, r: u32) -> Self { + Self { s, r } } } @@ -42,32 +104,25 @@ pub(crate) struct FastRand { impl FastRand { /// Initializes a new, thread-local, fast random number generator. - pub(crate) fn new(seed: u64) -> FastRand { - let (one, two) = FastRand::split_seed(seed); - + pub(crate) fn new(seed: RngSeed) -> FastRand { FastRand { - one: Cell::new(one), - two: Cell::new(two), + one: Cell::new(seed.s), + two: Cell::new(seed.r), } } - fn split_seed(seed: u64) -> (u32, u32) { - let one = (seed >> 32) as u32; - let mut two = seed as u32; - - if two == 0 { - // This value cannot be zero - two = 1; - } + /// Replaces the state of the random number generator with the provided seed, returning + /// the seed that represents the previous state of the random number generator. + /// + /// The random number generator will become equivalent to one created with + /// the same seed. + pub(crate) fn replace_seed(&self, seed: RngSeed) -> RngSeed { + let old_seed = RngSeed::from_pair(self.one.get(), self.two.get()); - (one, two) - } + self.one.replace(seed.s); + self.two.replace(seed.r); - pub(crate) fn reset_seed(&self, seed: u64) { - let (one, two) = FastRand::split_seed(seed); - - self.one.replace(one); - self.two.replace(two); + old_seed } pub(crate) fn fastrand_n(&self, n: u32) -> u32 { @@ -92,11 +147,16 @@ impl FastRand { } thread_local! { - static THREAD_RNG: FastRand = FastRand::new(crate::loom::rand::seed()); + static THREAD_RNG: FastRand = FastRand::new(RngSeed::new()); } -pub(crate) fn reset_thread_rng(rng_seed: &RngSeed) { - THREAD_RNG.with(|rng| rng.reset_seed(rng_seed.seed)); +/// Seeds the thread local random number generator with the provided seed and +/// return the previously stored seed. +/// +/// The returned seed can be later used to return the thread local random number +/// generator to its previous state. +pub(crate) fn replace_thread_rng(rng_seed: RngSeed) -> RngSeed { + THREAD_RNG.with(|rng| rng.replace_seed(rng_seed)) } // Used by the select macro and `StreamMap` diff --git a/tokio/tests/macros_select.rs b/tokio/tests/macros_select.rs index f4f1a81f338..63e3a6ce10e 100644 --- a/tokio/tests/macros_select.rs +++ b/tokio/tests/macros_select.rs @@ -7,6 +7,7 @@ use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test; #[cfg(not(tokio_wasm_not_wasi))] use tokio::test as maybe_tokio_test; +use tokio::runtime::RngSeed; use tokio::sync::oneshot; use tokio_test::{assert_ok, assert_pending, assert_ready}; @@ -600,15 +601,30 @@ async fn mut_ref_patterns() { }; } +#[test] +fn deterministic_select_current_thread() { + let seed = b"bytes used to generate seed"; + let rt = tokio::runtime::Builder::new_current_thread() + .rng_seed(RngSeed::from_bytes(seed)) + .build() + .unwrap(); + + rt.block_on(async { + let num = select_0_to_9().await; + assert_eq!(num, 5); + + let num = select_0_to_9().await; + assert_eq!(num, 8); + }); +} + #[test] #[cfg(feature = "rt-multi-thread")] fn deterministic_select_multi_thread() { - use tokio::runtime::RngSeed; - let seed = 4_318_314_286_557_880_373_u64.to_le_bytes(); - + let seed = b"bytes used to generate seed"; let rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(1) - .rng_seed(RngSeed::from_bytes(&seed)) + .rng_seed(RngSeed::from_bytes(seed)) .build() .unwrap(); @@ -618,31 +634,12 @@ fn deterministic_select_multi_thread() { assert_eq!(num, 6); let num = select_0_to_9().await; - assert_eq!(num, 3); + assert_eq!(num, 9); }) .await; }); } -#[test] -fn deterministic_select_current_thread() { - use tokio::runtime::RngSeed; - let seed = 4_318_314_286_557_880_373_u64.to_le_bytes(); - - let rt = tokio::runtime::Builder::new_current_thread() - .rng_seed(RngSeed::from_bytes(&seed)) - .build() - .unwrap(); - - rt.block_on(async { - let num = select_0_to_9().await; - assert_eq!(num, 8); - - let num = select_0_to_9().await; - assert_eq!(num, 4); - }); -} - async fn select_0_to_9() -> u32 { tokio::select!( x = async { 0 } => x, diff --git a/tokio/tests/rt_basic.rs b/tokio/tests/rt_basic.rs index 8aa98d1385b..408cf7ea266 100644 --- a/tokio/tests/rt_basic.rs +++ b/tokio/tests/rt_basic.rs @@ -1,7 +1,7 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] -use tokio::runtime::{Runtime, RngSeed}; +use tokio::runtime::{RngSeed, Runtime}; use tokio::sync::oneshot; use tokio::time::{timeout, Duration}; use tokio_test::{assert_err, assert_ok}; @@ -293,19 +293,43 @@ fn timeout_panics_when_no_time_handle() { #[test] fn rng_seed() { - let seed = 4_318_314_286_557_880_373_u64.to_le_bytes(); + let seed = b"bytes used to generate seed"; + let rt = tokio::runtime::Builder::new_current_thread() + .rng_seed(RngSeed::from_bytes(seed)) + .build() + .unwrap(); + rt.block_on(async { + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 58); + + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 84); + }); +} + +#[test] +fn rng_seed_multi_enter() { + let seed = b"bytes used to generate seed"; let rt = tokio::runtime::Builder::new_current_thread() - .rng_seed(RngSeed::from_bytes(&seed)) + .rng_seed(RngSeed::from_bytes(seed)) .build() .unwrap(); rt.block_on(async { let random = tokio::macros::support::thread_rng_n(100); - assert_eq!(random, 89); + assert_eq!(random, 58); + + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 84); + }); + + rt.block_on(async { + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 11); let random = tokio::macros::support::thread_rng_n(100); - assert_eq!(random, 47); + assert_eq!(random, 11); }); } diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index 7a0aa4c5b5b..45954dcb075 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -542,23 +542,22 @@ async fn test_block_in_place4() { #[test] fn rng_seed() { use tokio::runtime::RngSeed; - let seed = 4_318_314_286_557_880_373_u64.to_le_bytes(); - + let seed = b"bytes used to generate seed"; let rt = tokio::runtime::Builder::new_multi_thread() .worker_threads(1) - .rng_seed(RngSeed::from_bytes(&seed)) + .rng_seed(RngSeed::from_bytes(seed)) .build() .unwrap(); rt.block_on(async { let random = tokio::macros::support::thread_rng_n(100); - assert_eq!(random, 89); + assert_eq!(random, 11); let _ = tokio::spawn(async { // Because we only have a single worker thread, the // RNG will be deterministic here as well. let random = tokio::macros::support::thread_rng_n(100); - assert_eq!(random, 60); + assert_eq!(random, 64); }) .await; }); From 471f870fc000a8b584558c097ecdf992c3e5d058 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Wed, 24 Aug 2022 18:03:47 +0200 Subject: [PATCH 04/21] extended to seed worker RNG Extended the implementation to also seed the random number generator used by workers to pick the initial peer to attempt to steal work from. This change was included in the builder function docs. --- tokio/src/runtime/builder.rs | 9 +++++++-- tokio/src/runtime/thread_pool/worker.rs | 4 ++-- tokio/src/util/mod.rs | 21 ++++++++------------- tokio/tests/rt_threaded.rs | 2 +- 4 files changed, 18 insertions(+), 18 deletions(-) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index b48f9ab619c..cce09ea7c03 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -734,8 +734,13 @@ impl Builder { /// with the runtime being built. /// /// This option is intended to make certain parts of the runtime deterministic. - /// Specifically, this will ensure that the order that branches are polled by the - /// [`tokio::select!`] macro is deterministic. + /// Specifically, it affects the [`tokio::select!`] macro and the work stealing + /// algorithm. In the case of [`tokio::select!`] it will ensure that the order that + /// branches are polled is deterministic. + /// + /// In the case of work stealing, it's a little more complicated. Each worker will + /// be given a deterministic seed so that the starting peer for each work stealing + /// search will be deterministic. /// /// # Examples /// diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index 7856d803d29..6b8b9f8522a 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -66,7 +66,7 @@ use crate::runtime::task::{Inject, JoinHandle, OwnedTasks}; use crate::runtime::thread_pool::{queue, Idle, Parker, Unparker}; use crate::runtime::{task, Callback, HandleInner, MetricsBatch, SchedulerMetrics, WorkerMetrics}; use crate::util::atomic_cell::AtomicCell; -use crate::util::{FastRand, RngSeed}; +use crate::util::FastRand; use std::cell::RefCell; use std::time::Duration; @@ -225,7 +225,7 @@ pub(super) fn create( is_shutdown: false, park: Some(park), metrics: MetricsBatch::new(), - rand: FastRand::new(RngSeed::new()), + rand: FastRand::new(handle_inner.seed_generator.next_seed()), global_queue_interval, event_interval, })); diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index 3bc507150fd..379251e9bac 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -40,9 +40,6 @@ pub(crate) use wake_list::WakeList; ))] pub(crate) mod linked_list; -#[cfg(any(feature = "rt-multi-thread", feature = "macros"))] -mod rand; - cfg_rt! { cfg_unstable! { mod idle_notified_set; @@ -60,23 +57,21 @@ cfg_rt! { pub(crate) use vec_deque_cell::VecDequeCell; } -cfg_rt_multi_thread! { - pub(crate) use self::rand::FastRand; +mod rand; +pub use self::rand::RngSeed; +pub(crate) use self::rand::{replace_thread_rng, FastRand, RngSeedGenerator}; + +#[cfg(any(feature = "macros"))] +#[cfg_attr(not(feature = "macros"), allow(unreachable_pub))] +pub use self::rand::thread_rng_n; +cfg_rt_multi_thread! { mod try_lock; pub(crate) use try_lock::TryLock; } pub(crate) mod trace; -#[cfg(any(feature = "macros"))] -#[cfg_attr(not(feature = "macros"), allow(unreachable_pub))] -pub use self::rand::thread_rng_n; - -pub(crate) use self::rand::replace_thread_rng; -pub use self::rand::RngSeed; -pub(crate) use self::rand::RngSeedGenerator; - #[cfg(any( feature = "rt", feature = "time", diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index 45954dcb075..5f1e77ad411 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -551,7 +551,7 @@ fn rng_seed() { rt.block_on(async { let random = tokio::macros::support::thread_rng_n(100); - assert_eq!(random, 11); + assert_eq!(random, 25); let _ = tokio::spawn(async { // Because we only have a single worker thread, the From ea633c6bd4030ff52f56e34909cb3d5937554deb Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Wed, 24 Aug 2022 18:42:48 +0200 Subject: [PATCH 05/21] fix dead code warnings --- tokio/src/util/mod.rs | 4 +++- tokio/src/util/rand.rs | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index 0a33a025f63..cf0e59be582 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -57,13 +57,15 @@ cfg_rt! { mod rand; pub use self::rand::RngSeed; -pub(crate) use self::rand::{replace_thread_rng, FastRand, RngSeedGenerator}; +pub(crate) use self::rand::{replace_thread_rng, RngSeedGenerator}; #[cfg(any(feature = "macros"))] #[cfg_attr(not(feature = "macros"), allow(unreachable_pub))] pub use self::rand::thread_rng_n; cfg_rt_multi_thread! { + pub(crate) use self::rand::FastRand; + mod try_lock; pub(crate) use try_lock::TryLock; } diff --git a/tokio/src/util/rand.rs b/tokio/src/util/rand.rs index c64920400dc..907e09e3c2f 100644 --- a/tokio/src/util/rand.rs +++ b/tokio/src/util/rand.rs @@ -125,6 +125,7 @@ impl FastRand { old_seed } + #[cfg(any(feature = "macros", feature = "rt-multi-thread"))] pub(crate) fn fastrand_n(&self, n: u32) -> u32 { // This is similar to fastrand() % n, but faster. // See https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ From f78c7ccd5fb6f468f708d19669a9344f18cd5cd0 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Wed, 24 Aug 2022 18:46:53 +0200 Subject: [PATCH 06/21] fix code fmt --- tokio/src/runtime/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 02ca6cf6322..c79d7ee818d 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -737,7 +737,7 @@ impl Builder { /// Specifically, it affects the [`tokio::select!`] macro and the work stealing /// algorithm. In the case of [`tokio::select!`] it will ensure that the order that /// branches are polled is deterministic. - /// + /// /// In the case of work stealing, it's a little more complicated. Each worker will /// be given a deterministic seed so that the starting peer for each work stealing /// search will be deterministic. From dbdba4fa3febc46c0f459f9f284bf28634f637b2 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Wed, 24 Aug 2022 19:02:56 +0200 Subject: [PATCH 07/21] fix visibility of rand module --- tokio/src/util/mod.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index cf0e59be582..0fe74837d6e 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -40,7 +40,13 @@ pub(crate) use wake_list::WakeList; ))] pub(crate) mod linked_list; +#[cfg(any(feature = "rt", feature = "macros", feature = "stream"))] +mod rand; + cfg_rt! { + pub use self::rand::RngSeed; + pub(crate) use self::rand::{replace_thread_rng, RngSeedGenerator}; + mod idle_notified_set; pub(crate) use idle_notified_set::IdleNotifiedSet; @@ -55,9 +61,6 @@ cfg_rt! { pub(crate) use vec_deque_cell::VecDequeCell; } -mod rand; -pub use self::rand::RngSeed; -pub(crate) use self::rand::{replace_thread_rng, RngSeedGenerator}; #[cfg(any(feature = "macros"))] #[cfg_attr(not(feature = "macros"), allow(unreachable_pub))] From 3cd5146b3dae1016ee64c7c379cfe0f5f0b73c49 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Thu, 25 Aug 2022 15:23:39 +0200 Subject: [PATCH 08/21] fix visibility with different feature settings --- tokio/src/util/mod.rs | 1 - tokio/src/util/rand.rs | 79 +++++++++++++++++++++++------------------- 2 files changed, 44 insertions(+), 36 deletions(-) diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index 0fe74837d6e..d31a216ad8d 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -61,7 +61,6 @@ cfg_rt! { pub(crate) use vec_deque_cell::VecDequeCell; } - #[cfg(any(feature = "macros"))] #[cfg_attr(not(feature = "macros"), allow(unreachable_pub))] pub use self::rand::thread_rng_n; diff --git a/tokio/src/util/rand.rs b/tokio/src/util/rand.rs index 907e09e3c2f..e4bb6c7b62d 100644 --- a/tokio/src/util/rand.rs +++ b/tokio/src/util/rand.rs @@ -1,44 +1,48 @@ -use std::{cell::Cell, collections::hash_map::DefaultHasher, hash::Hasher, sync::Mutex}; +use std::cell::Cell; -/// A deterministic generator for seeds (and other generators). -/// -/// Given the same initial seed, the generator will output the same sequence of seeds. -/// -/// Since the seed generator will be kept in a runtime handle, we need to wrap `FastRand` -/// in a Mutex to make it thread safe. Different to the `FastRand` that we keep in a -/// thread local store, the expectation is that seed generation will not need to happen -/// very frequently, so the cost of the mutex should be minimal. -#[derive(Debug)] -pub(crate) struct RngSeedGenerator { - /// Internal state for the seed generator. We keep it in a Mutex so that we can safely - /// use it across multiple threads. - state: Mutex, -} +cfg_rt! { + use std::sync::Mutex; -impl RngSeedGenerator { - /// Returns a new generator from the provided seed. - pub(crate) fn new(seed: RngSeed) -> Self { - Self { - state: Mutex::new(FastRand::new(seed)), - } + /// A deterministic generator for seeds (and other generators). + /// + /// Given the same initial seed, the generator will output the same sequence of seeds. + /// + /// Since the seed generator will be kept in a runtime handle, we need to wrap `FastRand` + /// in a Mutex to make it thread safe. Different to the `FastRand` that we keep in a + /// thread local store, the expectation is that seed generation will not need to happen + /// very frequently, so the cost of the mutex should be minimal. + #[derive(Debug)] + pub(crate) struct RngSeedGenerator { + /// Internal state for the seed generator. We keep it in a Mutex so that we can safely + /// use it across multiple threads. + state: Mutex, } - /// Returns the next seed in the sequence. - pub(crate) fn next_seed(&self) -> RngSeed { - let rng = self - .state - .lock() - .expect("RNG seed generator is internally corrupt"); + impl RngSeedGenerator { + /// Returns a new generator from the provided seed. + pub(crate) fn new(seed: RngSeed) -> Self { + Self { + state: Mutex::new(FastRand::new(seed)), + } + } - let s = rng.fastrand(); - let r = rng.fastrand(); + /// Returns the next seed in the sequence. + pub(crate) fn next_seed(&self) -> RngSeed { + let rng = self + .state + .lock() + .expect("RNG seed generator is internally corrupt"); - RngSeed::from_pair(s, r) - } + let s = rng.fastrand(); + let r = rng.fastrand(); + + RngSeed::from_pair(s, r) + } - /// Directly creates a generator using the next seed. - pub(crate) fn next_generator(&self) -> Self { - RngSeedGenerator::new(self.next_seed()) + /// Directly creates a generator using the next seed. + pub(crate) fn next_generator(&self) -> Self { + RngSeedGenerator::new(self.next_seed()) + } } } @@ -46,6 +50,7 @@ impl RngSeedGenerator { /// /// In order to make certain functions within a runtime deterministic, a seed /// can be specified at the time of creation. +#[allow(unreachable_pub)] #[derive(Clone, Debug)] pub struct RngSeed { s: u32, @@ -66,7 +71,10 @@ impl RngSeed { /// # use tokio::runtime::RngSeed; /// let seed = RngSeed::from_bytes(b"make me a seed"); /// ``` + #[cfg(feature = "rt")] pub fn from_bytes(bytes: &[u8]) -> Self { + use std::{collections::hash_map::DefaultHasher, hash::Hasher}; + let mut hasher = DefaultHasher::default(); hasher.write(bytes); Self::from_u64(hasher.finish()) @@ -88,7 +96,6 @@ impl RngSeed { Self { s, r } } } - /// Fast random number generate. /// /// Implement xorshift64+: 2 32-bit xorshift sequences added together. @@ -116,6 +123,7 @@ impl FastRand { /// /// The random number generator will become equivalent to one created with /// the same seed. + #[cfg(feature = "rt")] pub(crate) fn replace_seed(&self, seed: RngSeed) -> RngSeed { let old_seed = RngSeed::from_pair(self.one.get(), self.two.get()); @@ -156,6 +164,7 @@ thread_local! { /// /// The returned seed can be later used to return the thread local random number /// generator to its previous state. +#[cfg(feature = "rt")] pub(crate) fn replace_thread_rng(rng_seed: RngSeed) -> RngSeed { THREAD_RNG.with(|rng| rng.replace_seed(rng_seed)) } From 379dadb1b2eb9566b9af12857c81d17f3aee201f Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Fri, 2 Sep 2022 11:49:27 +0200 Subject: [PATCH 09/21] exclude wasi from multi-thread select test --- tokio/tests/macros_select.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/tests/macros_select.rs b/tokio/tests/macros_select.rs index 63e3a6ce10e..bfa35b9ff02 100644 --- a/tokio/tests/macros_select.rs +++ b/tokio/tests/macros_select.rs @@ -619,7 +619,7 @@ fn deterministic_select_current_thread() { } #[test] -#[cfg(feature = "rt-multi-thread")] +#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] fn deterministic_select_multi_thread() { let seed = b"bytes used to generate seed"; let rt = tokio::runtime::Builder::new_multi_thread() From 1ee9c8000052a677d164487f8db3ec8855e5a0c2 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Thu, 8 Sep 2022 00:02:25 +0200 Subject: [PATCH 10/21] make `Builder::rng_seed` and `RngSeed` unstable In order to test out the new API before fixing on it, make it unstable first. --- tokio/src/runtime/builder.rs | 61 +++++++++++----------- tokio/src/runtime/mod.rs | 2 +- tokio/src/util/mod.rs | 1 + tokio/src/util/rand.rs | 32 ++++++------ tokio/tests/macros_select.rs | 98 +++++++++++++++++++----------------- tokio/tests/rt_basic.rs | 88 ++++++++++++++++---------------- tokio/tests/rt_threaded.rs | 48 +++++++++--------- 7 files changed, 169 insertions(+), 161 deletions(-) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index bf6c2e5dc13..25239a12990 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -2,6 +2,7 @@ use crate::runtime::handle::Handle; use crate::runtime::{blocking, driver, Callback, Runtime, Spawner}; use crate::util::RngSeedGenerator; +#[allow(unreachable_pub)] pub use crate::util::RngSeed; use std::fmt; @@ -737,36 +738,6 @@ impl Builder { self } - /// Specifies the random number generation seed to use within all threads associated - /// with the runtime being built. - /// - /// This option is intended to make certain parts of the runtime deterministic. - /// Specifically, it affects the [`tokio::select!`] macro and the work stealing - /// algorithm. In the case of [`tokio::select!`] it will ensure that the order that - /// branches are polled is deterministic. - /// - /// In the case of work stealing, it's a little more complicated. Each worker will - /// be given a deterministic seed so that the starting peer for each work stealing - /// search will be deterministic. - /// - /// # Examples - /// - /// ``` - /// # use tokio::runtime::{self, RngSeed}; - /// # pub fn main() { - /// let seed = RngSeed::from_bytes(b"place your seed here"); - /// let rt = runtime::Builder::new_current_thread() - /// .rng_seed(seed) - /// .build(); - /// # } - /// ``` - /// - /// [`tokio::select!`]: crate::select - pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self { - self.seed_generator = RngSeedGenerator::new(seed); - self - } - cfg_unstable! { /// Configure how the runtime responds to an unhandled panic on a /// spawned task. @@ -866,6 +837,36 @@ impl Builder { pub fn disable_lifo_slot(&mut self) -> &mut Self { self.disable_lifo_slot = true; self + } + + /// Specifies the random number generation seed to use within all threads associated + /// with the runtime being built. + /// + /// This option is intended to make certain parts of the runtime deterministic. + /// Specifically, it affects the [`tokio::select!`] macro and the work stealing + /// algorithm. In the case of [`tokio::select!`] it will ensure that the order that + /// branches are polled is deterministic. + /// + /// In the case of work stealing, it's a little more complicated. Each worker will + /// be given a deterministic seed so that the starting peer for each work stealing + /// search will be deterministic. + /// + /// # Examples + /// + /// ``` + /// # use tokio::runtime::{self, RngSeed}; + /// # pub fn main() { + /// let seed = RngSeed::from_bytes(b"place your seed here"); + /// let rt = runtime::Builder::new_current_thread() + /// .rng_seed(seed) + /// .build(); + /// # } + /// ``` + /// + /// [`tokio::select!`]: crate::select + pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self { + self.seed_generator = RngSeedGenerator::new(seed); + self } } diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 4a317f1e12c..21282063f69 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -211,9 +211,9 @@ cfg_rt! { mod builder; pub use self::builder::Builder; - pub use self::builder::RngSeed; cfg_unstable! { pub use self::builder::UnhandledPanic; + pub use self::builder::RngSeed; } pub(crate) mod context; diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index 290a9498d41..49509d25f30 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -44,6 +44,7 @@ pub(crate) mod linked_list; mod rand; cfg_rt! { + #[allow(unreachable_pub)] pub use self::rand::RngSeed; pub(crate) use self::rand::{replace_thread_rng, RngSeedGenerator}; diff --git a/tokio/src/util/rand.rs b/tokio/src/util/rand.rs index e4bb6c7b62d..09754cea993 100644 --- a/tokio/src/util/rand.rs +++ b/tokio/src/util/rand.rs @@ -63,21 +63,23 @@ impl RngSeed { Self::from_u64(crate::loom::rand::seed()) } - /// Generates a seed from the provided byte slice. - /// - /// # Example - /// - /// ``` - /// # use tokio::runtime::RngSeed; - /// let seed = RngSeed::from_bytes(b"make me a seed"); - /// ``` - #[cfg(feature = "rt")] - pub fn from_bytes(bytes: &[u8]) -> Self { - use std::{collections::hash_map::DefaultHasher, hash::Hasher}; - - let mut hasher = DefaultHasher::default(); - hasher.write(bytes); - Self::from_u64(hasher.finish()) + cfg_unstable! { + /// Generates a seed from the provided byte slice. + /// + /// # Example + /// + /// ``` + /// # use tokio::runtime::RngSeed; + /// let seed = RngSeed::from_bytes(b"make me a seed"); + /// ``` + #[cfg(feature = "rt")] + pub fn from_bytes(bytes: &[u8]) -> Self { + use std::{collections::hash_map::DefaultHasher, hash::Hasher}; + + let mut hasher = DefaultHasher::default(); + hasher.write(bytes); + Self::from_u64(hasher.finish()) + } } fn from_u64(seed: u64) -> Self { diff --git a/tokio/tests/macros_select.rs b/tokio/tests/macros_select.rs index bfa35b9ff02..d6468cbff1c 100644 --- a/tokio/tests/macros_select.rs +++ b/tokio/tests/macros_select.rs @@ -7,7 +7,6 @@ use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test; #[cfg(not(tokio_wasm_not_wasi))] use tokio::test as maybe_tokio_test; -use tokio::runtime::RngSeed; use tokio::sync::oneshot; use tokio_test::{assert_ok, assert_pending, assert_ready}; @@ -601,56 +600,61 @@ async fn mut_ref_patterns() { }; } -#[test] -fn deterministic_select_current_thread() { - let seed = b"bytes used to generate seed"; - let rt = tokio::runtime::Builder::new_current_thread() - .rng_seed(RngSeed::from_bytes(seed)) - .build() - .unwrap(); +#[cfg(tokio_unstable)] +mod unstable { + use tokio::runtime::RngSeed; - rt.block_on(async { - let num = select_0_to_9().await; - assert_eq!(num, 5); + #[test] + fn deterministic_select_current_thread() { + let seed = b"bytes used to generate seed"; + let rt = tokio::runtime::Builder::new_current_thread() + .rng_seed(RngSeed::from_bytes(seed)) + .build() + .unwrap(); - let num = select_0_to_9().await; - assert_eq!(num, 8); - }); -} - -#[test] -#[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] -fn deterministic_select_multi_thread() { - let seed = b"bytes used to generate seed"; - let rt = tokio::runtime::Builder::new_multi_thread() - .worker_threads(1) - .rng_seed(RngSeed::from_bytes(seed)) - .build() - .unwrap(); - - rt.block_on(async { - let _ = tokio::spawn(async { + rt.block_on(async { let num = select_0_to_9().await; - assert_eq!(num, 6); + assert_eq!(num, 5); let num = select_0_to_9().await; - assert_eq!(num, 9); - }) - .await; - }); -} + assert_eq!(num, 8); + }); + } + + #[test] + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + fn deterministic_select_multi_thread() { + let seed = b"bytes used to generate seed"; + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .rng_seed(RngSeed::from_bytes(seed)) + .build() + .unwrap(); + + rt.block_on(async { + let _ = tokio::spawn(async { + let num = select_0_to_9().await; + assert_eq!(num, 6); + + let num = select_0_to_9().await; + assert_eq!(num, 9); + }) + .await; + }); + } -async fn select_0_to_9() -> u32 { - tokio::select!( - x = async { 0 } => x, - x = async { 1 } => x, - x = async { 2 } => x, - x = async { 3 } => x, - x = async { 4 } => x, - x = async { 5 } => x, - x = async { 6 } => x, - x = async { 7 } => x, - x = async { 8 } => x, - x = async { 9 } => x, - ) + async fn select_0_to_9() -> u32 { + tokio::select!( + x = async { 0 } => x, + x = async { 1 } => x, + x = async { 2 } => x, + x = async { 3 } => x, + x = async { 4 } => x, + x = async { 5 } => x, + x = async { 6 } => x, + x = async { 7 } => x, + x = async { 8 } => x, + x = async { 9 } => x, + ) + } } diff --git a/tokio/tests/rt_basic.rs b/tokio/tests/rt_basic.rs index 408cf7ea266..4ee426d463c 100644 --- a/tokio/tests/rt_basic.rs +++ b/tokio/tests/rt_basic.rs @@ -1,7 +1,7 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] -use tokio::runtime::{RngSeed, Runtime}; +use tokio::runtime::Runtime; use tokio::sync::oneshot; use tokio::time::{timeout, Duration}; use tokio_test::{assert_err, assert_ok}; @@ -291,51 +291,9 @@ fn timeout_panics_when_no_time_handle() { }); } -#[test] -fn rng_seed() { - let seed = b"bytes used to generate seed"; - let rt = tokio::runtime::Builder::new_current_thread() - .rng_seed(RngSeed::from_bytes(seed)) - .build() - .unwrap(); - - rt.block_on(async { - let random = tokio::macros::support::thread_rng_n(100); - assert_eq!(random, 58); - - let random = tokio::macros::support::thread_rng_n(100); - assert_eq!(random, 84); - }); -} - -#[test] -fn rng_seed_multi_enter() { - let seed = b"bytes used to generate seed"; - let rt = tokio::runtime::Builder::new_current_thread() - .rng_seed(RngSeed::from_bytes(seed)) - .build() - .unwrap(); - - rt.block_on(async { - let random = tokio::macros::support::thread_rng_n(100); - assert_eq!(random, 58); - - let random = tokio::macros::support::thread_rng_n(100); - assert_eq!(random, 84); - }); - - rt.block_on(async { - let random = tokio::macros::support::thread_rng_n(100); - assert_eq!(random, 11); - - let random = tokio::macros::support::thread_rng_n(100); - assert_eq!(random, 11); - }); -} - #[cfg(tokio_unstable)] mod unstable { - use tokio::runtime::{Builder, UnhandledPanic}; + use tokio::runtime::{Builder, RngSeed, UnhandledPanic}; #[test] #[should_panic( @@ -423,6 +381,48 @@ mod unstable { assert!(th.join().is_err()); } } + + #[test] + fn rng_seed() { + let seed = b"bytes used to generate seed"; + let rt = tokio::runtime::Builder::new_current_thread() + .rng_seed(RngSeed::from_bytes(seed)) + .build() + .unwrap(); + + rt.block_on(async { + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 58); + + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 84); + }); + } + + #[test] + fn rng_seed_multi_enter() { + let seed = b"bytes used to generate seed"; + let rt = tokio::runtime::Builder::new_current_thread() + .rng_seed(RngSeed::from_bytes(seed)) + .build() + .unwrap(); + + rt.block_on(async { + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 58); + + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 84); + }); + + rt.block_on(async { + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 11); + + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 11); + }); + } } fn rt() -> Runtime { diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index 02b1f37d043..3df789770e5 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -539,30 +539,6 @@ async fn test_block_in_place4() { tokio::task::block_in_place(|| {}); } -#[test] -fn rng_seed() { - use tokio::runtime::RngSeed; - let seed = b"bytes used to generate seed"; - let rt = tokio::runtime::Builder::new_multi_thread() - .worker_threads(1) - .rng_seed(RngSeed::from_bytes(seed)) - .build() - .unwrap(); - - rt.block_on(async { - let random = tokio::macros::support::thread_rng_n(100); - assert_eq!(random, 25); - - let _ = tokio::spawn(async { - // Because we only have a single worker thread, the - // RNG will be deterministic here as well. - let random = tokio::macros::support::thread_rng_n(100); - assert_eq!(random, 64); - }) - .await; - }); -} - fn rt() -> runtime::Runtime { runtime::Runtime::new().unwrap() } @@ -570,6 +546,7 @@ fn rt() -> runtime::Runtime { #[cfg(tokio_unstable)] mod unstable { use super::*; + use tokio::runtime::RngSeed; #[test] fn test_disable_lifo_slot() { @@ -589,4 +566,27 @@ mod unstable { .unwrap(); }) } + + #[test] + fn rng_seed() { + let seed = b"bytes used to generate seed"; + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .rng_seed(RngSeed::from_bytes(seed)) + .build() + .unwrap(); + + rt.block_on(async { + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 25); + + let _ = tokio::spawn(async { + // Because we only have a single worker thread, the + // RNG will be deterministic here as well. + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 64); + }) + .await; + }); + } } From 03bbd6b36125685032a2824ba7b1a80b423712a3 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Thu, 8 Sep 2022 17:56:51 +0200 Subject: [PATCH 11/21] make `RngSeed` not public in `builder` mod It shouldn't have been in the first place. --- tokio/src/runtime/builder.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 25239a12990..e45955a65c3 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1,9 +1,6 @@ use crate::runtime::handle::Handle; use crate::runtime::{blocking, driver, Callback, Runtime, Spawner}; -use crate::util::RngSeedGenerator; - -#[allow(unreachable_pub)] -pub use crate::util::RngSeed; +use crate::util::{RngSeed, RngSeedGenerator}; use std::fmt; use std::io; From 8981f7224ad9e135ab7a810631c5412b7b6e7f0b Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Thu, 8 Sep 2022 18:08:54 +0200 Subject: [PATCH 12/21] publish `RngSeed` from `crate::util` --- tokio/src/runtime/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 21282063f69..4fd5606bef8 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -213,7 +213,7 @@ cfg_rt! { pub use self::builder::Builder; cfg_unstable! { pub use self::builder::UnhandledPanic; - pub use self::builder::RngSeed; + pub use crate::util::RngSeed; } pub(crate) mod context; From 3bdbec7312018d4fae1b27c3c3a0e93b2d2b6930 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Thu, 15 Sep 2022 17:22:14 +0200 Subject: [PATCH 13/21] fix issues arising from merging with Carl's refactoring --- tokio/src/runtime/builder.rs | 2 ++ tokio/src/runtime/config.rs | 5 +++++ tokio/src/runtime/scheduler/multi_thread/worker.rs | 2 +- tokio/tests/macros_select.rs | 2 +- tokio/tests/rt_basic.rs | 12 ++++++------ tokio/tests/rt_threaded.rs | 2 +- 6 files changed, 16 insertions(+), 9 deletions(-) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 3360a9bb66d..9353d2930ce 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -891,6 +891,7 @@ impl Builder { #[cfg(tokio_unstable)] unhandled_panic: self.unhandled_panic.clone(), disable_lifo_slot: self.disable_lifo_slot, + seed_generator: self.seed_generator.next_generator(), }, ); let spawner = Spawner::CurrentThread(scheduler.spawner().clone()); @@ -1012,6 +1013,7 @@ cfg_rt_multi_thread! { #[cfg(tokio_unstable)] unhandled_panic: self.unhandled_panic.clone(), disable_lifo_slot: self.disable_lifo_slot, + seed_generator: self.seed_generator.next_generator(), }, ); let spawner = Spawner::MultiThread(scheduler.spawner().clone()); diff --git a/tokio/src/runtime/config.rs b/tokio/src/runtime/config.rs index 59c19988e5e..39eb1cf118b 100644 --- a/tokio/src/runtime/config.rs +++ b/tokio/src/runtime/config.rs @@ -1,5 +1,6 @@ #![cfg_attr(any(not(feature = "full"), tokio_wasm), allow(dead_code))] use crate::runtime::Callback; +use crate::util::RngSeedGenerator; pub(crate) struct Config { /// How many ticks before pulling a task from the global/remote queue? @@ -23,6 +24,10 @@ pub(crate) struct Config { /// stop-gap, this unstable option lets users disable the LIFO task. pub(crate) disable_lifo_slot: bool, + /// Random number generator seed to configure runtimes to act in a + /// deterministic way. + pub(crate) seed_generator: RngSeedGenerator, + #[cfg(tokio_unstable)] /// How to respond to unhandled task panics. pub(crate) unhandled_panic: crate::runtime::UnhandledPanic, diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index f25b33e2d6f..2193486484f 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -205,7 +205,7 @@ pub(super) fn create(size: usize, park: Parker, config: Config) -> (Arc, is_shutdown: false, park: Some(park), metrics: MetricsBatch::new(), - rand: FastRand::new(handle_inner.seed_generator.next_seed()), + rand: FastRand::new(config.seed_generator.next_seed()), })); remotes.push(Remote { steal, unpark }); diff --git a/tokio/tests/macros_select.rs b/tokio/tests/macros_select.rs index d6468cbff1c..e642a5d9ff4 100644 --- a/tokio/tests/macros_select.rs +++ b/tokio/tests/macros_select.rs @@ -617,7 +617,7 @@ mod unstable { assert_eq!(num, 5); let num = select_0_to_9().await; - assert_eq!(num, 8); + assert_eq!(num, 1); }); } diff --git a/tokio/tests/rt_basic.rs b/tokio/tests/rt_basic.rs index 4ee426d463c..2f441865941 100644 --- a/tokio/tests/rt_basic.rs +++ b/tokio/tests/rt_basic.rs @@ -392,10 +392,10 @@ mod unstable { rt.block_on(async { let random = tokio::macros::support::thread_rng_n(100); - assert_eq!(random, 58); + assert_eq!(random, 59); let random = tokio::macros::support::thread_rng_n(100); - assert_eq!(random, 84); + assert_eq!(random, 10); }); } @@ -409,18 +409,18 @@ mod unstable { rt.block_on(async { let random = tokio::macros::support::thread_rng_n(100); - assert_eq!(random, 58); + assert_eq!(random, 59); let random = tokio::macros::support::thread_rng_n(100); - assert_eq!(random, 84); + assert_eq!(random, 10); }); rt.block_on(async { let random = tokio::macros::support::thread_rng_n(100); - assert_eq!(random, 11); + assert_eq!(random, 86); let random = tokio::macros::support::thread_rng_n(100); - assert_eq!(random, 11); + assert_eq!(random, 1); }); } } diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index 754dc8d017e..9a8644af6fe 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -574,7 +574,7 @@ mod unstable { rt.block_on(async { let random = tokio::macros::support::thread_rng_n(100); - assert_eq!(random, 25); + assert_eq!(random, 86); let _ = tokio::spawn(async { // Because we only have a single worker thread, the From 18ba0b6175f4f1cb13429edaf62f7dd0cef17b49 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Thu, 15 Sep 2022 18:06:08 +0200 Subject: [PATCH 14/21] only allow unreachable_pub when tokio_unstable isn't set Co-authored-by: Carl Lerche --- tokio/src/util/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index 4d79b33750a..6e6e71ad6d1 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -61,7 +61,7 @@ cfg_rt! { pub(crate) use rc_cell::RcCell; } -#[allow(unreachable_pub)] +#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] pub use self::rand::RngSeed; pub(crate) use self::rand::{replace_thread_rng, RngSeedGenerator}; From 3482562ca5f4c8afce62ffc65b06cc5c405da7b2 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Thu, 15 Sep 2022 18:06:23 +0200 Subject: [PATCH 15/21] fix comment formatting inside macro Co-authored-by: Carl Lerche --- tokio/src/runtime/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 9353d2930ce..69a924bfdec 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -836,7 +836,7 @@ impl Builder { self } - /// Specifies the random number generation seed to use within all threads associated + /// Specifies the random number generation seed to use within all threads associated /// with the runtime being built. /// /// This option is intended to make certain parts of the runtime deterministic. From c1111fc9d097ced2d5f5ea534aede1c584b83c5b Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Thu, 15 Sep 2022 18:06:44 +0200 Subject: [PATCH 16/21] fix visibility when default features disabled --- tokio/src/util/mod.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index 4d79b33750a..c52cf13ec36 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -40,9 +40,6 @@ pub(crate) use wake_list::WakeList; ))] pub(crate) mod linked_list; -#[cfg(any(feature = "rt", feature = "macros", feature = "stream"))] -mod rand; - cfg_rt! { mod idle_notified_set; pub(crate) use idle_notified_set::IdleNotifiedSet; @@ -61,6 +58,8 @@ cfg_rt! { pub(crate) use rc_cell::RcCell; } +mod rand; + #[allow(unreachable_pub)] pub use self::rand::RngSeed; pub(crate) use self::rand::{replace_thread_rng, RngSeedGenerator}; From 04e2fb427a42c29a8d40ec4ef62f85152a736a0a Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Thu, 15 Sep 2022 18:15:19 +0200 Subject: [PATCH 17/21] fix visibility issues Now that the runtime module is present even without the rt feature. --- tokio/src/runtime/context.rs | 6 ++---- tokio/src/util/rand.rs | 1 - 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index 9f4231f4ed3..115ea470f99 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -1,8 +1,6 @@ //! Thread local runtime context -use crate::{ - runtime::{Handle, TryCurrentError}, - util::{replace_thread_rng, RngSeed}, -}; +use crate::runtime::{Handle, TryCurrentError}; +use crate::util::{replace_thread_rng, RngSeed}; use std::cell::RefCell; diff --git a/tokio/src/util/rand.rs b/tokio/src/util/rand.rs index 5062c939ec3..69002f6f4ea 100644 --- a/tokio/src/util/rand.rs +++ b/tokio/src/util/rand.rs @@ -163,7 +163,6 @@ thread_local! { /// /// The returned seed can be later used to return the thread local random number /// generator to its previous state. -#[cfg(feature = "rt")] pub(crate) fn replace_thread_rng(rng_seed: RngSeed) -> RngSeed { THREAD_RNG.with(|rng| rng.replace_seed(rng_seed)) } From e0c944df899988d3798123e49827ee98d0ecb897 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Thu, 15 Sep 2022 18:21:46 +0200 Subject: [PATCH 18/21] another offering to the gods of visibility --- tokio/src/util/rand.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/tokio/src/util/rand.rs b/tokio/src/util/rand.rs index 69002f6f4ea..3b3e6d5dd68 100644 --- a/tokio/src/util/rand.rs +++ b/tokio/src/util/rand.rs @@ -122,7 +122,6 @@ impl FastRand { /// /// The random number generator will become equivalent to one created with /// the same seed. - #[cfg(feature = "rt")] pub(crate) fn replace_seed(&self, seed: RngSeed) -> RngSeed { let old_seed = RngSeed::from_pair(self.one.get(), self.two.get()); From c18463cb4c36bbca4864528b47cf32082a8f4beb Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Fri, 16 Sep 2022 11:45:33 +0200 Subject: [PATCH 19/21] next offering to visibility --- tokio/src/runtime/handle.rs | 3 ++ tokio/src/util/mod.rs | 9 +++-- tokio/src/util/rand.rs | 73 ++++++++++++++++++++----------------- 3 files changed, 48 insertions(+), 37 deletions(-) diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index c3f6c7d149c..1888dd8db60 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -2,6 +2,8 @@ #![cfg_attr(not(feature = "rt"), allow(dead_code))] use crate::runtime::driver; + +#[cfg(feature = "rt")] use crate::util::RngSeedGenerator; use std::sync::Arc; @@ -61,6 +63,7 @@ pub(crate) struct HandleInner { pub(crate) blocking_spawner: blocking::Spawner, /// Current random number generator seed (when no) + #[cfg(feature = "rt")] pub(super) seed_generator: RngSeedGenerator, } diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index 144925f6ee6..cd113580fb9 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -40,10 +40,15 @@ pub(crate) use wake_list::WakeList; ))] pub(crate) mod linked_list; +#[cfg(any(feature = "rt", feature = "macros"))] +mod rand; + cfg_rt! { mod idle_notified_set; pub(crate) use idle_notified_set::IdleNotifiedSet; + pub(crate) use self::rand::{RngSeedGenerator,replace_thread_rng}; + mod wake; pub(crate) use wake::WakerRef; pub(crate) use wake::{waker_ref, Wake}; @@ -58,11 +63,9 @@ cfg_rt! { pub(crate) use rc_cell::RcCell; } -mod rand; - #[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] +#[cfg(any(feature = "rt", feature = "macros"))] pub use self::rand::RngSeed; -pub(crate) use self::rand::{replace_thread_rng, RngSeedGenerator}; #[cfg(any(feature = "macros"))] #[cfg_attr(not(feature = "macros"), allow(unreachable_pub))] diff --git a/tokio/src/util/rand.rs b/tokio/src/util/rand.rs index 3b3e6d5dd68..09754cea993 100644 --- a/tokio/src/util/rand.rs +++ b/tokio/src/util/rand.rs @@ -1,45 +1,48 @@ use std::cell::Cell; -use std::sync::Mutex; -/// A deterministic generator for seeds (and other generators). -/// -/// Given the same initial seed, the generator will output the same sequence of seeds. -/// -/// Since the seed generator will be kept in a runtime handle, we need to wrap `FastRand` -/// in a Mutex to make it thread safe. Different to the `FastRand` that we keep in a -/// thread local store, the expectation is that seed generation will not need to happen -/// very frequently, so the cost of the mutex should be minimal. -#[derive(Debug)] -pub(crate) struct RngSeedGenerator { - /// Internal state for the seed generator. We keep it in a Mutex so that we can safely - /// use it across multiple threads. - state: Mutex, -} +cfg_rt! { + use std::sync::Mutex; -impl RngSeedGenerator { - /// Returns a new generator from the provided seed. - pub(crate) fn new(seed: RngSeed) -> Self { - Self { - state: Mutex::new(FastRand::new(seed)), - } + /// A deterministic generator for seeds (and other generators). + /// + /// Given the same initial seed, the generator will output the same sequence of seeds. + /// + /// Since the seed generator will be kept in a runtime handle, we need to wrap `FastRand` + /// in a Mutex to make it thread safe. Different to the `FastRand` that we keep in a + /// thread local store, the expectation is that seed generation will not need to happen + /// very frequently, so the cost of the mutex should be minimal. + #[derive(Debug)] + pub(crate) struct RngSeedGenerator { + /// Internal state for the seed generator. We keep it in a Mutex so that we can safely + /// use it across multiple threads. + state: Mutex, } - /// Returns the next seed in the sequence. - pub(crate) fn next_seed(&self) -> RngSeed { - let rng = self - .state - .lock() - .expect("RNG seed generator is internally corrupt"); + impl RngSeedGenerator { + /// Returns a new generator from the provided seed. + pub(crate) fn new(seed: RngSeed) -> Self { + Self { + state: Mutex::new(FastRand::new(seed)), + } + } - let s = rng.fastrand(); - let r = rng.fastrand(); + /// Returns the next seed in the sequence. + pub(crate) fn next_seed(&self) -> RngSeed { + let rng = self + .state + .lock() + .expect("RNG seed generator is internally corrupt"); - RngSeed::from_pair(s, r) - } + let s = rng.fastrand(); + let r = rng.fastrand(); - /// Directly creates a generator using the next seed. - pub(crate) fn next_generator(&self) -> Self { - RngSeedGenerator::new(self.next_seed()) + RngSeed::from_pair(s, r) + } + + /// Directly creates a generator using the next seed. + pub(crate) fn next_generator(&self) -> Self { + RngSeedGenerator::new(self.next_seed()) + } } } @@ -122,6 +125,7 @@ impl FastRand { /// /// The random number generator will become equivalent to one created with /// the same seed. + #[cfg(feature = "rt")] pub(crate) fn replace_seed(&self, seed: RngSeed) -> RngSeed { let old_seed = RngSeed::from_pair(self.one.get(), self.two.get()); @@ -162,6 +166,7 @@ thread_local! { /// /// The returned seed can be later used to return the thread local random number /// generator to its previous state. +#[cfg(feature = "rt")] pub(crate) fn replace_thread_rng(rng_seed: RngSeed) -> RngSeed { THREAD_RNG.with(|rng| rng.replace_seed(rng_seed)) } From d85e129ad1fb920a74bef4f09a7d6aeb3961b715 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Fri, 16 Sep 2022 12:52:25 +0200 Subject: [PATCH 20/21] RngSeed isn't need for the macros feature --- tokio/src/util/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index cd113580fb9..65907231b61 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -64,7 +64,7 @@ cfg_rt! { } #[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] -#[cfg(any(feature = "rt", feature = "macros"))] +#[cfg(feature = "rt")] pub use self::rand::RngSeed; #[cfg(any(feature = "macros"))] From a04044e0aa47de65aba83fd93e43624c69d857b9 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Fri, 16 Sep 2022 15:37:14 +0200 Subject: [PATCH 21/21] describe conditions for determinism in rng_seed docs Also fixed a couple of internal comments. --- tokio/src/runtime/blocking/pool.rs | 2 +- tokio/src/runtime/builder.rs | 6 ++++++ tokio/src/runtime/handle.rs | 2 +- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index 05713babaf2..d892ac111b4 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -436,7 +436,7 @@ impl Inner { if let Some(f) = &self.after_start { f() } - // We own this thread so thee is no need to replace the RngSeed once we're done. + // We own this thread so there is no need to replace the RngSeed once we're done. let _ = replace_thread_rng(self.seed_generator.next_seed()); let mut shared = self.shared.lock(); diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 4e0afeaaa17..4e93251b2f1 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -848,6 +848,12 @@ impl Builder { /// be given a deterministic seed so that the starting peer for each work stealing /// search will be deterministic. /// + /// In addition to the code specifying `rng_seed` and interacting with the runtime, + /// the internals of Tokio and the Rust compiler may affect the sequences of random + /// numbers. In order to ensure repeatable results, the version of Tokio, the versions + /// of all other dependencies that interact with Tokio, and the Rust compiler version + /// should also all remain constant. + /// /// # Examples /// /// ``` diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 0261e31aaaf..98b06fc071a 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -36,7 +36,7 @@ pub(crate) struct HandleInner { #[cfg(feature = "rt")] pub(crate) blocking_spawner: blocking::Spawner, - /// Current random number generator seed (when no) + /// Current random number generator seed #[cfg(feature = "rt")] pub(super) seed_generator: RngSeedGenerator, }