Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

time: allow users to specify Interval behavior when delayed #3721

Merged
merged 18 commits into from Jun 16, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
253 changes: 221 additions & 32 deletions tokio/src/time/interval.rs
@@ -1,17 +1,20 @@
use crate::future::poll_fn;
use crate::time::{sleep_until, Duration, Instant, Sleep};

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{convert::TryInto, future::Future};

/// Creates new `Interval` that yields with interval of `duration`. The first
/// tick completes immediately.
/// Creates new [`Interval`] that yields with interval of `period`. The first
/// tick completes immediately. The default [`MissedTickBehavior`] is
/// [`Burst`](MissedTickBehavior::Burst), but this can be configured
/// by calling [`set_missed_tick_behavior`](Interval::set_missed_tick_behavior).
///
/// An interval will tick indefinitely. At any time, the `Interval` value can be
/// dropped. This cancels the interval.
/// An interval will tick indefinitely. At any time, the [`Interval`] value can
/// be dropped. This cancels the interval.
///
/// This function is equivalent to `interval_at(Instant::now(), period)`.
/// This function is equivalent to
/// [`interval_at(Instant::now(), period)`](interval_at).
///
/// # Panics
///
Expand All @@ -26,20 +29,20 @@ use std::task::{Context, Poll};
/// async fn main() {
/// let mut interval = time::interval(Duration::from_millis(10));
///
/// interval.tick().await;
/// interval.tick().await;
/// interval.tick().await;
/// interval.tick().await; // ticks immediately
/// interval.tick().await; // ticks after 10ms
/// interval.tick().await; // ticks after 10ms
///
/// // approximately 20ms have elapsed.
/// }
/// ```
///
/// A simple example using `interval` to execute a task every two seconds.
///
/// The difference between `interval` and [`sleep`] is that an `interval`
/// measures the time since the last tick, which means that `.tick().await`
/// The difference between `interval` and [`sleep`] is that an [`Interval`]
/// measures the time since the last tick, which means that [`.tick().await`]
/// may wait for a shorter time than the duration specified for the interval
/// if some time has passed between calls to `.tick().await`.
/// if some time has passed between calls to [`.tick().await`].
///
/// If the tick in the example below was replaced with [`sleep`], the task
/// would only be executed once every three seconds, and not every two
Expand All @@ -64,17 +67,20 @@ use std::task::{Context, Poll};
/// ```
///
/// [`sleep`]: crate::time::sleep()
/// [`.tick().await`]: Interval::tick
pub fn interval(period: Duration) -> Interval {
assert!(period > Duration::new(0, 0), "`period` must be non-zero.");

interval_at(Instant::now(), period)
}

