Skip to content

Commit

Permalink
time: fix time::advance() with sub-ms durations (#3852)
Browse files Browse the repository at this point in the history
Instead of using sleep in time::advance, this fixes the root of the issue. When futures passed
to `Runtime::block_on` are woken, it bypassed all the machinery around advancing time. By
intercepting wakes in the time driver, we know when the block_on task is woken and skip
advancing time in that case.

Fixes #3837
  • Loading branch information
carllerche committed Jun 14, 2021
1 parent 1baea39 commit 18779aa
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 30 deletions.
13 changes: 2 additions & 11 deletions tokio/src/time/clock.rs
Expand Up @@ -7,7 +7,7 @@
//! configurable.

cfg_not_test_util! {
use crate::time::{Duration, Instant};
use crate::time::{Instant};

#[derive(Debug, Clone)]
pub(crate) struct Clock {}
Expand All @@ -24,14 +24,6 @@ cfg_not_test_util! {
pub(crate) fn now(&self) -> Instant {
now()
}

pub(crate) fn is_paused(&self) -> bool {
false
}

pub(crate) fn advance(&self, _dur: Duration) {
unreachable!();
}
}
}

Expand Down Expand Up @@ -121,10 +113,9 @@ cfg_test_util! {
/// runtime.
pub async fn advance(duration: Duration) {
let clock = clock().expect("time cannot be frozen from outside the Tokio runtime");
let until = clock.now() + duration;
clock.advance(duration);

crate::time::sleep_until(until).await;
crate::task::yield_now().await;
}

/// Return the current instant, factoring in frozen time.
Expand Down
93 changes: 75 additions & 18 deletions tokio/src/time/driver/mod.rs
Expand Up @@ -91,6 +91,15 @@ pub(crate) struct Driver<P: Park + 'static> {

/// Parker to delegate to
park: P,

// When `true`, a call to `park_timeout` should immediately return and time
// should not advance. One reason for this to be `true` is if the task
// passed to `Runtime::block_on` called `task::yield_now()`.
//
// While it may look racy, it only has any effect when the clock is paused
// and pausing the clock is restricted to a single-threaded runtime.
#[cfg(feature = "test-util")]
did_wake: Arc<AtomicBool>,
}

/// A structure which handles conversion from Instants to u64 timestamps.
Expand Down Expand Up @@ -178,6 +187,8 @@ where
time_source,
handle: Handle::new(Arc::new(inner)),
park,
#[cfg(feature = "test-util")]
did_wake: Arc::new(AtomicBool::new(false)),
}
}

Expand All @@ -192,8 +203,6 @@ where
}

fn park_internal(&mut self, limit: Option<Duration>) -> Result<(), P::Error> {
let clock = &self.time_source.clock;

let mut lock = self.handle.get().state.lock();

assert!(!self.handle.is_shutdown());
Expand All @@ -217,26 +226,14 @@ where
duration = std::cmp::min(limit, duration);
}

if clock.is_paused() {
self.park.park_timeout(Duration::from_secs(0))?;

// Simulate advancing time
clock.advance(duration);
} else {
self.park.park_timeout(duration)?;
}
self.park_timeout(duration)?;
} else {
self.park.park_timeout(Duration::from_secs(0))?;
}
}
None => {
if let Some(duration) = limit {
if clock.is_paused() {
self.park.park_timeout(Duration::from_secs(0))?;
clock.advance(duration);
} else {
self.park.park_timeout(duration)?;
}
self.park_timeout(duration)?;
} else {
self.park.park()?;
}
Expand All @@ -248,6 +245,39 @@ where

Ok(())
}

cfg_test_util! {
fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> {
let clock = &self.time_source.clock;

if clock.is_paused() {
self.park.park_timeout(Duration::from_secs(0))?;

// If the time driver was woken, then the park completed
// before the "duration" elapsed (usually caused by a
// yield in `Runtime::block_on`). In this case, we don't
// advance the clock.
if !self.did_wake() {
// Simulate advancing time
clock.advance(duration);
}
} else {
self.park.park_timeout(duration)?;
}

Ok(())
}

fn did_wake(&self) -> bool {
self.did_wake.swap(false, Ordering::SeqCst)
}
}

cfg_not_test_util! {
fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> {
self.park.park_timeout(duration)
}
}
}

