From 98cd6d484671e213285fbcf76a78c0aabfb84469 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Thu, 10 Jun 2021 14:58:40 -0700 Subject: [PATCH 1/7] time: fix time::advance() with sub-ms durations Update the advance logic to factor in the timer's ms rounding. Fixes #3837 --- tokio/src/time/clock.rs | 12 +++++++++++- tokio/tests/time_pause.rs | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/tokio/src/time/clock.rs b/tokio/src/time/clock.rs index c5ef86be4e0..865f8e30f19 100644 --- a/tokio/src/time/clock.rs +++ b/tokio/src/time/clock.rs @@ -124,7 +124,17 @@ cfg_test_util! { let until = clock.now() + duration; clock.advance(duration); - crate::time::sleep_until(until).await; + // Fixes tokio-rs/tokio#3837 + // + // "sleep_until" is needed to ensure registered timers are fired after the + // advance call, however, it will also round up time to the nearest ms. To + // avoid this loss of precision, we sleep until the previous millisecond. + // If the advance is less than 1ms, then we just need to yield. + if duration >= Duration::from_millis(1) { + crate::time::sleep_until(until - Duration::from_millis(1)).await; + } else { + crate::task::yield_now().await; + } } /// Return the current instant, factoring in frozen time. diff --git a/tokio/tests/time_pause.rs b/tokio/tests/time_pause.rs index d1834af2157..87310b635f0 100644 --- a/tokio/tests/time_pause.rs +++ b/tokio/tests/time_pause.rs @@ -215,6 +215,40 @@ async fn interval() { assert_pending!(poll_next(&mut i)); } +#[tokio::test] +async fn test_time_advance_sub_ms() { + time::pause(); + let now = Instant::now(); + + let dur = Duration::from_micros(51_592); + time::advance(dur).await; + + assert_eq!(now.elapsed(), dur); + + let now = Instant::now(); + let dur = Duration::from_micros(1); + time::advance(dur).await; + + assert_eq!(now.elapsed(), dur); +} + +#[tokio::test] +async fn test_time_advance_3ms_and_change() { + time::pause(); + let now = Instant::now(); + + let dur = Duration::from_micros(3_141_592); + time::advance(dur).await; + + assert_eq!(now.elapsed(), dur); + + let now = Instant::now(); + let dur = Duration::from_micros(3_123_456); + time::advance(dur).await; + + assert_eq!(now.elapsed(), dur); +} + fn poll_next(interval: &mut task::Spawn) -> Poll { interval.enter(|cx, mut interval| interval.poll_tick(cx)) } From 7b4e5b824d08eca237907380786a91e272e3030a Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 11 Jun 2021 14:09:53 -0700 Subject: [PATCH 2/7] Apply suggestions from code review Co-authored-by: sb64 <53383020+sb64@users.noreply.github.com> --- tokio/tests/time_pause.rs | 40 +++++++++++++++++++++++++++++++++++---- 1 file changed, 36 insertions(+), 4 deletions(-) diff --git a/tokio/tests/time_pause.rs b/tokio/tests/time_pause.rs index 87310b635f0..47e080e7426 100644 --- a/tokio/tests/time_pause.rs +++ b/tokio/tests/time_pause.rs @@ -215,9 +215,8 @@ async fn interval() { assert_pending!(poll_next(&mut i)); } -#[tokio::test] +#[tokio::test(start_paused = true)] async fn test_time_advance_sub_ms() { - time::pause(); let now = Instant::now(); let dur = Duration::from_micros(51_592); @@ -232,9 +231,8 @@ async fn test_time_advance_sub_ms() { assert_eq!(now.elapsed(), dur); } -#[tokio::test] +#[tokio::test(start_paused = true)] async fn test_time_advance_3ms_and_change() { - time::pause(); let now = Instant::now(); let dur = Duration::from_micros(3_141_592); @@ -249,6 +247,40 @@ async fn test_time_advance_3ms_and_change() { assert_eq!(now.elapsed(), dur); } +#[tokio::test(start_paused = true)] +async fn regression_3710_with_submillis_advance() { + let start = Instant::now(); + + time::advance(Duration::from_millis(1)).await; + + let mut sleep = task::spawn(time::sleep_until(start + Duration::from_secs(60))); + + assert_pending!(sleep.poll()); + + let before = Instant::now(); + let dur = Duration::from_micros(51_592); + time::advance(dur).await; + assert_eq!(before.elapsed(), dur); + + assert_pending!(sleep.poll()); +} + +#[tokio::test(start_paused = true)] +async fn exact_1ms_advance() { + let now = Instant::now(); + + let dur = Duration::from_millis(1); + time::advance(dur).await; + + assert_eq!(now.elapsed(), dur); + + let now = Instant::now(); + let dur = Duration::from_millis(1); + time::advance(dur).await; + + assert_eq!(now.elapsed(), dur); +} + fn poll_next(interval: &mut task::Spawn) -> Poll { interval.enter(|cx, mut interval| interval.poll_tick(cx)) } From b8bc6b9ae96d248ae9478fdc41151bdf96a909e0 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Mon, 14 Jun 2021 13:25:14 -0700 Subject: [PATCH 3/7] intercept wakes and don't advance time in that case --- tokio/src/time/clock.rs | 13 +------ tokio/src/time/driver/mod.rs | 67 +++++++++++++++++++++++++++++++++--- tokio/tests/time_pause.rs | 38 +++++++++++++++++++- 3 files changed, 100 insertions(+), 18 deletions(-) diff --git a/tokio/src/time/clock.rs b/tokio/src/time/clock.rs index 865f8e30f19..ce50bdc394e 100644 --- a/tokio/src/time/clock.rs +++ b/tokio/src/time/clock.rs @@ -121,20 +121,9 @@ cfg_test_util! { /// runtime. pub async fn advance(duration: Duration) { let clock = clock().expect("time cannot be frozen from outside the Tokio runtime"); - let until = clock.now() + duration; clock.advance(duration); - // Fixes tokio-rs/tokio#3837 - // - // "sleep_until" is needed to ensure registered timers are fired after the - // advance call, however, it will also round up time to the nearest ms. To - // avoid this loss of precision, we sleep until the previous millisecond. - // If the advance is less than 1ms, then we just need to yield. - if duration >= Duration::from_millis(1) { - crate::time::sleep_until(until - Duration::from_millis(1)).await; - } else { - crate::task::yield_now().await; - } + crate::task::yield_now().await; } /// Return the current instant, factoring in frozen time. diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs index 3eb100412fb..6e48c64da7b 100644 --- a/tokio/src/time/driver/mod.rs +++ b/tokio/src/time/driver/mod.rs @@ -91,6 +91,13 @@ pub(crate) struct Driver { /// Parker to delegate to park: P, + + // When test-util is enabled, this tracks if the driver was woken before + // entering park. While it may look racy, it only has any effect when the + // clock is paused and pausing the clock is restricted to a single-threaded + // runtime. + #[cfg(feature = "test-util")] + did_wake: Arc, } /// A structure which handles conversion from Instants to u64 timestamps. @@ -178,6 +185,8 @@ where time_source, handle: Handle::new(Arc::new(inner)), park, + #[cfg(feature = "test-util")] + did_wake: Arc::new(AtomicBool::new(false)), } } @@ -220,8 +229,14 @@ where if clock.is_paused() { self.park.park_timeout(Duration::from_secs(0))?; - // Simulate advancing time - clock.advance(duration); + // If the time driver was woken, then the park completed + // before the "duration" elapsed (usually caused by a + // yield in `Runtime::block_on`). In this case, we don't + // advance the clock. + if !self.did_wake() { + // Simulate advancing time + clock.advance(duration); + } } else { self.park.park_timeout(duration)?; } @@ -233,7 +248,10 @@ where if let Some(duration) = limit { if clock.is_paused() { self.park.park_timeout(Duration::from_secs(0))?; - clock.advance(duration); + + if !self.did_wake() { + clock.advance(duration); + } } else { self.park.park_timeout(duration)?; } @@ -248,6 +266,18 @@ where Ok(()) } + + cfg_test_util! { + fn did_wake(&self) -> bool { + self.did_wake.swap(false, Ordering::SeqCst) + } + } + + cfg_not_test_util! { + fn did_wake(&self) -> bool { + unreachable!() + } + } } impl Handle { @@ -387,11 +417,11 @@ impl

Park for Driver

where P: Park + 'static, { - type Unpark = P::Unpark; + type Unpark = TimerUnpark

; type Error = P::Error; fn unpark(&self) -> Self::Unpark { - self.park.unpark() + TimerUnpark::new(self) } fn park(&mut self) -> Result<(), Self::Error> { @@ -426,6 +456,33 @@ where } } +pub(crate) struct TimerUnpark { + inner: P::Unpark, + + #[cfg(feature = "test-util")] + did_wake: Arc, +} + +impl TimerUnpark

{ + fn new(driver: &Driver

) -> TimerUnpark

{ + TimerUnpark { + inner: driver.park.unpark(), + + #[cfg(feature = "test-util")] + did_wake: driver.did_wake.clone(), + } + } +} + +impl Unpark for TimerUnpark

{ + fn unpark(&self) { + self.inner.unpark(); + + #[cfg(feature = "test-util")] + self.did_wake.store(true, Ordering::SeqCst); + } +} + // ===== impl Inner ===== impl Inner { diff --git a/tokio/tests/time_pause.rs b/tokio/tests/time_pause.rs index 47e080e7426..02e050a2dc4 100644 --- a/tokio/tests/time_pause.rs +++ b/tokio/tests/time_pause.rs @@ -4,7 +4,7 @@ use rand::SeedableRng; use rand::{rngs::StdRng, Rng}; use tokio::time::{self, Duration, Instant, Sleep}; -use tokio_test::{assert_elapsed, assert_err, assert_pending, assert_ready_eq, task}; +use tokio_test::{assert_elapsed, assert_err, assert_pending, assert_ready, assert_ready_eq, task}; use std::{ future::Future, @@ -281,6 +281,42 @@ async fn exact_1ms_advance() { assert_eq!(now.elapsed(), dur); } +#[tokio::test(start_paused = true)] +async fn advance_once_with_timer() { + let mut sleep = task::spawn(time::sleep(Duration::from_millis(1))); + assert_pending!(sleep.poll()); + + time::advance(Duration::from_micros(250)).await; + assert_pending!(sleep.poll()); + + time::advance(Duration::from_micros(1500)).await; + + assert!(sleep.is_woken()); + assert_ready!(sleep.poll()); +} + +#[tokio::test(start_paused = true)] +async fn advance_multi_with_timer() { + // Round to the nearest ms + // time::sleep(Duration::from_millis(1)).await; + + let mut sleep = task::spawn(time::sleep(Duration::from_millis(1))); + assert_pending!(sleep.poll()); + + time::advance(Duration::from_micros(250)).await; + assert_pending!(sleep.poll()); + + time::advance(Duration::from_micros(250)).await; + assert_pending!(sleep.poll()); + + time::advance(Duration::from_micros(250)).await; + assert_pending!(sleep.poll()); + + time::advance(Duration::from_micros(250)).await; + assert!(sleep.is_woken()); + assert_ready!(sleep.poll()); +} + fn poll_next(interval: &mut task::Spawn) -> Poll { interval.enter(|cx, mut interval| interval.poll_tick(cx)) } From e790b0ab8fb990b3e5f6387e8afbb5450c5ec7f2 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Mon, 14 Jun 2021 14:31:54 -0700 Subject: [PATCH 4/7] apply suggestions --- tokio/src/time/driver/mod.rs | 62 ++++++++++++++++++------------------ 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs index 6e48c64da7b..481a76878d8 100644 --- a/tokio/src/time/driver/mod.rs +++ b/tokio/src/time/driver/mod.rs @@ -92,10 +92,12 @@ pub(crate) struct Driver { /// Parker to delegate to park: P, - // When test-util is enabled, this tracks if the driver was woken before - // entering park. While it may look racy, it only has any effect when the - // clock is paused and pausing the clock is restricted to a single-threaded - // runtime. + // When `true`, a call to `park_timeout` should immediately return and time + // should not advance. One reason for this to be `true` is if the task + // passed to `Runtime::block_on` called `task::yield_now()`. + // + // While it may look racy, it only has any effect when the clock is paused + // and pausing the clock is restricted to a single-threaded runtime. #[cfg(feature = "test-util")] did_wake: Arc, } @@ -201,8 +203,6 @@ where } fn park_internal(&mut self, limit: Option) -> Result<(), P::Error> { - let clock = &self.time_source.clock; - let mut lock = self.handle.get().state.lock(); assert!(!self.handle.is_shutdown()); @@ -226,35 +226,14 @@ where duration = std::cmp::min(limit, duration); } - if clock.is_paused() { - self.park.park_timeout(Duration::from_secs(0))?; - - // If the time driver was woken, then the park completed - // before the "duration" elapsed (usually caused by a - // yield in `Runtime::block_on`). In this case, we don't - // advance the clock. - if !self.did_wake() { - // Simulate advancing time - clock.advance(duration); - } - } else { - self.park.park_timeout(duration)?; - } + self.park_timeout(duration)?; } else { self.park.park_timeout(Duration::from_secs(0))?; } } None => { if let Some(duration) = limit { - if clock.is_paused() { - self.park.park_timeout(Duration::from_secs(0))?; - - if !self.did_wake() { - clock.advance(duration); - } - } else { - self.park.park_timeout(duration)?; - } + self.park_timeout(duration)?; } else { self.park.park()?; } @@ -268,14 +247,35 @@ where } cfg_test_util! { + fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> { + let clock = &self.time_source.clock; + + if clock.is_paused() { + self.park.park_timeout(Duration::from_secs(0))?; + + // If the time driver was woken, then the park completed + // before the "duration" elapsed (usually caused by a + // yield in `Runtime::block_on`). In this case, we don't + // advance the clock. + if !self.did_wake() { + // Simulate advancing time + clock.advance(duration); + } + } else { + self.park.park_timeout(duration)?; + } + + Ok(()) + } + fn did_wake(&self) -> bool { self.did_wake.swap(false, Ordering::SeqCst) } } cfg_not_test_util! { - fn did_wake(&self) -> bool { - unreachable!() + fn park_timeout(&mut self, limit: Duration) -> Result<(), P::Error> { + self.park.park_timeout(duration) } } } From bbf5f5ef70b01db4a25d4bd94e9e57bdd15e3842 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Mon, 14 Jun 2021 14:33:19 -0700 Subject: [PATCH 5/7] more feedback --- tokio/src/time/driver/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs index 481a76878d8..9aa4bce13c5 100644 --- a/tokio/src/time/driver/mod.rs +++ b/tokio/src/time/driver/mod.rs @@ -476,10 +476,10 @@ impl TimerUnpark

{ impl Unpark for TimerUnpark

{ fn unpark(&self) { - self.inner.unpark(); - #[cfg(feature = "test-util")] self.did_wake.store(true, Ordering::SeqCst); + + self.inner.unpark(); } } From 5dd84bb263e52f06591d05f85a015d57564eafe0 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Mon, 14 Jun 2021 14:36:10 -0700 Subject: [PATCH 6/7] fix build --- tokio/src/time/driver/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs index 9aa4bce13c5..37d2231c34f 100644 --- a/tokio/src/time/driver/mod.rs +++ b/tokio/src/time/driver/mod.rs @@ -274,7 +274,7 @@ where } cfg_not_test_util! { - fn park_timeout(&mut self, limit: Duration) -> Result<(), P::Error> { + fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> { self.park.park_timeout(duration) } } From f94ebfa56b044bbb29c3854e60bb6be73f9bc283 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Mon, 14 Jun 2021 14:47:00 -0700 Subject: [PATCH 7/7] try to fix the build again --- tokio/src/time/clock.rs | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/tokio/src/time/clock.rs b/tokio/src/time/clock.rs index ce50bdc394e..a0ff62139d3 100644 --- a/tokio/src/time/clock.rs +++ b/tokio/src/time/clock.rs @@ -7,7 +7,7 @@ //! configurable. cfg_not_test_util! { - use crate::time::{Duration, Instant}; + use crate::time::{Instant}; #[derive(Debug, Clone)] pub(crate) struct Clock {} @@ -24,14 +24,6 @@ cfg_not_test_util! { pub(crate) fn now(&self) -> Instant { now() } - - pub(crate) fn is_paused(&self) -> bool { - false - } - - pub(crate) fn advance(&self, _dur: Duration) { - unreachable!(); - } } }