Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve timer functionality #87

Merged
merged 3 commits into from
Aug 21, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
144 changes: 100 additions & 44 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ fn duration_max() -> Duration {
Duration::new(std::u64::MAX, 1_000_000_000 - 1)
}

fn instant_max() -> Instant {
// In order to ensure this point in time is never reached, it
// is put 30 years into the future.
Instant::now() + Duration::from_secs(86400 * 365 * 30)
}

/// A future or stream that emits timed events.
///
/// Timers are futures that output a single [`Instant`] when they fire.
Expand Down Expand Up @@ -139,13 +145,34 @@ pub struct Timer {
id_and_waker: Option<(usize, Waker)>,

/// The next instant at which this timer fires.
when: Instant,
///
/// If this timer is a blank timer, this value is None. If the timer
/// must be set, this value contains the next instant at which the
/// timer must fire.
when: Option<Instant>,

/// The period.
period: Duration,
}

impl Timer {
/// Creates a timer that will never fire.
///
/// # Examples
///
/// ```
/// use async_io::Timer;
///
/// let timer = Timer::blank();
/// ```
pub fn blank() -> Timer {
notgull marked this conversation as resolved.
Show resolved Hide resolved
Timer {
id_and_waker: None,
when: None,
period: duration_max(),
}
}

/// Creates a timer that emits an event once after the given duration of time.
///
/// # Examples
Expand All @@ -159,7 +186,11 @@ impl Timer {
/// # });
/// ```
pub fn after(duration: Duration) -> Timer {
Timer::at(Instant::now() + duration)
Timer::at(
Instant::now()
.checked_add(duration)
.unwrap_or_else(instant_max),
)
}

/// Creates a timer that emits an event once at the given time instant.
Expand Down Expand Up @@ -196,7 +227,12 @@ impl Timer {
/// # });
/// ```
pub fn interval(period: Duration) -> Timer {
Timer::interval_at(Instant::now() + period, period)
Timer::interval_at(
Instant::now()
.checked_add(period)
.unwrap_or_else(instant_max),
period,
)
}

/// Creates a timer that emits events periodically, starting at `start`.
Expand All @@ -217,7 +253,7 @@ impl Timer {
pub fn interval_at(start: Instant, period: Duration) -> Timer {
Timer {
id_and_waker: None,
when: start,
when: Some(start),
period,
}
}
Expand All @@ -240,7 +276,11 @@ impl Timer {
/// # });
/// ```
pub fn set_after(&mut self, duration: Duration) {
self.set_at(Instant::now() + duration);
self.set_at(
Instant::now()
.checked_add(duration)
.unwrap_or_else(instant_max),
);
}

/// Sets the timer to emit an event once at the given time instant.
Expand All @@ -264,17 +304,17 @@ impl Timer {
/// # });
/// ```
pub fn set_at(&mut self, instant: Instant) {
if let Some((id, _)) = self.id_and_waker.as_ref() {
if let (Some(when), Some((id, _))) = (self.when, self.id_and_waker.as_ref()) {
// Deregister the timer from the reactor.
Reactor::get().remove_timer(self.when, *id);
Reactor::get().remove_timer(when, *id);
}

// Update the timeout.
self.when = instant;
self.when = Some(instant);

if let Some((id, waker)) = self.id_and_waker.as_mut() {
// Re-register the timer with the new timeout.
*id = Reactor::get().insert_timer(self.when, waker);
*id = Reactor::get().insert_timer(instant, waker);
}
}

Expand All @@ -299,7 +339,12 @@ impl Timer {
/// # });
/// ```
pub fn set_interval(&mut self, period: Duration) {
self.set_interval_at(Instant::now() + period, period);
self.set_interval_at(
Instant::now()
.checked_add(period)
.unwrap_or_else(instant_max),
period,
);
}

/// Sets the timer to emit events periodically, starting at `start`.
Expand All @@ -324,26 +369,32 @@ impl Timer {
/// # });
/// ```
pub fn set_interval_at(&mut self, start: Instant, period: Duration) {
if let Some((id, _)) = self.id_and_waker.as_ref() {
if let (Some(when), Some((id, _))) = (self.when, self.id_and_waker.as_ref()) {
// Deregister the timer from the reactor.
Reactor::get().remove_timer(self.when, *id);
Reactor::get().remove_timer(when, *id);
}

self.when = start;
self.when = Some(start);
self.period = period;

if let Some((id, waker)) = self.id_and_waker.as_mut() {
// Re-register the timer with the new timeout.
*id = Reactor::get().insert_timer(self.when, waker);
*id = Reactor::get().insert_timer(start, waker);
}
}
}

impl Default for Timer {
fn default() -> Self {
Timer::blank()
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no strong opinions, but I personally feel it is a bit odd that a timer that is never resolved is the default.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The three options I see are:

  • Leave the Timer without a Default impl.
  • Have the default impl never fire.
  • Have the default impl fire immediately.

Now that I think about it, the third options seems to be the least footgun-like.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I'm starting to think that the first option is better. There aren't many instances where the Default impl of a timer is called anyhow (unless you're storing them in a TinyVec), and the ones where they would be feel unavoidably like a footgun. I'll just not have a Default impl; we can discuss it later if we need to.

Copy link
Collaborator

@taiki-e taiki-e Aug 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it seems better to postpone the Default implementation until someone actually asks for it.


impl Drop for Timer {
fn drop(&mut self) {
if let Some((id, _)) = self.id_and_waker.take() {
if let (Some(when), Some((id, _))) = (self.when, self.id_and_waker.take()) {
// Deregister the timer from the reactor.
Reactor::get().remove_timer(self.when, id);
Reactor::get().remove_timer(when, id);
}
}
}
Expand All @@ -363,39 +414,44 @@ impl Future for Timer {
impl Stream for Timer {
type Item = Instant;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Check if the timer has already fired.
if Instant::now() >= self.when {
if let Some((id, _)) = self.id_and_waker.take() {
// Deregister the timer from the reactor.
Reactor::get().remove_timer(self.when, id);
}
let when = self.when;
if let Some(next) = when.checked_add(self.period) {
self.when = next;
// Register the timer in the reactor.
let id = Reactor::get().insert_timer(self.when, cx.waker());
self.id_and_waker = Some((id, cx.waker().clone()));
}
return Poll::Ready(Some(when));
} else {
match &self.id_and_waker {
None => {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();

if let Some(ref mut when) = this.when {
// Check if the timer has already fired.
if Instant::now() >= *when {
if let Some((id, _)) = this.id_and_waker.take() {
// Deregister the timer from the reactor.
Reactor::get().remove_timer(*when, id);
}
let result_time = *when;
if let Some(next) = (*when).checked_add(this.period) {
*when = next;
// Register the timer in the reactor.
let id = Reactor::get().insert_timer(self.when, cx.waker());
self.id_and_waker = Some((id, cx.waker().clone()));
let id = Reactor::get().insert_timer(next, cx.waker());
this.id_and_waker = Some((id, cx.waker().clone()));
}
Some((id, w)) if !w.will_wake(cx.waker()) => {
// Deregister the timer from the reactor to remove the old waker.
Reactor::get().remove_timer(self.when, *id);

// Register the timer in the reactor with the new waker.
let id = Reactor::get().insert_timer(self.when, cx.waker());
self.id_and_waker = Some((id, cx.waker().clone()));
return Poll::Ready(Some(result_time));
} else {
match &this.id_and_waker {
None => {
// Register the timer in the reactor.
let id = Reactor::get().insert_timer(*when, cx.waker());
this.id_and_waker = Some((id, cx.waker().clone()));
}
Some((id, w)) if !w.will_wake(cx.waker()) => {
// Deregister the timer from the reactor to remove the old waker.
Reactor::get().remove_timer(*when, *id);

// Register the timer in the reactor with the new waker.
let id = Reactor::get().insert_timer(*when, cx.waker());
this.id_and_waker = Some((id, cx.waker().clone()));
}
Some(_) => {}
}
Some(_) => {}
}
}

Poll::Pending
}
}
Expand Down