impl Handle {
Expand Down Expand Up @@ -387,11 +417,11 @@ impl<P> Park for Driver<P>
where
P: Park + 'static,
{
type Unpark = P::Unpark;
type Unpark = TimerUnpark<P>;
type Error = P::Error;

fn unpark(&self) -> Self::Unpark {
self.park.unpark()
TimerUnpark::new(self)
}

fn park(&mut self) -> Result<(), Self::Error> {
Expand Down Expand Up @@ -426,6 +456,33 @@ where
}
}

pub(crate) struct TimerUnpark<P: Park + 'static> {
inner: P::Unpark,

#[cfg(feature = "test-util")]
did_wake: Arc<AtomicBool>,
}

impl<P: Park + 'static> TimerUnpark<P> {
fn new(driver: &Driver<P>) -> TimerUnpark<P> {
TimerUnpark {
inner: driver.park.unpark(),

#[cfg(feature = "test-util")]
did_wake: driver.did_wake.clone(),
}
}
}

impl<P: Park + 'static> Unpark for TimerUnpark<P> {
fn unpark(&self) {
#[cfg(feature = "test-util")]
self.did_wake.store(true, Ordering::SeqCst);

self.inner.unpark();
}
}

// ===== impl Inner =====

impl Inner {
Expand Down
104 changes: 103 additions & 1 deletion tokio/tests/time_pause.rs
Expand Up @@ -4,7 +4,7 @@
use rand::SeedableRng;
use rand::{rngs::StdRng, Rng};
use tokio::time::{self, Duration, Instant, Sleep};
use tokio_test::{assert_elapsed, assert_err, assert_pending, assert_ready_eq, task};
use tokio_test::{assert_elapsed, assert_err, assert_pending, assert_ready, assert_ready_eq, task};

use std::{
future::Future,
Expand Down Expand Up @@ -215,6 +215,108 @@ async fn interval() {
assert_pending!(poll_next(&mut i));
}

#[tokio::test(start_paused = true)]
async fn test_time_advance_sub_ms() {
let now = Instant::now();

let dur = Duration::from_micros(51_592);
time::advance(dur).await;

assert_eq!(now.elapsed(), dur);

let now = Instant::now();
let dur = Duration::from_micros(1);
time::advance(dur).await;

assert_eq!(now.elapsed(), dur);
}

#[tokio::test(start_paused = true)]
async fn test_time_advance_3ms_and_change() {
let now = Instant::now();

let dur = Duration::from_micros(3_141_592);
time::advance(dur).await;

assert_eq!(now.elapsed(), dur);

let now = Instant::now();
let dur = Duration::from_micros(3_123_456);
time::advance(dur).await;

assert_eq!(now.elapsed(), dur);
}

#[tokio::test(start_paused = true)]
async fn regression_3710_with_submillis_advance() {
let start = Instant::now();

time::advance(Duration::from_millis(1)).await;

let mut sleep = task::spawn(time::sleep_until(start + Duration::from_secs(60)));

assert_pending!(sleep.poll());

let before = Instant::now();
let dur = Duration::from_micros(51_592);
time::advance(dur).await;
assert_eq!(before.elapsed(), dur);

assert_pending!(sleep.poll());
}

#[tokio::test(start_paused = true)]
async fn exact_1ms_advance() {
let now = Instant::now();

let dur = Duration::from_millis(1);
time::advance(dur).await;

assert_eq!(now.elapsed(), dur);

let now = Instant::now();
let dur = Duration::from_millis(1);
time::advance(dur).await;

assert_eq!(now.elapsed(), dur);
}

#[tokio::test(start_paused = true)]
async fn advance_once_with_timer() {
let mut sleep = task::spawn(time::sleep(Duration::from_millis(1)));
assert_pending!(sleep.poll());

time::advance(Duration::from_micros(250)).await;
assert_pending!(sleep.poll());

time::advance(Duration::from_micros(1500)).await;

assert!(sleep.is_woken());
assert_ready!(sleep.poll());
}

#[tokio::test(start_paused = true)]
async fn advance_multi_with_timer() {
// Round to the nearest ms
// time::sleep(Duration::from_millis(1)).await;

let mut sleep = task::spawn(time::sleep(Duration::from_millis(1)));
assert_pending!(sleep.poll());

time::advance(Duration::from_micros(250)).await;
assert_pending!(sleep.poll());

time::advance(Duration::from_micros(250)).await;
assert_pending!(sleep.poll());

time::advance(Duration::from_micros(250)).await;
assert_pending!(sleep.poll());

time::advance(Duration::from_micros(250)).await;
assert!(sleep.is_woken());
assert_ready!(sleep.poll());
}

fn poll_next(interval: &mut task::Spawn<time::Interval>) -> Poll<Instant> {
interval.enter(|cx, mut interval| interval.poll_tick(cx))
}
Expand Down

0 comments on commit 18779aa

Please sign in to comment.