/// Creates new `Interval` that yields with interval of `period` with the
/// first tick completing at `start`.
/// Creates new [`Interval`] that yields with interval of `period` with the
/// first tick completing at `start`. The default [`MissedTickBehavior`] is
/// [`Burst`](MissedTickBehavior::Burst), but this can be configured
/// by calling [`set_missed_tick_behavior`](Interval::set_missed_tick_behavior).
///
/// An interval will tick indefinitely. At any time, the `Interval` value can be
/// dropped. This cancels the interval.
/// An interval will tick indefinitely. At any time, the [`Interval`] value can
/// be dropped. This cancels the interval.
///
/// # Panics
///
Expand All @@ -90,9 +96,9 @@ pub fn interval(period: Duration) -> Interval {
/// let start = Instant::now() + Duration::from_millis(50);
/// let mut interval = interval_at(start, Duration::from_millis(10));
///
/// interval.tick().await;
/// interval.tick().await;
/// interval.tick().await;
/// interval.tick().await; // ticks after 50ms
/// interval.tick().await; // ticks after 10ms
/// interval.tick().await; // ticks after 10ms
///
/// // approximately 70ms have elapsed.
/// }
Expand All @@ -103,26 +109,179 @@ pub fn interval_at(start: Instant, period: Duration) -> Interval {
Interval {
delay: Box::pin(sleep_until(start)),
period,
missed_tick_behavior: Default::default(),
}
}

/// Interval returned by [`interval`](interval) and [`interval_at`](interval_at).
/// Defines the behavior of an [`Interval`] when it misses a tick.
///
/// Sometimes, an [`Interval`]'s tick is missed. For example, consider the
/// following:
///
/// ```ignore
sb64 marked this conversation as resolved.
Show resolved Hide resolved
/// use tokio::time;
///
/// let mut interval = time::interval(time::Duration::from_secs(2));
/// for _ in 0..5 {
/// interval.tick().await;
/// // if this takes more than 2 seconds, a tick will be delayed
/// task_that_takes_one_to_three_seconds().await;
/// }
/// ```
///
/// Generally, a tick is missed if someone spends too much time without calling
/// [`tick()`](Interval::tick).
///
/// By default, when at tick is missed, [`Interval`] fires ticks as quickly as
/// it can until it is back to where it should be. However, this is not always
/// the desired behavior. `MissedTickBehavior` allows users to specify which
/// behavior they want [`Interval`] to exhibit. Each variant represents a
/// different strategy.
///
/// Note that because the executor cannot guarantee exect precision with timers,
sb64 marked this conversation as resolved.
Show resolved Hide resolved
/// these strategies will only apply when the error in time is greater than 5
/// milliseconds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MissedTickBehavior {
/// Tick as fast as possible until caught up.
///
/// When this strategy is used, [`Interval`] schedules ticks "normally" (the
/// same as it would have if the ticks hadn't been delayed), which results
/// in ticks being fired as fast as it can until it is caught up in time to
/// where it should be.
///
/// This looks something like this:
/// ```text
/// Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 |
/// Actual ticks: | work -----| delay | work | work | work -| work -----|
// Poll behavior: | | | | | | | |
// | | | | | | | |
// Ready(s) | | Ready(s + 2p) | | | |
// Pending | Ready(s + 3p) | | |
// Ready(s + p) Ready(s + 4p) | |
// Ready(s + 5p) |
// Ready(s + 6p)
// * Where `s` is the start time and `p` is the period
/// ```
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really understand this diagram.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the new commit help to simplify? The poll behavior stuff was to help me figure out how to implement it. I left it in there for documentation purposes, but it seems to add more confusion. I used the same type of diagram that @Lindenk used in #3574. The idea is that 'work' is how long we spend doing, well, actual work, the dashes are time we spend (asynchronously) waiting for the tick, and 'delay' is representative of a situation where an Interval would be delayed from ticking.

So, we would expect the ticks to proceed along like diagrammed in the first line ('Expected ticks:'). But if the Interval is delayed, the bottom line ('Actual ticks:') shows how the Interval would behave. In this specific case, notice that there is no delay (no dashes) in the firing of ticks after the long 'delay' tick. That's because the specific behavior this diagram represents doesn't wait for ticks until it is "caught up" in time to where it should be. In this case, after 1 normal tick, one excessively long tick, and 2 ticks done without any delay, the 5th tick starts late, but finishes its work before the next tick was supposed to fire, so it waits for a little bit before tick 6 fires.

///
/// This is the default behavior when [`Interval`] is created with
/// [`interval`] and [`interval_at`].
///
/// Note: this is the behavior that [`Interval`] exhibited in previous
/// versions of Tokio.
sb64 marked this conversation as resolved.
Show resolved Hide resolved
Burst,

/// Delay missed ticks to happen at multiples of `period` from when [`tick`]
/// was called.
///
/// When this strategy is used, [`Interval`] schedules all future ticks to
/// happen at a regular `period` from the point when [`tick`] was called.
///
/// This looks something like this:
/// ```text
/// Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 |
/// Actual ticks: | work -----| delay | work -----| work -----| work -----|
// Poll behavior: | | | | | | | |
// | | | | | | | |
// Ready(s) | | Ready(s + 2p) | | | |
// Pending | Pending | | |
// Ready(s + p) Ready(s + 2p + d) | |
// Ready(s + 3p + d) |
// Ready(s + 4p + d)
// * Where `s` is the start time, `p` is the period, and `d` is the delay
/// ```
///
/// Note that as a result, the ticks are no longer guaranteed to happen at
/// a multiple of `period` from `delay`.
///
/// [`tick`]: Interval::tick
Delay,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really understand what this one does.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you're new to this code, can you help me clarify the documentation so that it's not confusing? The gist of it is this:

use tokio::time::{interval, Duration};

let mut interval = interval(Duration::from_millis(50));

task_that_takes_more_than_50_millis().await;
// The `Interval` has missed a tick

// Since we have exceeded our timeout, this will resolve immediately
interval.tick().await;

// But this one, rather than also resolving immediately, as might happen
// with the `Burst` behavior (depending on how long the `Interval` was
// delayed), will not resolve until 50ms after the call to `tick` up above.
// That is, in `tick`, when we recognize that we missed a tick, we schedule
// the next tick to happen 50ms (or whatever the `period` is) from right
// then, not from when were were *supposed* to tick
interval.tick().await;

Does that make sense?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I understand now.


/// Skip the missed ticks and tick on the next multiple of `period`.
///
/// When this strategy is used, [`Interval`] schedules the next tick for the
/// closest multiple of `period` from when the [`Interval`] first ticked.
///
/// This looks something like this:
/// ```text
/// Expected ticks: | 1 | 2 | 3 | 4 | 5 | 6 |
/// Actual ticks: | work -----| delay | work ---| work -----| work -----|
// Poll behavior: | | | | | | |
// | | | | | | |
// Ready(s) | | Ready(s + 2p) | | |
// Pending | Ready(s + 4p) | |
// Ready(s + p) Ready(s + 5p) |
// Ready(s + 6p)
// * Where `s` is the start time and `p` is the period
/// ```
///
/// Note that the ticks aren't guarenteed to be one `period` away from the
/// last tick, but they will be a multiple of `period` away.
Skip,
sb64 marked this conversation as resolved.
Show resolved Hide resolved
}

