From 9198e601bac4c3dc96e963b8aab39dc3966e84a8 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Fri, 16 Sep 2022 17:41:09 +0200 Subject: [PATCH] rt: add `rng_seed` option to `runtime::Builder` (#4910) The `tokio::select!` macro polls branches in a random order. While this is desirable in production, for testing purposes a more deterministic approach can be useul. This change adds an additional parameter `rng_seed` to the runtime `Builder` to set the random number generator seed. This value is then used to reset the seed on the current thread when the runtime is entered into (restoring the previous value when the thread leaves the runtime). All threads created explicitly by the runtime also have a seed set as the runtime is built. Each thread is set with a seed from a deterministic sequence. This guarantees that calls to the `tokio::select!` macro which are performed in the same order on the same thread will poll branches in the same order. Additionally, the peer chosen to attempt to steal work from also uses a deterministic sequence if `rng_seed` is set. Both the builder parameter as well as the `RngSeed` struct are marked unstable initially. --- tokio/src/runtime/blocking/pool.rs | 7 + tokio/src/runtime/builder.rs | 46 ++++++ tokio/src/runtime/config.rs | 5 + tokio/src/runtime/context.rs | 25 ++- tokio/src/runtime/handle.rs | 7 + tokio/src/runtime/mod.rs | 1 + .../runtime/scheduler/multi_thread/worker.rs | 3 +- tokio/src/util/mod.rs | 16 +- tokio/src/util/rand.rs | 146 ++++++++++++++++-- tokio/tests/macros_select.rs | 59 +++++++ tokio/tests/rt_basic.rs | 44 +++++- tokio/tests/rt_threaded.rs | 24 +++ 12 files changed, 352 insertions(+), 31 deletions(-) diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index aea5af8caf4..d892ac111b4 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -8,6 +8,7 @@ use crate::runtime::builder::ThreadNameFn; use crate::runtime::context; use crate::runtime::task::{self, JoinHandle}; use crate::runtime::{Builder, Callback, Handle}; +use crate::util::{replace_thread_rng, RngSeedGenerator}; use std::collections::{HashMap, VecDeque}; use std::fmt; @@ -48,6 +49,9 @@ struct Inner { // Customizable wait timeout. keep_alive: Duration, + + // Random number seed + seed_generator: RngSeedGenerator, } struct Shared { @@ -182,6 +186,7 @@ impl BlockingPool { before_stop: builder.before_stop.clone(), thread_cap, keep_alive, + seed_generator: builder.seed_generator.next_generator(), }), }, shutdown_rx, @@ -431,6 +436,8 @@ impl Inner { if let Some(f) = &self.after_start { f() } + // We own this thread so there is no need to replace the RngSeed once we're done. + let _ = replace_thread_rng(self.seed_generator.next_seed()); let mut shared = self.shared.lock(); let mut join_on_thread = None; diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 5be8e4aeedc..4e93251b2f1 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1,5 +1,6 @@ use crate::runtime::handle::Handle; use crate::runtime::{blocking, driver, Callback, Runtime, Spawner}; +use crate::util::{RngSeed, RngSeedGenerator}; use std::fmt; use std::io; @@ -90,6 +91,9 @@ pub struct Builder { /// This option should only be exposed as unstable. pub(super) disable_lifo_slot: bool, + /// Specify a random number generator seed to provide deterministic results + pub(super) seed_generator: RngSeedGenerator, + #[cfg(tokio_unstable)] pub(super) unhandled_panic: UnhandledPanic, } @@ -255,6 +259,8 @@ impl Builder { global_queue_interval, event_interval, + seed_generator: RngSeedGenerator::new(RngSeed::new()), + #[cfg(tokio_unstable)] unhandled_panic: UnhandledPanic::Ignore, @@ -829,6 +835,42 @@ impl Builder { self.disable_lifo_slot = true; self } + + /// Specifies the random number generation seed to use within all threads associated + /// with the runtime being built. + /// + /// This option is intended to make certain parts of the runtime deterministic. + /// Specifically, it affects the [`tokio::select!`] macro and the work stealing + /// algorithm. In the case of [`tokio::select!`] it will ensure that the order that + /// branches are polled is deterministic. + /// + /// In the case of work stealing, it's a little more complicated. Each worker will + /// be given a deterministic seed so that the starting peer for each work stealing + /// search will be deterministic. + /// + /// In addition to the code specifying `rng_seed` and interacting with the runtime, + /// the internals of Tokio and the Rust compiler may affect the sequences of random + /// numbers. In order to ensure repeatable results, the version of Tokio, the versions + /// of all other dependencies that interact with Tokio, and the Rust compiler version + /// should also all remain constant. + /// + /// # Examples + /// + /// ``` + /// # use tokio::runtime::{self, RngSeed}; + /// # pub fn main() { + /// let seed = RngSeed::from_bytes(b"place your seed here"); + /// let rt = runtime::Builder::new_current_thread() + /// .rng_seed(seed) + /// .build(); + /// # } + /// ``` + /// + /// [`tokio::select!`]: crate::select + pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self { + self.seed_generator = RngSeedGenerator::new(seed); + self + } } fn build_current_thread_runtime(&mut self) -> io::Result { @@ -855,6 +897,7 @@ impl Builder { #[cfg(tokio_unstable)] unhandled_panic: self.unhandled_panic.clone(), disable_lifo_slot: self.disable_lifo_slot, + seed_generator: self.seed_generator.next_generator(), }, ); let spawner = Spawner::CurrentThread(scheduler.spawner().clone()); @@ -863,6 +906,7 @@ impl Builder { spawner, driver: driver_handle, blocking_spawner, + seed_generator: self.seed_generator.next_generator(), }); Ok(Runtime { @@ -972,6 +1016,7 @@ cfg_rt_multi_thread! { #[cfg(tokio_unstable)] unhandled_panic: self.unhandled_panic.clone(), disable_lifo_slot: self.disable_lifo_slot, + seed_generator: self.seed_generator.next_generator(), }, ); let spawner = Spawner::MultiThread(scheduler.spawner().clone()); @@ -980,6 +1025,7 @@ cfg_rt_multi_thread! { spawner, driver: driver_handle, blocking_spawner, + seed_generator: self.seed_generator.next_generator(), }); // Create the runtime handle diff --git a/tokio/src/runtime/config.rs b/tokio/src/runtime/config.rs index 59c19988e5e..39eb1cf118b 100644 --- a/tokio/src/runtime/config.rs +++ b/tokio/src/runtime/config.rs @@ -1,5 +1,6 @@ #![cfg_attr(any(not(feature = "full"), tokio_wasm), allow(dead_code))] use crate::runtime::Callback; +use crate::util::RngSeedGenerator; pub(crate) struct Config { /// How many ticks before pulling a task from the global/remote queue? @@ -23,6 +24,10 @@ pub(crate) struct Config { /// stop-gap, this unstable option lets users disable the LIFO task. pub(crate) disable_lifo_slot: bool, + /// Random number generator seed to configure runtimes to act in a + /// deterministic way. + pub(crate) seed_generator: RngSeedGenerator, + #[cfg(tokio_unstable)] /// How to respond to unhandled task panics. pub(crate) unhandled_panic: crate::runtime::UnhandledPanic, diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index e0f8a384ae6..53a01cfdaeb 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -1,5 +1,6 @@ //! Thread local runtime context use crate::runtime::{Handle, TryCurrentError}; +use crate::util::{replace_thread_rng, RngSeed}; use std::cell::RefCell; @@ -99,21 +100,29 @@ pub(crate) fn enter(new: Handle) -> EnterGuard { /// /// [`Handle`]: Handle pub(crate) fn try_enter(new: Handle) -> Option { - CONTEXT - .try_with(|ctx| { - let old = ctx.borrow_mut().replace(new); - EnterGuard(old) - }) - .ok() + let rng_seed = new.as_inner().seed_generator.next_seed(); + let old_handle = CONTEXT.try_with(|ctx| ctx.borrow_mut().replace(new)).ok()?; + + let old_seed = replace_thread_rng(rng_seed); + + Some(EnterGuard { + old_handle, + old_seed, + }) } #[derive(Debug)] -pub(crate) struct EnterGuard(Option); +pub(crate) struct EnterGuard { + old_handle: Option, + old_seed: RngSeed, +} impl Drop for EnterGuard { fn drop(&mut self) { CONTEXT.with(|ctx| { - *ctx.borrow_mut() = self.0.take(); + *ctx.borrow_mut() = self.old_handle.take(); }); + // We discard the RngSeed associated with this guard + let _ = replace_thread_rng(self.old_seed.clone()); } } diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index e10b07c3d9a..98b06fc071a 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -3,6 +3,9 @@ use crate::runtime::driver; +#[cfg(feature = "rt")] +use crate::util::RngSeedGenerator; + use std::sync::Arc; /// Handle to the runtime. @@ -32,6 +35,10 @@ pub(crate) struct HandleInner { /// Blocking pool spawner #[cfg(feature = "rt")] pub(crate) blocking_spawner: blocking::Spawner, + + /// Current random number generator seed + #[cfg(feature = "rt")] + pub(super) seed_generator: RngSeedGenerator, } cfg_rt! { diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 5bd0a9b66d2..d6863b5044f 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -216,6 +216,7 @@ cfg_rt! { pub use self::builder::Builder; cfg_unstable! { pub use self::builder::UnhandledPanic; + pub use crate::util::RngSeed; } pub(crate) mod context; diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 22ca2dbe842..2193486484f 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -58,7 +58,6 @@ use crate::coop; use crate::future::Future; -use crate::loom::rand::seed; use crate::loom::sync::{Arc, Mutex}; use crate::runtime; use crate::runtime::enter::EnterContext; @@ -206,7 +205,7 @@ pub(super) fn create(size: usize, park: Parker, config: Config) -> (Arc, is_shutdown: false, park: Some(park), metrics: MetricsBatch::new(), - rand: FastRand::new(seed()), + rand: FastRand::new(config.seed_generator.next_seed()), })); remotes.push(Remote { steal, unpark }); diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index b77979e63ce..65907231b61 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -40,13 +40,15 @@ pub(crate) use wake_list::WakeList; ))] pub(crate) mod linked_list; -#[cfg(any(feature = "rt-multi-thread", feature = "macros"))] +#[cfg(any(feature = "rt", feature = "macros"))] mod rand; cfg_rt! { mod idle_notified_set; pub(crate) use idle_notified_set::IdleNotifiedSet; + pub(crate) use self::rand::{RngSeedGenerator,replace_thread_rng}; + mod wake; pub(crate) use wake::WakerRef; pub(crate) use wake::{waker_ref, Wake}; @@ -61,6 +63,14 @@ cfg_rt! { pub(crate) use rc_cell::RcCell; } +#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] +#[cfg(feature = "rt")] +pub use self::rand::RngSeed; + +#[cfg(any(feature = "macros"))] +#[cfg_attr(not(feature = "macros"), allow(unreachable_pub))] +pub use self::rand::thread_rng_n; + cfg_rt_multi_thread! { pub(crate) use self::rand::FastRand; @@ -70,8 +80,4 @@ cfg_rt_multi_thread! { pub(crate) mod trace; -#[cfg(any(feature = "macros"))] -#[cfg_attr(not(feature = "macros"), allow(unreachable_pub))] -pub use self::rand::thread_rng_n; - pub(crate) mod error; diff --git a/tokio/src/util/rand.rs b/tokio/src/util/rand.rs index 6b19c8be957..09754cea993 100644 --- a/tokio/src/util/rand.rs +++ b/tokio/src/util/rand.rs @@ -1,5 +1,103 @@ use std::cell::Cell; +cfg_rt! { + use std::sync::Mutex; + + /// A deterministic generator for seeds (and other generators). + /// + /// Given the same initial seed, the generator will output the same sequence of seeds. + /// + /// Since the seed generator will be kept in a runtime handle, we need to wrap `FastRand` + /// in a Mutex to make it thread safe. Different to the `FastRand` that we keep in a + /// thread local store, the expectation is that seed generation will not need to happen + /// very frequently, so the cost of the mutex should be minimal. + #[derive(Debug)] + pub(crate) struct RngSeedGenerator { + /// Internal state for the seed generator. We keep it in a Mutex so that we can safely + /// use it across multiple threads. + state: Mutex, + } + + impl RngSeedGenerator { + /// Returns a new generator from the provided seed. + pub(crate) fn new(seed: RngSeed) -> Self { + Self { + state: Mutex::new(FastRand::new(seed)), + } + } + + /// Returns the next seed in the sequence. + pub(crate) fn next_seed(&self) -> RngSeed { + let rng = self + .state + .lock() + .expect("RNG seed generator is internally corrupt"); + + let s = rng.fastrand(); + let r = rng.fastrand(); + + RngSeed::from_pair(s, r) + } + + /// Directly creates a generator using the next seed. + pub(crate) fn next_generator(&self) -> Self { + RngSeedGenerator::new(self.next_seed()) + } + } +} + +/// A seed for random numnber generation. +/// +/// In order to make certain functions within a runtime deterministic, a seed +/// can be specified at the time of creation. +#[allow(unreachable_pub)] +#[derive(Clone, Debug)] +pub struct RngSeed { + s: u32, + r: u32, +} + +impl RngSeed { + /// Creates a random seed using loom internally. + pub(crate) fn new() -> Self { + Self::from_u64(crate::loom::rand::seed()) + } + + cfg_unstable! { + /// Generates a seed from the provided byte slice. + /// + /// # Example + /// + /// ``` + /// # use tokio::runtime::RngSeed; + /// let seed = RngSeed::from_bytes(b"make me a seed"); + /// ``` + #[cfg(feature = "rt")] + pub fn from_bytes(bytes: &[u8]) -> Self { + use std::{collections::hash_map::DefaultHasher, hash::Hasher}; + + let mut hasher = DefaultHasher::default(); + hasher.write(bytes); + Self::from_u64(hasher.finish()) + } + } + + fn from_u64(seed: u64) -> Self { + let one = (seed >> 32) as u32; + let mut two = seed as u32; + + if two == 0 { + // This value cannot be zero + two = 1; + } + + Self::from_pair(one, two) + } + + fn from_pair(s: u32, r: u32) -> Self { + Self { s, r } + } +} /// Fast random number generate. /// /// Implement xorshift64+: 2 32-bit xorshift sequences added together. @@ -15,21 +113,29 @@ pub(crate) struct FastRand { impl FastRand { /// Initializes a new, thread-local, fast random number generator. - pub(crate) fn new(seed: u64) -> FastRand { - let one = (seed >> 32) as u32; - let mut two = seed as u32; - - if two == 0 { - // This value cannot be zero - two = 1; - } - + pub(crate) fn new(seed: RngSeed) -> FastRand { FastRand { - one: Cell::new(one), - two: Cell::new(two), + one: Cell::new(seed.s), + two: Cell::new(seed.r), } } + /// Replaces the state of the random number generator with the provided seed, returning + /// the seed that represents the previous state of the random number generator. + /// + /// The random number generator will become equivalent to one created with + /// the same seed. + #[cfg(feature = "rt")] + pub(crate) fn replace_seed(&self, seed: RngSeed) -> RngSeed { + let old_seed = RngSeed::from_pair(self.one.get(), self.two.get()); + + self.one.replace(seed.s); + self.two.replace(seed.r); + + old_seed + } + + #[cfg(any(feature = "macros", feature = "rt-multi-thread"))] pub(crate) fn fastrand_n(&self, n: u32) -> u32 { // This is similar to fastrand() % n, but faster. // See https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ @@ -51,14 +157,24 @@ impl FastRand { } } +thread_local! { + static THREAD_RNG: FastRand = FastRand::new(RngSeed::new()); +} + +/// Seeds the thread local random number generator with the provided seed and +/// return the previously stored seed. +/// +/// The returned seed can be later used to return the thread local random number +/// generator to its previous state. +#[cfg(feature = "rt")] +pub(crate) fn replace_thread_rng(rng_seed: RngSeed) -> RngSeed { + THREAD_RNG.with(|rng| rng.replace_seed(rng_seed)) +} + // Used by the select macro and `StreamMap` #[cfg(any(feature = "macros"))] #[doc(hidden)] #[cfg_attr(not(feature = "macros"), allow(unreachable_pub))] pub fn thread_rng_n(n: u32) -> u32 { - thread_local! { - static THREAD_RNG: FastRand = FastRand::new(crate::loom::rand::seed()); - } - THREAD_RNG.with(|rng| rng.fastrand_n(n)) } diff --git a/tokio/tests/macros_select.rs b/tokio/tests/macros_select.rs index 23162a1a918..e642a5d9ff4 100644 --- a/tokio/tests/macros_select.rs +++ b/tokio/tests/macros_select.rs @@ -599,3 +599,62 @@ async fn mut_ref_patterns() { }, }; } + +#[cfg(tokio_unstable)] +mod unstable { + use tokio::runtime::RngSeed; + + #[test] + fn deterministic_select_current_thread() { + let seed = b"bytes used to generate seed"; + let rt = tokio::runtime::Builder::new_current_thread() + .rng_seed(RngSeed::from_bytes(seed)) + .build() + .unwrap(); + + rt.block_on(async { + let num = select_0_to_9().await; + assert_eq!(num, 5); + + let num = select_0_to_9().await; + assert_eq!(num, 1); + }); + } + + #[test] + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + fn deterministic_select_multi_thread() { + let seed = b"bytes used to generate seed"; + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .rng_seed(RngSeed::from_bytes(seed)) + .build() + .unwrap(); + + rt.block_on(async { + let _ = tokio::spawn(async { + let num = select_0_to_9().await; + assert_eq!(num, 6); + + let num = select_0_to_9().await; + assert_eq!(num, 9); + }) + .await; + }); + } + + async fn select_0_to_9() -> u32 { + tokio::select!( + x = async { 0 } => x, + x = async { 1 } => x, + x = async { 2 } => x, + x = async { 3 } => x, + x = async { 4 } => x, + x = async { 5 } => x, + x = async { 6 } => x, + x = async { 7 } => x, + x = async { 8 } => x, + x = async { 9 } => x, + ) + } +} diff --git a/tokio/tests/rt_basic.rs b/tokio/tests/rt_basic.rs index 0cb92487af3..2f441865941 100644 --- a/tokio/tests/rt_basic.rs +++ b/tokio/tests/rt_basic.rs @@ -293,7 +293,7 @@ fn timeout_panics_when_no_time_handle() { #[cfg(tokio_unstable)] mod unstable { - use tokio::runtime::{Builder, UnhandledPanic}; + use tokio::runtime::{Builder, RngSeed, UnhandledPanic}; #[test] #[should_panic( @@ -381,6 +381,48 @@ mod unstable { assert!(th.join().is_err()); } } + + #[test] + fn rng_seed() { + let seed = b"bytes used to generate seed"; + let rt = tokio::runtime::Builder::new_current_thread() + .rng_seed(RngSeed::from_bytes(seed)) + .build() + .unwrap(); + + rt.block_on(async { + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 59); + + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 10); + }); + } + + #[test] + fn rng_seed_multi_enter() { + let seed = b"bytes used to generate seed"; + let rt = tokio::runtime::Builder::new_current_thread() + .rng_seed(RngSeed::from_bytes(seed)) + .build() + .unwrap(); + + rt.block_on(async { + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 59); + + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 10); + }); + + rt.block_on(async { + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 86); + + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 1); + }); + } } fn rt() -> Runtime { diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index d3dbabdc9c0..9a8644af6fe 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -542,6 +542,7 @@ fn rt() -> runtime::Runtime { #[cfg(tokio_unstable)] mod unstable { use super::*; + use tokio::runtime::RngSeed; #[test] fn test_disable_lifo_slot() { @@ -561,4 +562,27 @@ mod unstable { .unwrap(); }) } + + #[test] + fn rng_seed() { + let seed = b"bytes used to generate seed"; + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .rng_seed(RngSeed::from_bytes(seed)) + .build() + .unwrap(); + + rt.block_on(async { + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 86); + + let _ = tokio::spawn(async { + // Because we only have a single worker thread, the + // RNG will be deterministic here as well. + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 64); + }) + .await; + }); + } }