Skip to content

Commit

Permalink
time: clean time driver (#2905)
Browse files Browse the repository at this point in the history
* remove unnecessary wheel::Poll

the timer wheel uses the `wheel::Poll` struct as input when
advancing the timer to the next time step.  the `Poll` struct
contains an instant representing the time step to advance to
and also contains an optional and mutable reference to an
`Expiration` struct.  from what I can tell, the latter field
is only used in the context of polling the wheel and does not
need to be exposed outside of that method.  without the
expiration field the `Poll` struct is nothing more than a
wrapper around the instant being polled.  this change removes
the `Poll` struct and updates integration points accordingly.

* remove Stack trait in favor of concrete Stack implementation

* remove timer Registration struct
  • Loading branch information
greenwoodcm committed Oct 6, 2020
1 parent 4cf45c0 commit fcdf934
Show file tree
Hide file tree
Showing 11 changed files with 319 additions and 409 deletions.
8 changes: 4 additions & 4 deletions tokio-util/src/time/delay_queue.rs
Expand Up @@ -141,7 +141,7 @@ pub struct DelayQueue<T> {
delay: Option<Delay>,

/// Wheel polling state
poll: wheel::Poll,
wheel_now: u64,

/// Instant at which the timer starts
start: Instant,
Expand Down Expand Up @@ -251,7 +251,7 @@ impl<T> DelayQueue<T> {
slab: Slab::with_capacity(capacity),
expired: Stack::default(),
delay: None,
poll: wheel::Poll::new(0),
wheel_now: 0,
start: Instant::now(),
}
}
Expand Down Expand Up @@ -733,11 +733,11 @@ impl<T> DelayQueue<T> {

let now = crate::time::ms(delay.deadline() - self.start, crate::time::Round::Down);

self.poll = wheel::Poll::new(now);
self.wheel_now = now;
}

// We poll the wheel to get the next value out before finding the next deadline.
let wheel_idx = self.wheel.poll(&mut self.poll, &mut self.slab);
let wheel_idx = self.wheel.poll(self.wheel_now, &mut self.slab);

self.delay = self.next_deadline().map(sleep_until);

Expand Down
49 changes: 19 additions & 30 deletions tokio-util/src/time/wheel/mod.rs
Expand Up @@ -51,13 +51,6 @@ pub(crate) enum InsertError {
Invalid,
}

/// Poll expirations from the wheel
#[derive(Debug, Default)]
pub(crate) struct Poll {
now: u64,
expiration: Option<Expiration>,
}

impl<T> Wheel<T>
where
T: Stack,
Expand Down Expand Up @@ -136,19 +129,18 @@ where
self.next_expiration().map(|expiration| expiration.deadline)
}