impl MissedTickBehavior {
/// Determine when the next tick should happen.
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
fn next_timeout(&self, timeout: Instant, now: Instant, period: Duration) -> Instant {
match self {
Self::Burst => timeout + period,
Self::Delay => now + period,
Self::Skip => {
now + period
- Duration::from_nanos(
((now - timeout).as_nanos() % period.as_nanos())
.try_into()
// This operation is practically guaranteed not to
// fail, as in order for it to fail, `period` would
// have to be longer than `now - timeout`, and both
// would have to be longer than 584 years.
//
// If it did fail, there's not a good way to pass
// the error along to the user, so we just panic.
.expect(
"too much time has elapsed since the interval was supposed to tick",
),
)
}
}
}
}

impl Default for MissedTickBehavior {
/// Returns [`MissedTickBehavior::Burst`].
///
/// For most usecases, the [`Burst`] strategy is what is desired.
/// Additionally, to preserve backwards compatibility, the [`Burst`]
/// strategy must be the default. For these reasons,
/// [`MissedTickBehavior::Burst`] is the default for [`MissedTickBehavior`].
/// See [`Burst`] for more details.
///
/// [`Burst`]: MissedTickBehavior::Burst
fn default() -> Self {
Self::Burst
}
}

/// Interval returned by [`interval`] and [`interval_at`]
///
/// This type allows you to wait on a sequence of instants with a certain
/// duration between each instant. Unlike calling [`sleep`](crate::time::sleep)
/// in a loop, this lets you count the time spent between the calls to `sleep`
/// as well.
/// duration between each instant. Unlike calling [`sleep`] in a loop, this lets
/// you count the time spent between the calls to [`sleep`] as well.
///
/// An `Interval` can be turned into a `Stream` with [`IntervalStream`].
///
/// [`IntervalStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.IntervalStream.html
/// [`IntervalStream`]: https://docs.rs/tokio-stream/latest/tokio_stream/wrappers/struct.IntervalStream.html
/// [`sleep`]: crate::time::sleep
#[derive(Debug)]
pub struct Interval {
/// Future that completes the next time the `Interval` yields a value.
delay: Pin<Box<Sleep>>,

/// The duration between values yielded by `Interval`.
period: Duration,

/// The strategy `Interval` should use when a tick is missed.
missed_tick_behavior: MissedTickBehavior,
}

