diff --git a/tokio-macros/src/entry.rs b/tokio-macros/src/entry.rs index 19cb0b1cc17..98d4ecf4e86 100644 --- a/tokio-macros/src/entry.rs +++ b/tokio-macros/src/entry.rs @@ -28,6 +28,7 @@ impl RuntimeFlavor { struct FinalConfig { flavor: RuntimeFlavor, worker_threads: Option, + auto_advance: Option, start_paused: Option, crate_name: Option, } @@ -36,6 +37,7 @@ struct FinalConfig { const DEFAULT_ERROR_CONFIG: FinalConfig = FinalConfig { flavor: RuntimeFlavor::CurrentThread, worker_threads: None, + auto_advance: None, start_paused: None, crate_name: None, }; @@ -45,6 +47,7 @@ struct Configuration { default_flavor: RuntimeFlavor, flavor: Option, worker_threads: Option<(usize, Span)>, + auto_advance: Option<(bool, Span)>, start_paused: Option<(bool, Span)>, is_test: bool, crate_name: Option, @@ -60,6 +63,7 @@ impl Configuration { }, flavor: None, worker_threads: None, + auto_advance: None, start_paused: None, is_test, crate_name: None, @@ -98,6 +102,16 @@ impl Configuration { Ok(()) } + fn set_auto_advance(&mut self, auto_advance: syn::Lit, span: Span) -> Result<(), syn::Error> { + if self.auto_advance.is_some() { + return Err(syn::Error::new(span, "`auto_advance` set multiple times.")); + } + + let auto_advance = parse_bool(auto_advance, span, "auto_advance")?; + self.auto_advance = Some((auto_advance, span)); + Ok(()) + } + fn set_start_paused(&mut self, start_paused: syn::Lit, span: Span) -> Result<(), syn::Error> { if self.start_paused.is_some() { return Err(syn::Error::new(span, "`start_paused` set multiple times.")); @@ -151,6 +165,18 @@ impl Configuration { } }; + let auto_advance = match (flavor, self.auto_advance) { + (Threaded, Some((_, auto_advance_span))) => { + let msg = format!( + "The `auto_advance` option requires the `current_thread` runtime flavor. Use `#[{}(flavor = \"current_thread\")]`", + self.macro_name(), + ); + return Err(syn::Error::new(auto_advance_span, msg)); + } + (CurrentThread, Some((auto_advance, _))) => Some(auto_advance), + (_, None) => None, + }; + let start_paused = match (flavor, self.start_paused) { (Threaded, Some((_, start_paused_span))) => { let msg = format!( @@ -167,6 +193,7 @@ impl Configuration { crate_name: self.crate_name.clone(), flavor, worker_threads, + auto_advance, start_paused, }) } @@ -268,6 +295,12 @@ fn build_config( syn::spanned::Spanned::span(&namevalue.lit), )?; } + "auto_advance" => { + config.set_auto_advance( + namevalue.lit.clone(), + syn::spanned::Spanned::span(&namevalue.lit), + )?; + } "start_paused" => { config.set_start_paused( namevalue.lit.clone(), @@ -369,6 +402,9 @@ fn parse_knobs(mut input: syn::ItemFn, is_test: bool, config: FinalConfig) -> To if let Some(v) = config.worker_threads { rt = quote! { #rt.worker_threads(#v) }; } + if let Some(v) = config.auto_advance { + rt = quote! { #rt.auto_advance(#v) }; + } if let Some(v) = config.start_paused { rt = quote! { #rt.start_paused(#v) }; } diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 4a85d0ddfb2..8ce50590f58 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -1,6 +1,7 @@ use crate::runtime::handle::Handle; use crate::runtime::{blocking, driver, Callback, Runtime}; use crate::util::rand::{RngSeed, RngSeedGenerator}; +use crate::time::PauseSettings; use std::fmt; use std::io; @@ -48,6 +49,9 @@ pub struct Builder { /// Whether or not to enable the time driver enable_time: bool, + /// Whether or not clock should auto-advance when sleeping while time is paused. + auto_advance: bool, + /// Whether or not the clock should start paused. start_paused: bool, @@ -181,6 +185,7 @@ cfg_unstable! { pub(crate) type ThreadNameFn = std::sync::Arc String + Send + Sync + 'static>; +#[derive(Clone, Copy)] pub(crate) enum Kind { CurrentThread, #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] @@ -232,6 +237,14 @@ impl Builder { // Time defaults to "off" enable_time: false, + // By default time can be paused and will auto-advance, + // but only on the `CurrentThread` runtime + auto_advance: match &kind { + Kind::CurrentThread => true, + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Kind::MultiThread => false, + }, + // The clock starts not-paused start_paused: false, @@ -639,14 +652,18 @@ impl Builder { fn get_cfg(&self) -> driver::Cfg { driver::Cfg { - enable_pause_time: match self.kind { - Kind::CurrentThread => true, - #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] - Kind::MultiThread => false, - }, enable_io: self.enable_io, enable_time: self.enable_time, - start_paused: self.start_paused, + time_pausing: PauseSettings { + enabled: self.enable_time + && match &self.kind { + Kind::CurrentThread => true, + #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] + Kind::MultiThread => false, + }, + auto_advance: self.auto_advance, + start_paused: self.start_paused, + }, } } @@ -966,6 +983,27 @@ cfg_time! { cfg_test_util! { impl Builder { + /// Controls if the runtime's clock auto-advance behavior when paused. + /// + /// Pausing time requires the current-thread runtime; construction of + /// the runtime will panic otherwise. + /// + /// # Examples + /// + /// ``` + /// use tokio::runtime; + /// + /// let rt = runtime::Builder::new_current_thread() + /// .enable_time() + /// .auto_advance(false) + /// .build() + /// .unwrap(); + /// ``` + pub fn auto_advance(&mut self, auto_advance: bool) -> &mut Self { + self.auto_advance = auto_advance; + self + } + /// Controls if the runtime's clock starts paused or advancing. /// /// Pausing time requires the current-thread runtime; construction of diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs index 8f9c5122b85..2006b2225b7 100644 --- a/tokio/src/runtime/driver.rs +++ b/tokio/src/runtime/driver.rs @@ -34,15 +34,19 @@ pub(crate) struct Handle { pub(crate) struct Cfg { pub(crate) enable_io: bool, pub(crate) enable_time: bool, - pub(crate) enable_pause_time: bool, - pub(crate) start_paused: bool, + pub(crate) time_pausing: PauseSettings, } impl Driver { pub(crate) fn new(cfg: Cfg) -> io::Result<(Self, Handle)> { let (io_stack, io_handle, signal_handle) = create_io_stack(cfg.enable_io)?; - let clock = create_clock(cfg.enable_pause_time, cfg.start_paused); + #[cfg(feature = "time")] + #[cfg_attr(docsrs, doc(cfg(feature = "time")))] + let clock = crate::time::Clock::new(cfg.time_pausing); + + #[cfg(not(feature = "time"))] + let clock = (); let (time_driver, time_handle) = create_time_driver(cfg.enable_time, io_stack, clock.clone()); @@ -280,11 +284,8 @@ cfg_time! { pub(crate) type Clock = crate::time::Clock; pub(crate) type TimeHandle = Option; - - fn create_clock(enable_pausing: bool, start_paused: bool) -> Clock { - crate::time::Clock::new(enable_pausing, start_paused) - } - + pub(crate) type PauseSettings = crate::time::PauseSettings; + fn create_time_driver( enable: bool, io_stack: IoStack, @@ -327,12 +328,9 @@ cfg_not_time! { type TimeDriver = IoStack; pub(crate) type Clock = (); + pub(crate) type PauseSettings = (); pub(crate) type TimeHandle = (); - fn create_clock(_enable_pausing: bool, _start_paused: bool) -> Clock { - () - } - fn create_time_driver( _enable: bool, io_stack: IoStack, diff --git a/tokio/src/runtime/time/mod.rs b/tokio/src/runtime/time/mod.rs index 240f8f16e6d..4e57438f3c8 100644 --- a/tokio/src/runtime/time/mod.rs +++ b/tokio/src/runtime/time/mod.rs @@ -230,8 +230,10 @@ impl Driver { // yield in `Runtime::block_on`). In this case, we don't // advance the clock. if !handle.did_wake() { - // Simulate advancing time - clock.advance(duration); + // Simulate advancing time if enabled + if clock.auto_advance() { + clock.advance(duration); + } } } else { self.park.park_timeout(rt_handle, duration); diff --git a/tokio/src/time/clock.rs b/tokio/src/time/clock.rs index 0343c4f4cf0..78b031b119d 100644 --- a/tokio/src/time/clock.rs +++ b/tokio/src/time/clock.rs @@ -17,7 +17,7 @@ cfg_not_test_util! { } impl Clock { - pub(crate) fn new(_enable_pausing: bool, _start_paused: bool) -> Clock { + pub(crate) fn new(_enable_pausing: PauseSettings, _start_paused: bool) -> Clock { Clock {} } @@ -27,6 +27,18 @@ cfg_not_test_util! { } } +/// Used to control the time pausing test feature. +#[derive(Debug, Clone, Copy)] +pub struct PauseSettings { + /// Controls whether the clock can be paused at all. + /// If `test-util` feature is not enabled, setting this will panic when used. + pub(crate) enabled: bool, + /// Controls whether the clock will auto-advance to next sleep deadline automatically. + pub(crate) auto_advance: bool, + /// Controls whether the clock starts paused. + pub(crate) start_paused: bool, +} + cfg_test_util! { use crate::time::{Duration, Instant}; use crate::loom::sync::{Arc, Mutex}; @@ -57,13 +69,14 @@ cfg_test_util! { #[derive(Debug)] struct Inner { - /// True if the ability to pause time is enabled. - enable_pausing: bool, + /// Control if time can ba paused, and whether auto-advance behavior is enabled. + pausing: PauseSettings, /// Instant to use as the clock's base instant. base: std::time::Instant, - /// Instant at which the clock was last unfrozen. + /// Instant at which the clock was last unfrozen, + /// or None if the clock is currently frozen unfrozen: Option, } @@ -98,10 +111,12 @@ cfg_test_util! { /// If time is paused and the runtime has no work to do, the clock is /// auto-advanced to the next pending timer. This means that [`Sleep`] or /// other timer-backed primitives can cause the runtime to advance the - /// current time when awaited. + /// current time when awaited. This behavior can be disabled using + /// [`set_auto_advance`]. /// /// [`Sleep`]: crate::time::Sleep /// [`advance`]: crate::time::advance + /// [`set_auto_advance`]: crate::time::set_auto_advance #[track_caller] pub fn pause() { let clock = clock().expect("time cannot be frozen from outside the Tokio runtime"); @@ -167,6 +182,28 @@ cfg_test_util! { crate::task::yield_now().await; } + /// Controls time auto-advance behavior. + /// + /// When time is paused, by default the runtime automatically advances + /// time to the next timer. See [`pause`](pause#auto-advance) for more details. + /// + /// This function allows enabling and disabling the auto-advance behavior. + /// When auto-advance feature is disabled, sleeping will wait indefinitely + /// until it's re-enabled, or the time is manually advanced using [`advance`]. + /// This means that if all tasks call `sleep` simultaneously, the program will + /// deadlock. + /// + /// # Panics + /// + /// If called from outside of the Tokio runtime. + /// + /// [`sleep`]: fn@crate::time::sleep + /// [`advance`]: crate::time::advance + pub fn set_auto_advance(enable: bool) { + let clock = clock().expect("time cannot be frozen from outside the Tokio runtime"); + clock.set_auto_advance(enable); + } + /// Returns the current instant, factoring in frozen time. pub(crate) fn now() -> Instant { if let Some(clock) = clock() { @@ -179,18 +216,18 @@ cfg_test_util! { impl Clock { /// Returns a new `Clock` instance that uses the current execution context's /// source of time. - pub(crate) fn new(enable_pausing: bool, start_paused: bool) -> Clock { + pub(crate) fn new(pausing: PauseSettings) -> Clock { let now = std::time::Instant::now(); let clock = Clock { inner: Arc::new(Mutex::new(Inner { - enable_pausing, + pausing, base: now, unfrozen: Some(now), })), }; - if start_paused { + if pausing.start_paused { clock.pause(); } @@ -201,7 +238,7 @@ cfg_test_util! { pub(crate) fn pause(&self) { let mut inner = self.inner.lock(); - if !inner.enable_pausing { + if !inner.pausing.enabled { drop(inner); // avoid poisoning the lock panic!("`time::pause()` requires the `current_thread` Tokio runtime. \ This is the default Runtime used by `#[tokio::test]."); @@ -228,6 +265,23 @@ cfg_test_util! { inner.base += duration; } + pub(crate) fn auto_advance(&self) -> bool { + let inner = self.inner.lock(); + inner.pausing.auto_advance + } + + pub(crate) fn set_auto_advance(&self, enable: bool) { + let mut inner = self.inner.lock(); + + if !inner.pausing.enabled { + drop(inner); // avoid poisoning the lock + panic!("`time::set_auto_advance()` requires the `current_thread` Tokio runtime. \ + This is the default Runtime used by `#[tokio::test]."); + } + + inner.pausing.auto_advance = enable; + } + pub(crate) fn now(&self) -> Instant { let inner = self.inner.lock(); diff --git a/tokio/src/time/mod.rs b/tokio/src/time/mod.rs index a1f27b839e9..312aa35e3bc 100644 --- a/tokio/src/time/mod.rs +++ b/tokio/src/time/mod.rs @@ -86,8 +86,9 @@ mod clock; pub(crate) use self::clock::Clock; +pub use self::clock::PauseSettings; #[cfg(feature = "test-util")] -pub use clock::{advance, pause, resume}; +pub use self::clock::{advance, pause, resume, set_auto_advance}; pub mod error; diff --git a/tokio/tests/time_no_auto_advance.rs b/tokio/tests/time_no_auto_advance.rs new file mode 100644 index 00000000000..6e14f6bec86 --- /dev/null +++ b/tokio/tests/time_no_auto_advance.rs @@ -0,0 +1,99 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use std::time::{Duration as StdDuration, Instant as StdInstant}; +use tokio::sync::{mpsc, oneshot}; +use tokio::time::{self, Duration, Instant}; + +#[tokio::test] +async fn pause_time_in_main() { + time::set_auto_advance(false); + time::pause(); +} + +#[tokio::test] +async fn pause_time_in_task() { + let t = tokio::spawn(async { + time::set_auto_advance(false); + time::pause(); + }); + + t.await.unwrap(); +} + +#[tokio::test(start_paused = true, auto_advance = false)] +async fn time_can_be_advanced_manually() { + let std_start = StdInstant::now(); + let tokio_start = Instant::now(); + let (done_tx, mut done_rx) = oneshot::channel(); + + let t = tokio::spawn(async { + time::sleep(Duration::from_millis(100)).await; + assert_eq!(done_tx.send(()), Ok(())); + }); + + // Simulated tokio time should not advance even real time does + assert_eq!(tokio_start.elapsed(), Duration::ZERO); + std::thread::sleep(StdDuration::from_millis(5)); + assert_eq!(tokio_start.elapsed(), Duration::ZERO); + assert_eq!(done_rx.try_recv(), Err(oneshot::error::TryRecvError::Empty)); + + // The sleep shouldn't expire yet, but simulated time should advance + time::advance(Duration::from_millis(50)).await; + assert_eq!(tokio_start.elapsed(), Duration::from_millis(50)); + assert_eq!(done_rx.try_recv(), Err(oneshot::error::TryRecvError::Empty)); + + // Advance simulated time until the sleep expires + time::advance(Duration::from_millis(100)).await; + tokio::task::yield_now().await; // Make sure the scheduler picks up the task + done_rx.try_recv().expect("Channel closed unexpectedly"); + + t.await.unwrap(); + + assert!(std_start.elapsed() < StdDuration::from_millis(100)); + assert_eq!(tokio_start.elapsed(), Duration::from_millis(150)); +} + +#[tokio::test(start_paused = true)] +async fn auto_advance_enable_disable_works() { + let (stop_tx, mut stop_rx) = oneshot::channel(); + let (tx, mut rx) = mpsc::channel(1); + + let t = tokio::spawn(async move { + loop { + time::sleep(Duration::from_millis(100)).await; + tx.send(()).await.expect("Send failed"); + + let done = stop_rx.try_recv(); + if done == Err(oneshot::error::TryRecvError::Empty) { + continue; + } + done.unwrap(); + break; + } + }); + + // When the time is not paused, we should get new events constantly + for _ in 0..10 { + rx.recv().await.expect("Recv failed"); + } + + // Disable auto-advance and empty the buffer + time::set_auto_advance(false); + let _ = rx.try_recv(); + + // Now we shouldn't be getting new events anymore + for _ in 0..10 { + tokio::task::yield_now().await; + assert_eq!(rx.try_recv(), Err(mpsc::error::TryRecvError::Empty)); + } + + // Enable auto-advance and make sure we start receiving events again + time::set_auto_advance(true); + for _ in 0..10 { + rx.recv().await.expect("Recv failed"); + } + + stop_tx.send(()).expect("Unable to send stop message"); + t.await.unwrap(); +}