pub(crate) fn poll(&mut self, poll: &mut Poll, store: &mut T::Store) -> Option<T::Owned> {
/// Advances the timer up to the instant represented by `now`.
pub(crate) fn poll(&mut self, now: u64, store: &mut T::Store) -> Option<T::Owned> {
loop {
if poll.expiration.is_none() {
poll.expiration = self.next_expiration().and_then(|expiration| {
if expiration.deadline > poll.now {
None
} else {
Some(expiration)
}
});
}
let expiration = self.next_expiration().and_then(|expiration| {
if expiration.deadline > now {
None
} else {
Some(expiration)
}
});

match poll.expiration {
match expiration {
Some(ref expiration) => {
if let Some(item) = self.poll_expiration(expiration, store) {
return Some(item);
Expand All @@ -157,12 +149,14 @@ where
self.set_elapsed(expiration.deadline);
}
None => {
self.set_elapsed(poll.now);
// in this case the poll did not indicate an expiration
// _and_ we were not able to find a next expiration in
// the current list of timers. advance to the poll's
// current time and do nothing else.
self.set_elapsed(now);
return None;
}
}

poll.expiration = None;
}
}

Expand Down Expand Up @@ -197,6 +191,10 @@ where
res
}

/// iteratively find entries that are between the wheel's current
/// time and the expiration time. for each in that population either
/// return it for notification (in the case of the last level) or tier
/// it down to the next level (in all other cases).
pub(crate) fn poll_expiration(
&mut self,
expiration: &Expiration,
Expand Down Expand Up @@ -251,15 +249,6 @@ fn level_for(elapsed: u64, when: u64) -> usize {
significant / 6
}

impl Poll {
pub(crate) fn new(now: u64) -> Poll {
Poll {
now,
expiration: None,
}
}
}

#[cfg(all(test, not(loom)))]
mod test {
use super::*;
Expand Down
44 changes: 33 additions & 11 deletions tokio/src/time/delay.rs
@@ -1,8 +1,9 @@
use crate::time::driver::Registration;
use crate::time::{Duration, Instant};
use crate::time::driver::{Entry, Handle};
use crate::time::{Duration, Error, Instant};

use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{self, Poll};

/// Waits until `deadline` is reached.
Expand All @@ -16,8 +17,7 @@ use std::task::{self, Poll};
/// Canceling a delay is done by dropping the returned future. No additional
/// cleanup work is required.
pub fn sleep_until(deadline: Instant) -> Delay {
let registration = Registration::new(deadline, Duration::from_millis(0));
Delay { registration }
Delay::new_timeout(deadline, Duration::from_millis(0))
}

/// Waits until `duration` has elapsed.
Expand Down Expand Up @@ -63,25 +63,27 @@ pub struct Delay {
/// The link between the `Delay` instance and the timer that drives it.
///
/// This also stores the `deadline` value.
registration: Registration,
entry: Arc<Entry>,
}

impl Delay {
pub(crate) fn new_timeout(deadline: Instant, duration: Duration) -> Delay {
let registration = Registration::new(deadline, duration);
Delay { registration }
let handle = Handle::current();
let entry = Entry::new(&handle, deadline, duration);

Delay { entry }
}

/// Returns the instant at which the future will complete.
pub fn deadline(&self) -> Instant {
self.registration.deadline()
self.entry.time_ref().deadline
}

/// Returns `true` if the `Delay` has elapsed
///
/// A `Delay` is elapsed when the requested duration has elapsed.
pub fn is_elapsed(&self) -> bool {
self.registration.is_elapsed()
self.entry.is_elapsed()
}

/// Resets the `Delay` instance to a new deadline.
Expand All @@ -92,7 +94,21 @@ impl Delay {
/// This function can be called both before and after the future has
/// completed.
pub fn reset(&mut self, deadline: Instant) {
self.registration.reset(deadline);
unsafe {
self.entry.time_mut().deadline = deadline;
}

Entry::reset(&mut self.entry);
}

fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll<Result<(), Error>> {
// Keep track of task budget
let coop = ready!(crate::coop::poll_proceed(cx));

self.entry.poll_elapsed(cx).map(move |r| {
coop.made_progress();
r
})
}
}

Expand All @@ -109,9 +125,15 @@ impl Future for Delay {
// Both cases are extremely rare, and pretty accurately fit into
// "logic errors", so we just panic in this case. A user couldn't
// really do much better if we passed the error onwards.
match ready!(self.registration.poll_elapsed(cx)) {
match ready!(self.poll_elapsed(cx)) {
Ok(()) => Poll::Ready(()),
Err(e) => panic!("timer error: {}", e),
}
}
}

impl Drop for Delay {
fn drop(&mut self) {
Entry::cancel(&self.entry);
}
}
20 changes: 12 additions & 8 deletions tokio/src/time/driver/entry.rs
Expand Up @@ -83,15 +83,15 @@ pub(crate) struct Entry {
/// Next entry in the State's linked list.
///
/// This is only accessed by the timer
pub(super) next_stack: UnsafeCell<Option<Arc<Entry>>>,
pub(crate) next_stack: UnsafeCell<Option<Arc<Entry>>>,

/// Previous entry in the State's linked list.
///
/// This is only accessed by the timer and is used to unlink a canceled
/// entry.
///
/// This is a weak reference.
pub(super) prev_stack: UnsafeCell<*const Entry>,
pub(crate) prev_stack: UnsafeCell<*const Entry>,
}

/// Stores the info for `Delay`.
Expand All @@ -112,21 +112,21 @@ const ERROR: u64 = u64::MAX;
impl Entry {
pub(crate) fn new(handle: &Handle, deadline: Instant, duration: Duration) -> Arc<Entry> {
let inner = handle.inner().unwrap();
let entry: Entry;

// Increment the number of active timeouts
if let Err(err) = inner.increment() {
entry = Entry::new2(deadline, duration, Weak::new(), ERROR);
// Attempt to increment the number of active timeouts
let entry = if let Err(err) = inner.increment() {
let entry = Entry::new2(deadline, duration, Weak::new(), ERROR);
entry.error(err);
entry
} else {
let when = inner.normalize_deadline(deadline);
let state = if when <= inner.elapsed() {
ELAPSED
} else {
when
};
entry = Entry::new2(deadline, duration, Arc::downgrade(&inner), state);
}
Entry::new2(deadline, duration, Arc::downgrade(&inner), state)
};

let entry = Arc::new(entry);
if let Err(err) = inner.queue(&entry) {
Expand All @@ -147,6 +147,10 @@ impl Entry {
&mut *self.time.0.get()
}

pub(crate) fn when(&self) -> u64 {
self.when_internal().expect("invalid internal state")
}

/// The current entry state as known by the timer. This is not the value of
/// `state`, but lets the timer know how to converge its state to `state`.
pub(crate) fn when_internal(&self) -> Option<u64> {
Expand Down
29 changes: 12 additions & 17 deletions tokio/src/time/driver/mod.rs
Expand Up @@ -9,12 +9,6 @@ pub(super) use self::entry::Entry;
mod handle;
pub(crate) use self::handle::Handle;

mod registration;
pub(crate) use self::registration::Registration;

mod stack;
use self::stack::Stack;

use crate::loom::sync::atomic::{AtomicU64, AtomicUsize};
use crate::park::{Park, Unpark};
use crate::time::{wheel, Error};
Expand Down Expand Up @@ -73,7 +67,7 @@ use std::{cmp, fmt};
/// When the timer processes entries at level zero, it will notify all the
/// `Delay` instances as their deadlines have been reached. For all higher
/// levels, all entries will be redistributed across the wheel at the next level
/// down. Eventually, as time progresses, entries will [`Delay`][delay] instances will
/// down. Eventually, as time progresses, entries with [`Delay`][delay] instances will
/// either be canceled (dropped) or their associated entries will reach level
/// zero and be notified.
///
Expand All @@ -87,7 +81,7 @@ pub(crate) struct Driver<T: Park> {
inner: Arc<Inner>,

/// Timer wheel
wheel: wheel::Wheel<Stack>,
wheel: wheel::Wheel,

/// Thread parker. The `Driver` park implementation delegates to this.
park: T,
Expand Down Expand Up @@ -163,9 +157,8 @@ where
self.clock.now() - self.inner.start,
crate::time::Round::Down,
);
let mut poll = wheel::Poll::new(now);

while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) {
while let Some(entry) = self.wheel.poll(now) {
let when = entry.when_internal().expect("invalid internal entry state");

// Fire the entry
Expand Down Expand Up @@ -193,7 +186,7 @@ where
self.clear_entry(&entry);
}
(None, Some(when)) => {
// Queue the entry
// Add the entry to the timer wheel
self.add_entry(entry, when);
}
(Some(_), Some(next)) => {
Expand All @@ -205,19 +198,17 @@ where
}

fn clear_entry(&mut self, entry: &Arc<Entry>) {
self.wheel.remove(entry, &mut ());
self.wheel.remove(entry);
entry.set_when_internal(None);
}

/// Fires the entry if it needs to, otherwise queue it to be processed later.
///
/// Returns `None` if the entry was fired.
fn add_entry(&mut self, entry: Arc<Entry>, when: u64) {
use crate::time::wheel::InsertError;

entry.set_when_internal(Some(when));

match self.wheel.insert(when, entry, &mut ()) {
match self.wheel.insert(when, entry) {
Ok(_) => {}
Err((entry, InsertError::Elapsed)) => {
// The entry's deadline has elapsed, so fire it and update the
Expand Down Expand Up @@ -320,9 +311,9 @@ where
self.inner.process.shutdown();

// Clear the wheel, using u64::MAX allows us to drain everything
let mut poll = wheel::Poll::new(u64::MAX);
let end_of_time = u64::MAX;

while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) {
while let Some(entry) = self.wheel.poll(end_of_time) {
entry.error(Error::shutdown());
}

Expand Down Expand Up @@ -387,6 +378,10 @@ impl Inner {
debug_assert!(prev <= MAX_TIMEOUTS);
}

/// add the entry to the "process queue". entries are not immediately
/// pushed into the timer wheel but are instead pushed into the
/// process queue and then moved from the process queue into the timer
/// wheel on next `process`
fn queue(&self, entry: &Arc<Entry>) -> Result<(), Error> {
if self.process.push(entry)? {
// The timer is notified so that it can process the timeout
Expand Down

0 comments on commit fcdf934

Please sign in to comment.