impl Interval {
Expand Down Expand Up @@ -159,22 +318,52 @@ impl Interval {
///
/// When this method returns `Poll::Pending`, the current task is scheduled
/// to receive a wakeup when the instant has elapsed. Note that on multiple
/// calls to `poll_tick`, only the `Waker` from the `Context` passed to the
/// most recent call is scheduled to receive a wakeup.
/// calls to `poll_tick`, only the [`Waker`](std::task::Waker) from the
/// [`Context`] passed to the most recent call is scheduled to receive a
/// wakeup.
pub fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll<Instant> {
// Wait for the delay to be done
ready!(Pin::new(&mut self.delay).poll(cx));

// Get the `now` by looking at the `delay` deadline
let now = self.delay.deadline();
// Get the time at which the `delay` was supposed to complete
let timeout = self.delay.deadline();

let now = Instant::now();

// If a tick was not missed, and thus we are being called before the
// next tick is due, just schedule the next tick normally, one `period`
// after `timeout`
//
// However, if a tick took excessively long and we are now behind,
// schedule the next tick according to how the user specified with
// `MissedTickBehavior`
let next = if now > timeout + Duration::from_millis(5) {
self.missed_tick_behavior
.next_timeout(timeout, now, self.period)
} else {
timeout + self.period
};

// The next interval value is `duration` after the one that just
// yielded.
let next = now + self.period;
self.delay.as_mut().reset(next);

// Return the current instant
Poll::Ready(now)
Poll::Ready(timeout)
}

/// Returns the [`MissedTickBehavior`] strategy currently being used.
pub fn missed_tick_behavior(&self) -> MissedTickBehavior {
self.missed_tick_behavior
}

/// Sets the [`MissedTickBehavior`] strategy that should be used.
pub fn set_missed_tick_behavior(&mut self, behavior: MissedTickBehavior) {
self.missed_tick_behavior = behavior;
}

/// Resets the [`MissedTickBehavior`] strategy to the default, which is
/// [`Burst`](MissedTickBehavior::Burst).
pub fn reset_missed_tick_behavior(&mut self) {
self.missed_tick_behavior = Default::default();
}
sb64 marked this conversation as resolved.
Show resolved Hide resolved

/// Returns the period of the interval.
Expand Down
17 changes: 8 additions & 9 deletions tokio/src/time/mod.rs
Expand Up @@ -3,21 +3,21 @@
//! This module provides a number of types for executing code after a set period
//! of time.
//!
//! * `Sleep` is a future that does no work and completes at a specific `Instant`
//! * [`Sleep`] is a future that does no work and completes at a specific [`Instant`]
//! in time.
//!
//! * `Interval` is a stream yielding a value at a fixed period. It is
//! initialized with a `Duration` and repeatedly yields each time the duration
//! * [`Interval`] is a stream yielding a value at a fixed period. It is
//! initialized with a [`Duration`] and repeatedly yields each time the duration
//! elapses.
//!
//! * `Timeout`: Wraps a future or stream, setting an upper bound to the amount
//! * [`Timeout`]: Wraps a future or stream, setting an upper bound to the amount
//! of time it is allowed to execute. If the future or stream does not
//! complete in time, then it is canceled and an error is returned.
//!
//! These types are sufficient for handling a large number of scenarios
//! involving time.
//!
//! These types must be used from within the context of the `Runtime`.
//! These types must be used from within the context of the [`Runtime`](crate::runtime::Runtime).
//!
//! # Examples
//!
Expand Down Expand Up @@ -55,8 +55,8 @@
//! A simple example using [`interval`] to execute a task every two seconds.
//!
//! The difference between [`interval`] and [`sleep`] is that an [`interval`]
//! measures the time since the last tick, which means that `.tick().await`
//! may wait for a shorter time than the duration specified for the interval
//! measures the time since the last tick, which means that `.tick().await` may
//! wait for a shorter time than the duration specified for the interval
//! if some time has passed between calls to `.tick().await`.
//!
//! If the tick in the example below was replaced with [`sleep`], the task
Expand All @@ -81,7 +81,6 @@
//! }
//! ```
//!
//! [`sleep`]: crate::time::sleep()
//! [`interval`]: crate::time::interval()

mod clock;
Expand All @@ -100,7 +99,7 @@ mod instant;
pub use self::instant::Instant;

mod interval;
pub use interval::{interval, interval_at, Interval};
pub use interval::{interval, interval_at, Interval, MissedTickBehavior};

mod timeout;
#[doc(inline)]
Expand Down