diff --git a/tests-build/tests/fail/macros_invalid_input.stderr b/tests-build/tests/fail/macros_invalid_input.stderr index 4c68bd93f6c..bba2009352d 100644 --- a/tests-build/tests/fail/macros_invalid_input.stderr +++ b/tests-build/tests/fail/macros_invalid_input.stderr @@ -4,7 +4,7 @@ error: the async keyword is missing from the function declaration 4 | fn main_is_not_async() {} | ^^ -error: Unknown attribute foo is specified; expected one of: `flavor`, `worker_threads` +error: Unknown attribute foo is specified; expected one of: `flavor`, `worker_threads`, `start_paused` --> $DIR/macros_invalid_input.rs:6:15 | 6 | #[tokio::main(foo)] @@ -28,7 +28,7 @@ error: the test function cannot accept arguments 16 | async fn test_fn_has_args(_x: u8) {} | ^^^^^^ -error: Unknown attribute foo is specified; expected one of: `flavor`, `worker_threads` +error: Unknown attribute foo is specified; expected one of: `flavor`, `worker_threads`, `start_paused` --> $DIR/macros_invalid_input.rs:18:15 | 18 | #[tokio::test(foo)] diff --git a/tokio-macros/src/entry.rs b/tokio-macros/src/entry.rs index ae8c0b9d9f9..f82a329af16 100644 --- a/tokio-macros/src/entry.rs +++ b/tokio-macros/src/entry.rs @@ -25,6 +25,7 @@ impl RuntimeFlavor { struct FinalConfig { flavor: RuntimeFlavor, worker_threads: Option, + start_paused: Option, } struct Configuration { @@ -32,6 +33,7 @@ struct Configuration { default_flavor: RuntimeFlavor, flavor: Option, worker_threads: Option<(usize, Span)>, + start_paused: Option<(bool, Span)>, } impl Configuration { @@ -44,6 +46,7 @@ impl Configuration { }, flavor: None, worker_threads: None, + start_paused: None, } } @@ -79,31 +82,57 @@ impl Configuration { Ok(()) } + fn set_start_paused(&mut self, start_paused: syn::Lit, span: Span) -> Result<(), syn::Error> { + if self.start_paused.is_some() { + return Err(syn::Error::new(span, "`start_paused` set multiple times.")); + } + + let start_paused = parse_bool(start_paused, span, "start_paused")?; + self.start_paused = Some((start_paused, span)); + Ok(()) + } + fn build(&self) -> Result { let flavor = self.flavor.unwrap_or(self.default_flavor); use RuntimeFlavor::*; - match (flavor, self.worker_threads) { - (CurrentThread, Some((_, worker_threads_span))) => Err(syn::Error::new( - worker_threads_span, - "The `worker_threads` option requires the `multi_thread` runtime flavor.", - )), - (CurrentThread, None) => Ok(FinalConfig { - flavor, - worker_threads: None, - }), - (Threaded, worker_threads) if self.rt_multi_thread_available => Ok(FinalConfig { - flavor, - worker_threads: worker_threads.map(|(val, _span)| val), - }), + + let worker_threads = match (flavor, self.worker_threads) { + (CurrentThread, Some((_, worker_threads_span))) => { + return Err(syn::Error::new( + worker_threads_span, + "The `worker_threads` option requires the `multi_thread` runtime flavor.", + )) + } + (CurrentThread, None) => None, + (Threaded, worker_threads) if self.rt_multi_thread_available => { + worker_threads.map(|(val, _span)| val) + } (Threaded, _) => { let msg = if self.flavor.is_none() { "The default runtime flavor is `multi_thread`, but the `rt-multi-thread` feature is disabled." } else { "The runtime flavor `multi_thread` requires the `rt-multi-thread` feature." }; - Err(syn::Error::new(Span::call_site(), msg)) + return Err(syn::Error::new(Span::call_site(), msg)); } - } + }; + + let start_paused = match (flavor, self.start_paused) { + (Threaded, Some((_, start_paused_span))) => { + return Err(syn::Error::new( + start_paused_span, + "The `start_paused` option requires the `current_thread` runtime flavor.", + )); + } + (CurrentThread, Some((start_paused, _))) => Some(start_paused), + (_, None) => None, + }; + + Ok(FinalConfig { + flavor, + worker_threads, + start_paused, + }) } } @@ -134,6 +163,16 @@ fn parse_string(int: syn::Lit, span: Span, field: &str) -> Result Result { + match bool { + syn::Lit::Bool(b) => Ok(b.value), + _ => Err(syn::Error::new( + span, + format!("Failed to parse {} as bool.", field), + )), + } +} + fn parse_knobs( mut input: syn::ItemFn, args: syn::AttributeArgs, @@ -174,6 +213,9 @@ fn parse_knobs( "flavor" => { config.set_flavor(namevalue.lit.clone(), namevalue.span())?; } + "start_paused" => { + config.set_start_paused(namevalue.lit.clone(), namevalue.span())?; + } "core_threads" => { let msg = "Attribute `core_threads` is renamed to `worker_threads`"; return Err(syn::Error::new_spanned(namevalue, msg)); @@ -204,11 +246,11 @@ fn parse_knobs( macro_name ) } - "flavor" | "worker_threads" => { + "flavor" | "worker_threads" | "start_paused" => { format!("The `{}` attribute requires an argument.", name) } name => { - format!("Unknown attribute {} is specified; expected one of: `flavor`, `worker_threads`", name) + format!("Unknown attribute {} is specified; expected one of: `flavor`, `worker_threads`, `start_paused`", name) } }; return Err(syn::Error::new_spanned(path, msg)); @@ -235,6 +277,9 @@ fn parse_knobs( if let Some(v) = config.worker_threads { rt = quote! { #rt.worker_threads(#v) }; } + if let Some(v) = config.start_paused { + rt = quote! { #rt.start_paused(#v) }; + } let header = { if is_test { diff --git a/tokio-macros/src/lib.rs b/tokio-macros/src/lib.rs index b48bd004967..1c8e29282d4 100644 --- a/tokio-macros/src/lib.rs +++ b/tokio-macros/src/lib.rs @@ -144,6 +144,30 @@ use proc_macro::TokenStream; /// } /// ``` /// +/// ### Configure the runtime to start with time paused +/// +/// ```rust +/// #[tokio::main(flavor = "current_thread", start_paused = true)] +/// async fn main() { +/// println!("Hello world"); +/// } +/// ``` +/// +/// Equivalent code not using `#[tokio::main]` +/// +/// ```rust +/// fn main() { +/// tokio::runtime::Builder::new_current_thread() +/// .enable_all() +/// .start_paused(true) +/// .build() +/// .unwrap() +/// .block_on(async { +/// println!("Hello world"); +/// }) +/// } +/// ``` +/// /// ### NOTE: /// /// If you rename the Tokio crate in your dependencies this macro will not work. @@ -225,6 +249,15 @@ pub fn main_rt(args: TokenStream, item: TokenStream) -> TokenStream { /// } /// ``` /// +/// ### Configure the runtime to start with time paused +/// +/// ```no_run +/// #[tokio::test(start_paused = true)] +/// async fn my_test() { +/// assert!(true); +/// } +/// ``` +/// /// ### NOTE: /// /// If you rename the Tokio crate in your dependencies this macro will not work. diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 1e719a235d0..eff72587194 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -121,6 +121,7 @@ tokio-test = { version = "0.4.0", path = "../tokio-test" } tokio-stream = { version = "0.1", path = "../tokio-stream" } futures = { version = "0.3.0", features = ["async-await"] } proptest = "0.10.0" +rand = "0.8.0" tempfile = "3.1.0" async-stream = "0.3" diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 1f8892eafc8..e845192977e 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -47,6 +47,9 @@ pub struct Builder { /// Whether or not to enable the time driver enable_time: bool, + /// Whether or not the clock should start paused. + start_paused: bool, + /// The number of worker threads, used by Runtime. /// /// Only used when not using the current-thread executor. @@ -110,6 +113,9 @@ impl Builder { // Time defaults to "off" enable_time: false, + // The clock starts not-paused + start_paused: false, + // Default to lazy auto-detection (one thread per CPU core) worker_threads: None, @@ -386,6 +392,7 @@ impl Builder { }, enable_io: self.enable_io, enable_time: self.enable_time, + start_paused: self.start_paused, } } @@ -489,6 +496,31 @@ cfg_time! { } } +cfg_test_util! { + impl Builder { + /// Controls if the runtime's clock starts paused or advancing. + /// + /// Pausing time requires the current-thread runtime; construction of + /// the runtime will panic otherwise. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime; + /// + /// let rt = runtime::Builder::new_current_thread() + /// .enable_time() + /// .start_paused(true) + /// .build() + /// .unwrap(); + /// ``` + pub fn start_paused(&mut self, start_paused: bool) -> &mut Self { + self.start_paused = start_paused; + self + } + } +} + cfg_rt_multi_thread! { impl Builder { fn build_threaded_runtime(&mut self) -> io::Result { diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs index b89fa4fca79..a0e8e2362b7 100644 --- a/tokio/src/runtime/driver.rs +++ b/tokio/src/runtime/driver.rs @@ -103,8 +103,8 @@ cfg_time! { pub(crate) type Clock = crate::time::Clock; pub(crate) type TimeHandle = Option; - fn create_clock(enable_pausing: bool) -> Clock { - crate::time::Clock::new(enable_pausing) + fn create_clock(enable_pausing: bool, start_paused: bool) -> Clock { + crate::time::Clock::new(enable_pausing, start_paused) } fn create_time_driver( @@ -131,7 +131,7 @@ cfg_not_time! { pub(crate) type Clock = (); pub(crate) type TimeHandle = (); - fn create_clock(_enable_pausing: bool) -> Clock { + fn create_clock(_enable_pausing: bool, _start_paused: bool) -> Clock { () } @@ -162,13 +162,15 @@ pub(crate) struct Cfg { pub(crate) enable_io: bool, pub(crate) enable_time: bool, pub(crate) enable_pause_time: bool, + pub(crate) start_paused: bool, } impl Driver { pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Resources)> { let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io)?; - let clock = create_clock(cfg.enable_pause_time); + let clock = create_clock(cfg.enable_pause_time, cfg.start_paused); + let (time_driver, time_handle) = create_time_driver(cfg.enable_time, io_stack, clock.clone()); diff --git a/tokio/src/time/clock.rs b/tokio/src/time/clock.rs index a62fbe39009..8957800cbb5 100644 --- a/tokio/src/time/clock.rs +++ b/tokio/src/time/clock.rs @@ -17,7 +17,7 @@ cfg_not_test_util! { } impl Clock { - pub(crate) fn new(_enable_pausing: bool) -> Clock { + pub(crate) fn new(_enable_pausing: bool, _start_paused: bool) -> Clock { Clock {} } @@ -78,7 +78,8 @@ cfg_test_util! { /// that depend on time. /// /// Pausing time requires the `current_thread` Tokio runtime. This is the - /// default runtime used by `#[tokio::test]` + /// default runtime used by `#[tokio::test]`. The runtime can be initialized + /// with time in a paused state using the `Builder::start_paused` method. /// /// # Panics /// @@ -149,16 +150,22 @@ cfg_test_util! { impl Clock { /// Return a new `Clock` instance that uses the current execution context's /// source of time. - pub(crate) fn new(enable_pausing: bool) -> Clock { + pub(crate) fn new(enable_pausing: bool, start_paused: bool) -> Clock { let now = std::time::Instant::now(); - Clock { + let clock = Clock { inner: Arc::new(Mutex::new(Inner { enable_pausing, base: now, unfrozen: Some(now), })), + }; + + if start_paused { + clock.pause(); } + + clock } pub(crate) fn pause(&self) { diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs index 9fbc0b3cf96..615307ea572 100644 --- a/tokio/src/time/driver/mod.rs +++ b/tokio/src/time/driver/mod.rs @@ -102,8 +102,8 @@ pub(self) struct ClockTime { impl ClockTime { pub(self) fn new(clock: Clock) -> Self { Self { + start_time: clock.now(), clock, - start_time: super::clock::now(), } } diff --git a/tokio/src/time/driver/tests/mod.rs b/tokio/src/time/driver/tests/mod.rs index cfefed32f72..8ae4a84b442 100644 --- a/tokio/src/time/driver/tests/mod.rs +++ b/tokio/src/time/driver/tests/mod.rs @@ -41,7 +41,7 @@ fn model(f: impl Fn() + Send + Sync + 'static) { #[test] fn single_timer() { model(|| { - let clock = crate::time::clock::Clock::new(true); + let clock = crate::time::clock::Clock::new(true, false); let time_source = super::ClockTime::new(clock.clone()); let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); @@ -72,7 +72,7 @@ fn single_timer() { #[test] fn drop_timer() { model(|| { - let clock = crate::time::clock::Clock::new(true); + let clock = crate::time::clock::Clock::new(true, false); let time_source = super::ClockTime::new(clock.clone()); let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); @@ -103,7 +103,7 @@ fn drop_timer() { #[test] fn change_waker() { model(|| { - let clock = crate::time::clock::Clock::new(true); + let clock = crate::time::clock::Clock::new(true, false); let time_source = super::ClockTime::new(clock.clone()); let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); @@ -138,7 +138,7 @@ fn reset_future() { model(|| { let finished_early = Arc::new(AtomicBool::new(false)); - let clock = crate::time::clock::Clock::new(true); + let clock = crate::time::clock::Clock::new(true, false); let time_source = super::ClockTime::new(clock.clone()); let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); @@ -185,7 +185,7 @@ fn reset_future() { #[test] #[cfg(not(loom))] fn poll_process_levels() { - let clock = crate::time::clock::Clock::new(true); + let clock = crate::time::clock::Clock::new(true, false); clock.pause(); let time_source = super::ClockTime::new(clock.clone()); @@ -226,7 +226,7 @@ fn poll_process_levels() { fn poll_process_levels_targeted() { let mut context = Context::from_waker(noop_waker_ref()); - let clock = crate::time::clock::Clock::new(true); + let clock = crate::time::clock::Clock::new(true, false); clock.pause(); let time_source = super::ClockTime::new(clock.clone()); diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index 92f67af2f25..b267125b15b 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -24,7 +24,7 @@ cfg_rt! { } cfg_rt_multi_thread! { - pub(crate) use rand::FastRand; + pub(crate) use self::rand::FastRand; mod try_lock; pub(crate) use try_lock::TryLock; @@ -34,7 +34,7 @@ pub(crate) mod trace; #[cfg(any(feature = "macros"))] #[cfg_attr(not(feature = "macros"), allow(unreachable_pub))] -pub use rand::thread_rng_n; +pub use self::rand::thread_rng_n; #[cfg(any( feature = "rt", diff --git a/tokio/tests/time_pause.rs b/tokio/tests/time_pause.rs index 49a7677f5c8..bc84ac578d0 100644 --- a/tokio/tests/time_pause.rs +++ b/tokio/tests/time_pause.rs @@ -1,6 +1,9 @@ #![warn(rust_2018_idioms)] #![cfg(feature = "full")] +use rand::SeedableRng; +use rand::{rngs::StdRng, Rng}; +use tokio::time::{self, Duration, Instant}; use tokio_test::assert_err; #[tokio::test] @@ -31,3 +34,26 @@ async fn pause_time_in_spawn_threads() { assert_err!(t.await); } + +#[test] +fn paused_time_is_deterministic() { + let run_1 = paused_time_stress_run(); + let run_2 = paused_time_stress_run(); + + assert_eq!(run_1, run_2); +} + +#[tokio::main(flavor = "current_thread", start_paused = true)] +async fn paused_time_stress_run() -> Vec { + let mut rng = StdRng::seed_from_u64(1); + + let mut times = vec![]; + let start = Instant::now(); + for _ in 0..10_000 { + let sleep = rng.gen_range(Duration::from_secs(0)..Duration::from_secs(1)); + time::sleep(sleep).await; + times.push(start.elapsed()); + } + + times +}