diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index aea5af8caf4..d892ac111b4 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -8,6 +8,7 @@ use crate::runtime::builder::ThreadNameFn; use crate::runtime::context; use crate::runtime::task::{self, JoinHandle}; use crate::runtime::{Builder, Callback, Handle}; +use crate::util::{replace_thread_rng, RngSeedGenerator}; use std::collections::{HashMap, VecDeque}; use std::fmt; @@ -48,6 +49,9 @@ struct Inner { // Customizable wait timeout. keep_alive: Duration, + + // Random number seed + seed_generator: RngSeedGenerator, } struct Shared { @@ -182,6 +186,7 @@ impl BlockingPool { before_stop: builder.before_stop.clone(), thread_cap, keep_alive, + seed_generator: builder.seed_generator.next_generator(), }), }, shutdown_rx, @@ -431,6 +436,8 @@ impl Inner { if let Some(f) = &self.after_start { f() } + // We own this thread so there is no need to replace the RngSeed once we're done. + let _ = replace_thread_rng(self.seed_generator.next_seed()); let mut shared = self.shared.lock(); let mut join_on_thread = None; diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 5be8e4aeedc..4e93251b2f1 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1,5 +1,6 @@ use crate::runtime::handle::Handle; use crate::runtime::{blocking, driver, Callback, Runtime, Spawner}; +use crate::util::{RngSeed, RngSeedGenerator}; use std::fmt; use std::io; @@ -90,6 +91,9 @@ pub struct Builder { /// This option should only be exposed as unstable. pub(super) disable_lifo_slot: bool, + /// Specify a random number generator seed to provide deterministic results + pub(super) seed_generator: RngSeedGenerator, + #[cfg(tokio_unstable)] pub(super) unhandled_panic: UnhandledPanic, } @@ -255,6 +259,8 @@ impl Builder { global_queue_interval, event_interval, + seed_generator: RngSeedGenerator::new(RngSeed::new()), + #[cfg(tokio_unstable)] unhandled_panic: UnhandledPanic::Ignore, @@ -829,6 +835,42 @@ impl Builder { self.disable_lifo_slot = true; self } + + /// Specifies the random number generation seed to use within all threads associated + /// with the runtime being built. + /// + /// This option is intended to make certain parts of the runtime deterministic. + /// Specifically, it affects the [`tokio::select!`] macro and the work stealing + /// algorithm. In the case of [`tokio::select!`] it will ensure that the order that + /// branches are polled is deterministic. + /// + /// In the case of work stealing, it's a little more complicated. Each worker will + /// be given a deterministic seed so that the starting peer for each work stealing + /// search will be deterministic. + /// + /// In addition to the code specifying `rng_seed` and interacting with the runtime, + /// the internals of Tokio and the Rust compiler may affect the sequences of random + /// numbers. In order to ensure repeatable results, the version of Tokio, the versions + /// of all other dependencies that interact with Tokio, and the Rust compiler version + /// should also all remain constant. + /// + /// # Examples + /// + /// ``` + /// # use tokio::runtime::{self, RngSeed}; + /// # pub fn main() { + /// let seed = RngSeed::from_bytes(b"place your seed here"); + /// let rt = runtime::Builder::new_current_thread() + /// .rng_seed(seed) + /// .build(); + /// # } + /// ``` + /// + /// [`tokio::select!`]: crate::select + pub fn rng_seed(&mut self, seed: RngSeed) -> &mut Self { + self.seed_generator = RngSeedGenerator::new(seed); + self + } } fn build_current_thread_runtime(&mut self) -> io::Result { @@ -855,6 +897,7 @@ impl Builder { #[cfg(tokio_unstable)] unhandled_panic: self.unhandled_panic.clone(), disable_lifo_slot: self.disable_lifo_slot, + seed_generator: self.seed_generator.next_generator(), }, ); let spawner = Spawner::CurrentThread(scheduler.spawner().clone()); @@ -863,6 +906,7 @@ impl Builder { spawner, driver: driver_handle, blocking_spawner, + seed_generator: self.seed_generator.next_generator(), }); Ok(Runtime { @@ -972,6 +1016,7 @@ cfg_rt_multi_thread! { #[cfg(tokio_unstable)] unhandled_panic: self.unhandled_panic.clone(), disable_lifo_slot: self.disable_lifo_slot, + seed_generator: self.seed_generator.next_generator(), }, ); let spawner = Spawner::MultiThread(scheduler.spawner().clone()); @@ -980,6 +1025,7 @@ cfg_rt_multi_thread! { spawner, driver: driver_handle, blocking_spawner, + seed_generator: self.seed_generator.next_generator(), }); // Create the runtime handle diff --git a/tokio/src/runtime/config.rs b/tokio/src/runtime/config.rs index 59c19988e5e..39eb1cf118b 100644 --- a/tokio/src/runtime/config.rs +++ b/tokio/src/runtime/config.rs @@ -1,5 +1,6 @@ #![cfg_attr(any(not(feature = "full"), tokio_wasm), allow(dead_code))] use crate::runtime::Callback; +use crate::util::RngSeedGenerator; pub(crate) struct Config { /// How many ticks before pulling a task from the global/remote queue? @@ -23,6 +24,10 @@ pub(crate) struct Config { /// stop-gap, this unstable option lets users disable the LIFO task. pub(crate) disable_lifo_slot: bool, + /// Random number generator seed to configure runtimes to act in a + /// deterministic way. + pub(crate) seed_generator: RngSeedGenerator, + #[cfg(tokio_unstable)] /// How to respond to unhandled task panics. pub(crate) unhandled_panic: crate::runtime::UnhandledPanic, diff --git a/tokio/src/runtime/context.rs b/tokio/src/runtime/context.rs index e0f8a384ae6..53a01cfdaeb 100644 --- a/tokio/src/runtime/context.rs +++ b/tokio/src/runtime/context.rs @@ -1,5 +1,6 @@ //! Thread local runtime context use crate::runtime::{Handle, TryCurrentError}; +use crate::util::{replace_thread_rng, RngSeed}; use std::cell::RefCell; @@ -99,21 +100,29 @@ pub(crate) fn enter(new: Handle) -> EnterGuard { /// /// [`Handle`]: Handle pub(crate) fn try_enter(new: Handle) -> Option { - CONTEXT - .try_with(|ctx| { - let old = ctx.borrow_mut().replace(new); - EnterGuard(old) - }) - .ok() + let rng_seed = new.as_inner().seed_generator.next_seed(); + let old_handle = CONTEXT.try_with(|ctx| ctx.borrow_mut().replace(new)).ok()?; + + let old_seed = replace_thread_rng(rng_seed); + + Some(EnterGuard { + old_handle, + old_seed, + }) } #[derive(Debug)] -pub(crate) struct EnterGuard(Option); +pub(crate) struct EnterGuard { + old_handle: Option, + old_seed: RngSeed, +} impl Drop for EnterGuard { fn drop(&mut self) { CONTEXT.with(|ctx| { - *ctx.borrow_mut() = self.0.take(); + *ctx.borrow_mut() = self.old_handle.take(); }); + // We discard the RngSeed associated with this guard + let _ = replace_thread_rng(self.old_seed.clone()); } } diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index e10b07c3d9a..98b06fc071a 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -3,6 +3,9 @@ use crate::runtime::driver; +#[cfg(feature = "rt")] +use crate::util::RngSeedGenerator; + use std::sync::Arc; /// Handle to the runtime. @@ -32,6 +35,10 @@ pub(crate) struct HandleInner { /// Blocking pool spawner #[cfg(feature = "rt")] pub(crate) blocking_spawner: blocking::Spawner, + + /// Current random number generator seed + #[cfg(feature = "rt")] + pub(super) seed_generator: RngSeedGenerator, } cfg_rt! { diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 5bd0a9b66d2..d6863b5044f 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -216,6 +216,7 @@ cfg_rt! { pub use self::builder::Builder; cfg_unstable! { pub use self::builder::UnhandledPanic; + pub use crate::util::RngSeed; } pub(crate) mod context; diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 22ca2dbe842..2193486484f 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -58,7 +58,6 @@ use crate::coop; use crate::future::Future; -use crate::loom::rand::seed; use crate::loom::sync::{Arc, Mutex}; use crate::runtime; use crate::runtime::enter::EnterContext; @@ -206,7 +205,7 @@ pub(super) fn create(size: usize, park: Parker, config: Config) -> (Arc, is_shutdown: false, park: Some(park), metrics: MetricsBatch::new(), - rand: FastRand::new(seed()), + rand: FastRand::new(config.seed_generator.next_seed()), })); remotes.push(Remote { steal, unpark }); diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index b77979e63ce..65907231b61 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -40,13 +40,15 @@ pub(crate) use wake_list::WakeList; ))] pub(crate) mod linked_list; -#[cfg(any(feature = "rt-multi-thread", feature = "macros"))] +#[cfg(any(feature = "rt", feature = "macros"))] mod rand; cfg_rt! { mod idle_notified_set; pub(crate) use idle_notified_set::IdleNotifiedSet; + pub(crate) use self::rand::{RngSeedGenerator,replace_thread_rng}; + mod wake; pub(crate) use wake::WakerRef; pub(crate) use wake::{waker_ref, Wake}; @@ -61,6 +63,14 @@ cfg_rt! { pub(crate) use rc_cell::RcCell; } +#[cfg_attr(not(tokio_unstable), allow(unreachable_pub))] +#[cfg(feature = "rt")] +pub use self::rand::RngSeed; + +#[cfg(any(feature = "macros"))] +#[cfg_attr(not(feature = "macros"), allow(unreachable_pub))] +pub use self::rand::thread_rng_n; + cfg_rt_multi_thread! { pub(crate) use self::rand::FastRand; @@ -70,8 +80,4 @@ cfg_rt_multi_thread! { pub(crate) mod trace; -#[cfg(any(feature = "macros"))] -#[cfg_attr(not(feature = "macros"), allow(unreachable_pub))] -pub use self::rand::thread_rng_n; - pub(crate) mod error; diff --git a/tokio/src/util/rand.rs b/tokio/src/util/rand.rs index 6b19c8be957..09754cea993 100644 --- a/tokio/src/util/rand.rs +++ b/tokio/src/util/rand.rs @@ -1,5 +1,103 @@ use std::cell::Cell; +cfg_rt! { + use std::sync::Mutex; + + /// A deterministic generator for seeds (and other generators). + /// + /// Given the same initial seed, the generator will output the same sequence of seeds. + /// + /// Since the seed generator will be kept in a runtime handle, we need to wrap `FastRand` + /// in a Mutex to make it thread safe. Different to the `FastRand` that we keep in a + /// thread local store, the expectation is that seed generation will not need to happen + /// very frequently, so the cost of the mutex should be minimal. + #[derive(Debug)] + pub(crate) struct RngSeedGenerator { + /// Internal state for the seed generator. We keep it in a Mutex so that we can safely + /// use it across multiple threads. + state: Mutex, + } + + impl RngSeedGenerator { + /// Returns a new generator from the provided seed. + pub(crate) fn new(seed: RngSeed) -> Self { + Self { + state: Mutex::new(FastRand::new(seed)), + } + } + + /// Returns the next seed in the sequence. + pub(crate) fn next_seed(&self) -> RngSeed { + let rng = self + .state + .lock() + .expect("RNG seed generator is internally corrupt"); + + let s = rng.fastrand(); + let r = rng.fastrand(); + + RngSeed::from_pair(s, r) + } + + /// Directly creates a generator using the next seed. + pub(crate) fn next_generator(&self) -> Self { + RngSeedGenerator::new(self.next_seed()) + } + } +} + +/// A seed for random numnber generation. +/// +/// In order to make certain functions within a runtime deterministic, a seed +/// can be specified at the time of creation. +#[allow(unreachable_pub)] +#[derive(Clone, Debug)] +pub struct RngSeed { + s: u32, + r: u32, +} + +impl RngSeed { + /// Creates a random seed using loom internally. + pub(crate) fn new() -> Self { + Self::from_u64(crate::loom::rand::seed()) + } + + cfg_unstable! { + /// Generates a seed from the provided byte slice. + /// + /// # Example + /// + /// ``` + /// # use tokio::runtime::RngSeed; + /// let seed = RngSeed::from_bytes(b"make me a seed"); + /// ``` + #[cfg(feature = "rt")] + pub fn from_bytes(bytes: &[u8]) -> Self { + use std::{collections::hash_map::DefaultHasher, hash::Hasher}; + + let mut hasher = DefaultHasher::default(); + hasher.write(bytes); + Self::from_u64(hasher.finish()) + } + } + + fn from_u64(seed: u64) -> Self { + let one = (seed >> 32) as u32; + let mut two = seed as u32; + + if two == 0 { + // This value cannot be zero + two = 1; + } + + Self::from_pair(one, two) + } + + fn from_pair(s: u32, r: u32) -> Self { + Self { s, r } + } +} /// Fast random number generate. /// /// Implement xorshift64+: 2 32-bit xorshift sequences added together. @@ -15,21 +113,29 @@ pub(crate) struct FastRand { impl FastRand { /// Initializes a new, thread-local, fast random number generator. - pub(crate) fn new(seed: u64) -> FastRand { - let one = (seed >> 32) as u32; - let mut two = seed as u32; - - if two == 0 { - // This value cannot be zero - two = 1; - } - + pub(crate) fn new(seed: RngSeed) -> FastRand { FastRand { - one: Cell::new(one), - two: Cell::new(two), + one: Cell::new(seed.s), + two: Cell::new(seed.r), } } + /// Replaces the state of the random number generator with the provided seed, returning + /// the seed that represents the previous state of the random number generator. + /// + /// The random number generator will become equivalent to one created with + /// the same seed. + #[cfg(feature = "rt")] + pub(crate) fn replace_seed(&self, seed: RngSeed) -> RngSeed { + let old_seed = RngSeed::from_pair(self.one.get(), self.two.get()); + + self.one.replace(seed.s); + self.two.replace(seed.r); + + old_seed + } + + #[cfg(any(feature = "macros", feature = "rt-multi-thread"))] pub(crate) fn fastrand_n(&self, n: u32) -> u32 { // This is similar to fastrand() % n, but faster. // See https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/ @@ -51,14 +157,24 @@ impl FastRand { } } +thread_local! { + static THREAD_RNG: FastRand = FastRand::new(RngSeed::new()); +} + +/// Seeds the thread local random number generator with the provided seed and +/// return the previously stored seed. +/// +/// The returned seed can be later used to return the thread local random number +/// generator to its previous state. +#[cfg(feature = "rt")] +pub(crate) fn replace_thread_rng(rng_seed: RngSeed) -> RngSeed { + THREAD_RNG.with(|rng| rng.replace_seed(rng_seed)) +} + // Used by the select macro and `StreamMap` #[cfg(any(feature = "macros"))] #[doc(hidden)] #[cfg_attr(not(feature = "macros"), allow(unreachable_pub))] pub fn thread_rng_n(n: u32) -> u32 { - thread_local! { - static THREAD_RNG: FastRand = FastRand::new(crate::loom::rand::seed()); - } - THREAD_RNG.with(|rng| rng.fastrand_n(n)) } diff --git a/tokio/tests/macros_select.rs b/tokio/tests/macros_select.rs index 23162a1a918..e642a5d9ff4 100644 --- a/tokio/tests/macros_select.rs +++ b/tokio/tests/macros_select.rs @@ -599,3 +599,62 @@ async fn mut_ref_patterns() { }, }; } + +#[cfg(tokio_unstable)] +mod unstable { + use tokio::runtime::RngSeed; + + #[test] + fn deterministic_select_current_thread() { + let seed = b"bytes used to generate seed"; + let rt = tokio::runtime::Builder::new_current_thread() + .rng_seed(RngSeed::from_bytes(seed)) + .build() + .unwrap(); + + rt.block_on(async { + let num = select_0_to_9().await; + assert_eq!(num, 5); + + let num = select_0_to_9().await; + assert_eq!(num, 1); + }); + } + + #[test] + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + fn deterministic_select_multi_thread() { + let seed = b"bytes used to generate seed"; + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .rng_seed(RngSeed::from_bytes(seed)) + .build() + .unwrap(); + + rt.block_on(async { + let _ = tokio::spawn(async { + let num = select_0_to_9().await; + assert_eq!(num, 6); + + let num = select_0_to_9().await; + assert_eq!(num, 9); + }) + .await; + }); + } + + async fn select_0_to_9() -> u32 { + tokio::select!( + x = async { 0 } => x, + x = async { 1 } => x, + x = async { 2 } => x, + x = async { 3 } => x, + x = async { 4 } => x, + x = async { 5 } => x, + x = async { 6 } => x, + x = async { 7 } => x, + x = async { 8 } => x, + x = async { 9 } => x, + ) + } +} diff --git a/tokio/tests/rt_basic.rs b/tokio/tests/rt_basic.rs index 0cb92487af3..2f441865941 100644 --- a/tokio/tests/rt_basic.rs +++ b/tokio/tests/rt_basic.rs @@ -293,7 +293,7 @@ fn timeout_panics_when_no_time_handle() { #[cfg(tokio_unstable)] mod unstable { - use tokio::runtime::{Builder, UnhandledPanic}; + use tokio::runtime::{Builder, RngSeed, UnhandledPanic}; #[test] #[should_panic( @@ -381,6 +381,48 @@ mod unstable { assert!(th.join().is_err()); } } + + #[test] + fn rng_seed() { + let seed = b"bytes used to generate seed"; + let rt = tokio::runtime::Builder::new_current_thread() + .rng_seed(RngSeed::from_bytes(seed)) + .build() + .unwrap(); + + rt.block_on(async { + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 59); + + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 10); + }); + } + + #[test] + fn rng_seed_multi_enter() { + let seed = b"bytes used to generate seed"; + let rt = tokio::runtime::Builder::new_current_thread() + .rng_seed(RngSeed::from_bytes(seed)) + .build() + .unwrap(); + + rt.block_on(async { + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 59); + + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 10); + }); + + rt.block_on(async { + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 86); + + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 1); + }); + } } fn rt() -> Runtime { diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index d3dbabdc9c0..9a8644af6fe 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -542,6 +542,7 @@ fn rt() -> runtime::Runtime { #[cfg(tokio_unstable)] mod unstable { use super::*; + use tokio::runtime::RngSeed; #[test] fn test_disable_lifo_slot() { @@ -561,4 +562,27 @@ mod unstable { .unwrap(); }) } + + #[test] + fn rng_seed() { + let seed = b"bytes used to generate seed"; + let rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .rng_seed(RngSeed::from_bytes(seed)) + .build() + .unwrap(); + + rt.block_on(async { + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 86); + + let _ = tokio::spawn(async { + // Because we only have a single worker thread, the + // RNG will be deterministic here as well. + let random = tokio::macros::support::thread_rng_n(100); + assert_eq!(random, 64); + }) + .await; + }); + } }