Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve timer functionality #87

Merged
merged 3 commits into from
Aug 21, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
161 changes: 117 additions & 44 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ fn duration_max() -> Duration {
Duration::new(std::u64::MAX, 1_000_000_000 - 1)
}

fn instant_max() -> Instant {
// In order to ensure this point in time is never reached, it
// is put 30 years into the future.
Instant::now() + Duration::from_secs(86400 * 365 * 30)
}

/// A future or stream that emits timed events.
///
/// Timers are futures that output a single [`Instant`] when they fire.
Expand Down Expand Up @@ -139,13 +145,57 @@ pub struct Timer {
id_and_waker: Option<(usize, Waker)>,

/// The next instant at which this timer fires.
when: Instant,
///
/// If this timer is a blank timer, this value is None. If the timer
/// must be set, this value contains the next instant at which the
/// timer must fire.
when: Option<Instant>,

/// The period.
period: Duration,
}

impl Timer {
/// Creates a timer that will never fire.
///
/// # Examples
///
/// This function may also be useful for creating a function with an optional timeout.
///
/// ```
/// # futures_lite::future::block_on(async {
/// use async_io::Timer;
/// use futures_lite::prelude::*;
/// use std::time::Duration;
///
/// async fn run_with_timeout(timeout: Option<Duration>) {
/// let timer = timeout
/// .map(|timeout| Timer::after(timeout))
/// .unwrap_or_else(Timer::never);
///
/// run_lengthy_operation().or(timer).await;
/// }
/// # // Note that since a Timer as a Future returns an Instant,
/// # // this function needs to return an Instant to be used
/// # // in "or".
/// # async fn run_lengthy_operation() -> std::time::Instant {
/// # std::time::Instant::now()
/// # }
///
/// // Times out after 5 seconds.
/// run_with_timeout(Some(Duration::from_secs(5))).await;
/// // Does not time out.
/// run_with_timeout(None).await;
/// # });
/// ```
pub fn never() -> Timer {
Timer {
id_and_waker: None,
when: None,
period: duration_max(),
}
}

/// Creates a timer that emits an event once after the given duration of time.
///
/// # Examples
Expand All @@ -159,7 +209,11 @@ impl Timer {
/// # });
/// ```
pub fn after(duration: Duration) -> Timer {
Timer::at(Instant::now() + duration)
Timer::at(
Instant::now()
.checked_add(duration)
.unwrap_or_else(instant_max),
)
}

/// Creates a timer that emits an event once at the given time instant.
Expand Down Expand Up @@ -196,7 +250,12 @@ impl Timer {
/// # });
/// ```
pub fn interval(period: Duration) -> Timer {
Timer::interval_at(Instant::now() + period, period)
Timer::interval_at(
Instant::now()
.checked_add(period)
.unwrap_or_else(instant_max),
period,
)
}

/// Creates a timer that emits events periodically, starting at `start`.
Expand All @@ -217,7 +276,7 @@ impl Timer {
pub fn interval_at(start: Instant, period: Duration) -> Timer {
Timer {
id_and_waker: None,
when: start,
when: Some(start),
period,
}
}
Expand All @@ -240,7 +299,11 @@ impl Timer {
/// # });
/// ```
pub fn set_after(&mut self, duration: Duration) {
self.set_at(Instant::now() + duration);
self.set_at(
Instant::now()
.checked_add(duration)
.unwrap_or_else(instant_max),
);
}

/// Sets the timer to emit an event once at the given time instant.
Expand All @@ -264,17 +327,17 @@ impl Timer {
/// # });
/// ```
pub fn set_at(&mut self, instant: Instant) {
if let Some((id, _)) = self.id_and_waker.as_ref() {
if let (Some(when), Some((id, _))) = (self.when, self.id_and_waker.as_ref()) {
// Deregister the timer from the reactor.
Reactor::get().remove_timer(self.when, *id);
Reactor::get().remove_timer(when, *id);
}

// Update the timeout.
self.when = instant;
self.when = Some(instant);

if let Some((id, waker)) = self.id_and_waker.as_mut() {
// Re-register the timer with the new timeout.
*id = Reactor::get().insert_timer(self.when, waker);
*id = Reactor::get().insert_timer(instant, waker);
}
}

Expand All @@ -299,7 +362,12 @@ impl Timer {
/// # });
/// ```
pub fn set_interval(&mut self, period: Duration) {
self.set_interval_at(Instant::now() + period, period);
self.set_interval_at(
Instant::now()
.checked_add(period)
.unwrap_or_else(instant_max),
period,
);
}

/// Sets the timer to emit events periodically, starting at `start`.
Expand All @@ -324,26 +392,26 @@ impl Timer {
/// # });
/// ```
pub fn set_interval_at(&mut self, start: Instant, period: Duration) {
if let Some((id, _)) = self.id_and_waker.as_ref() {
if let (Some(when), Some((id, _))) = (self.when, self.id_and_waker.as_ref()) {
// Deregister the timer from the reactor.
Reactor::get().remove_timer(self.when, *id);
Reactor::get().remove_timer(when, *id);
}

self.when = start;
self.when = Some(start);
self.period = period;

if let Some((id, waker)) = self.id_and_waker.as_mut() {
// Re-register the timer with the new timeout.
*id = Reactor::get().insert_timer(self.when, waker);
*id = Reactor::get().insert_timer(start, waker);
}
}
}

impl Drop for Timer {
fn drop(&mut self) {
if let Some((id, _)) = self.id_and_waker.take() {
if let (Some(when), Some((id, _))) = (self.when, self.id_and_waker.take()) {
// Deregister the timer from the reactor.
Reactor::get().remove_timer(self.when, id);
Reactor::get().remove_timer(when, id);
}
}
}
Expand All @@ -363,39 +431,44 @@ impl Future for Timer {
impl Stream for Timer {
type Item = Instant;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Check if the timer has already fired.
if Instant::now() >= self.when {
if let Some((id, _)) = self.id_and_waker.take() {
// Deregister the timer from the reactor.
Reactor::get().remove_timer(self.when, id);
}
let when = self.when;
if let Some(next) = when.checked_add(self.period) {
self.when = next;
// Register the timer in the reactor.
let id = Reactor::get().insert_timer(self.when, cx.waker());
self.id_and_waker = Some((id, cx.waker().clone()));
}
return Poll::Ready(Some(when));
} else {
match &self.id_and_waker {
None => {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();

if let Some(ref mut when) = this.when {
// Check if the timer has already fired.
if Instant::now() >= *when {
if let Some((id, _)) = this.id_and_waker.take() {
// Deregister the timer from the reactor.
Reactor::get().remove_timer(*when, id);
}
let result_time = *when;
if let Some(next) = (*when).checked_add(this.period) {
*when = next;
// Register the timer in the reactor.
let id = Reactor::get().insert_timer(self.when, cx.waker());
self.id_and_waker = Some((id, cx.waker().clone()));
let id = Reactor::get().insert_timer(next, cx.waker());
this.id_and_waker = Some((id, cx.waker().clone()));
}
Some((id, w)) if !w.will_wake(cx.waker()) => {
// Deregister the timer from the reactor to remove the old waker.
Reactor::get().remove_timer(self.when, *id);

// Register the timer in the reactor with the new waker.
let id = Reactor::get().insert_timer(self.when, cx.waker());
self.id_and_waker = Some((id, cx.waker().clone()));
return Poll::Ready(Some(result_time));
} else {
match &this.id_and_waker {
None => {
// Register the timer in the reactor.
let id = Reactor::get().insert_timer(*when, cx.waker());
this.id_and_waker = Some((id, cx.waker().clone()));
}
Some((id, w)) if !w.will_wake(cx.waker()) => {
// Deregister the timer from the reactor to remove the old waker.
Reactor::get().remove_timer(*when, *id);

// Register the timer in the reactor with the new waker.
let id = Reactor::get().insert_timer(*when, cx.waker());
this.id_and_waker = Some((id, cx.waker().clone()));
}
Some(_) => {}
}
Some(_) => {}
}
}

Poll::Pending
}
}
Expand Down