From fcdf9345bf19e9a1e1664f01713f9eba54da27c5 Mon Sep 17 00:00:00 2001 From: greenwoodcm Date: Tue, 6 Oct 2020 12:48:01 -0700 Subject: [PATCH] time: clean time driver (#2905) * remove unnecessary wheel::Poll the timer wheel uses the `wheel::Poll` struct as input when advancing the timer to the next time step. the `Poll` struct contains an instant representing the time step to advance to and also contains an optional and mutable reference to an `Expiration` struct. from what I can tell, the latter field is only used in the context of polling the wheel and does not need to be exposed outside of that method. without the expiration field the `Poll` struct is nothing more than a wrapper around the instant being polled. this change removes the `Poll` struct and updates integration points accordingly. * remove Stack trait in favor of concrete Stack implementation * remove timer Registration struct --- tokio-util/src/time/delay_queue.rs | 8 +- tokio-util/src/time/wheel/mod.rs | 49 +++----- tokio/src/time/delay.rs | 44 +++++-- tokio/src/time/driver/entry.rs | 20 +-- tokio/src/time/driver/mod.rs | 29 ++--- tokio/src/time/driver/registration.rs | 56 --------- tokio/src/time/driver/stack.rs | 121 ------------------ tokio/src/time/tests/mod.rs | 6 +- tokio/src/time/wheel/level.rs | 174 +++++++++++++------------- tokio/src/time/wheel/mod.rs | 101 +++++++-------- tokio/src/time/wheel/stack.rs | 120 +++++++++++++++--- 11 files changed, 319 insertions(+), 409 deletions(-) delete mode 100644 tokio/src/time/driver/registration.rs delete mode 100644 tokio/src/time/driver/stack.rs diff --git a/tokio-util/src/time/delay_queue.rs b/tokio-util/src/time/delay_queue.rs index b23c24e6149..92c922b89d4 100644 --- a/tokio-util/src/time/delay_queue.rs +++ b/tokio-util/src/time/delay_queue.rs @@ -141,7 +141,7 @@ pub struct DelayQueue { delay: Option, /// Wheel polling state - poll: wheel::Poll, + wheel_now: u64, /// Instant at which the timer starts start: Instant, @@ -251,7 +251,7 @@ impl DelayQueue { slab: Slab::with_capacity(capacity), expired: Stack::default(), delay: None, - poll: wheel::Poll::new(0), + wheel_now: 0, start: Instant::now(), } } @@ -733,11 +733,11 @@ impl DelayQueue { let now = crate::time::ms(delay.deadline() - self.start, crate::time::Round::Down); - self.poll = wheel::Poll::new(now); + self.wheel_now = now; } // We poll the wheel to get the next value out before finding the next deadline. - let wheel_idx = self.wheel.poll(&mut self.poll, &mut self.slab); + let wheel_idx = self.wheel.poll(self.wheel_now, &mut self.slab); self.delay = self.next_deadline().map(sleep_until); diff --git a/tokio-util/src/time/wheel/mod.rs b/tokio-util/src/time/wheel/mod.rs index a2ef27fc6c5..478037a3098 100644 --- a/tokio-util/src/time/wheel/mod.rs +++ b/tokio-util/src/time/wheel/mod.rs @@ -51,13 +51,6 @@ pub(crate) enum InsertError { Invalid, } -/// Poll expirations from the wheel -#[derive(Debug, Default)] -pub(crate) struct Poll { - now: u64, - expiration: Option, -} - impl Wheel where T: Stack, @@ -136,19 +129,18 @@ where self.next_expiration().map(|expiration| expiration.deadline) } - pub(crate) fn poll(&mut self, poll: &mut Poll, store: &mut T::Store) -> Option { + /// Advances the timer up to the instant represented by `now`. + pub(crate) fn poll(&mut self, now: u64, store: &mut T::Store) -> Option { loop { - if poll.expiration.is_none() { - poll.expiration = self.next_expiration().and_then(|expiration| { - if expiration.deadline > poll.now { - None - } else { - Some(expiration) - } - }); - } + let expiration = self.next_expiration().and_then(|expiration| { + if expiration.deadline > now { + None + } else { + Some(expiration) + } + }); - match poll.expiration { + match expiration { Some(ref expiration) => { if let Some(item) = self.poll_expiration(expiration, store) { return Some(item); @@ -157,12 +149,14 @@ where self.set_elapsed(expiration.deadline); } None => { - self.set_elapsed(poll.now); + // in this case the poll did not indicate an expiration + // _and_ we were not able to find a next expiration in + // the current list of timers. advance to the poll's + // current time and do nothing else. + self.set_elapsed(now); return None; } } - - poll.expiration = None; } } @@ -197,6 +191,10 @@ where res } + /// iteratively find entries that are between the wheel's current + /// time and the expiration time. for each in that population either + /// return it for notification (in the case of the last level) or tier + /// it down to the next level (in all other cases). pub(crate) fn poll_expiration( &mut self, expiration: &Expiration, @@ -251,15 +249,6 @@ fn level_for(elapsed: u64, when: u64) -> usize { significant / 6 } -impl Poll { - pub(crate) fn new(now: u64) -> Poll { - Poll { - now, - expiration: None, - } - } -} - #[cfg(all(test, not(loom)))] mod test { use super::*; diff --git a/tokio/src/time/delay.rs b/tokio/src/time/delay.rs index 42ae4b08431..9364860d806 100644 --- a/tokio/src/time/delay.rs +++ b/tokio/src/time/delay.rs @@ -1,8 +1,9 @@ -use crate::time::driver::Registration; -use crate::time::{Duration, Instant}; +use crate::time::driver::{Entry, Handle}; +use crate::time::{Duration, Error, Instant}; use std::future::Future; use std::pin::Pin; +use std::sync::Arc; use std::task::{self, Poll}; /// Waits until `deadline` is reached. @@ -16,8 +17,7 @@ use std::task::{self, Poll}; /// Canceling a delay is done by dropping the returned future. No additional /// cleanup work is required. pub fn sleep_until(deadline: Instant) -> Delay { - let registration = Registration::new(deadline, Duration::from_millis(0)); - Delay { registration } + Delay::new_timeout(deadline, Duration::from_millis(0)) } /// Waits until `duration` has elapsed. @@ -63,25 +63,27 @@ pub struct Delay { /// The link between the `Delay` instance and the timer that drives it. /// /// This also stores the `deadline` value. - registration: Registration, + entry: Arc, } impl Delay { pub(crate) fn new_timeout(deadline: Instant, duration: Duration) -> Delay { - let registration = Registration::new(deadline, duration); - Delay { registration } + let handle = Handle::current(); + let entry = Entry::new(&handle, deadline, duration); + + Delay { entry } } /// Returns the instant at which the future will complete. pub fn deadline(&self) -> Instant { - self.registration.deadline() + self.entry.time_ref().deadline } /// Returns `true` if the `Delay` has elapsed /// /// A `Delay` is elapsed when the requested duration has elapsed. pub fn is_elapsed(&self) -> bool { - self.registration.is_elapsed() + self.entry.is_elapsed() } /// Resets the `Delay` instance to a new deadline. @@ -92,7 +94,21 @@ impl Delay { /// This function can be called both before and after the future has /// completed. pub fn reset(&mut self, deadline: Instant) { - self.registration.reset(deadline); + unsafe { + self.entry.time_mut().deadline = deadline; + } + + Entry::reset(&mut self.entry); + } + + fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll> { + // Keep track of task budget + let coop = ready!(crate::coop::poll_proceed(cx)); + + self.entry.poll_elapsed(cx).map(move |r| { + coop.made_progress(); + r + }) } } @@ -109,9 +125,15 @@ impl Future for Delay { // Both cases are extremely rare, and pretty accurately fit into // "logic errors", so we just panic in this case. A user couldn't // really do much better if we passed the error onwards. - match ready!(self.registration.poll_elapsed(cx)) { + match ready!(self.poll_elapsed(cx)) { Ok(()) => Poll::Ready(()), Err(e) => panic!("timer error: {}", e), } } } + +impl Drop for Delay { + fn drop(&mut self) { + Entry::cancel(&self.entry); + } +} diff --git a/tokio/src/time/driver/entry.rs b/tokio/src/time/driver/entry.rs index 974465c19be..20f8e1c6dce 100644 --- a/tokio/src/time/driver/entry.rs +++ b/tokio/src/time/driver/entry.rs @@ -83,7 +83,7 @@ pub(crate) struct Entry { /// Next entry in the State's linked list. /// /// This is only accessed by the timer - pub(super) next_stack: UnsafeCell>>, + pub(crate) next_stack: UnsafeCell>>, /// Previous entry in the State's linked list. /// @@ -91,7 +91,7 @@ pub(crate) struct Entry { /// entry. /// /// This is a weak reference. - pub(super) prev_stack: UnsafeCell<*const Entry>, + pub(crate) prev_stack: UnsafeCell<*const Entry>, } /// Stores the info for `Delay`. @@ -112,12 +112,12 @@ const ERROR: u64 = u64::MAX; impl Entry { pub(crate) fn new(handle: &Handle, deadline: Instant, duration: Duration) -> Arc { let inner = handle.inner().unwrap(); - let entry: Entry; - // Increment the number of active timeouts - if let Err(err) = inner.increment() { - entry = Entry::new2(deadline, duration, Weak::new(), ERROR); + // Attempt to increment the number of active timeouts + let entry = if let Err(err) = inner.increment() { + let entry = Entry::new2(deadline, duration, Weak::new(), ERROR); entry.error(err); + entry } else { let when = inner.normalize_deadline(deadline); let state = if when <= inner.elapsed() { @@ -125,8 +125,8 @@ impl Entry { } else { when }; - entry = Entry::new2(deadline, duration, Arc::downgrade(&inner), state); - } + Entry::new2(deadline, duration, Arc::downgrade(&inner), state) + }; let entry = Arc::new(entry); if let Err(err) = inner.queue(&entry) { @@ -147,6 +147,10 @@ impl Entry { &mut *self.time.0.get() } + pub(crate) fn when(&self) -> u64 { + self.when_internal().expect("invalid internal state") + } + /// The current entry state as known by the timer. This is not the value of /// `state`, but lets the timer know how to converge its state to `state`. pub(crate) fn when_internal(&self) -> Option { diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs index bb6c28b34de..5ece7c72d74 100644 --- a/tokio/src/time/driver/mod.rs +++ b/tokio/src/time/driver/mod.rs @@ -9,12 +9,6 @@ pub(super) use self::entry::Entry; mod handle; pub(crate) use self::handle::Handle; -mod registration; -pub(crate) use self::registration::Registration; - -mod stack; -use self::stack::Stack; - use crate::loom::sync::atomic::{AtomicU64, AtomicUsize}; use crate::park::{Park, Unpark}; use crate::time::{wheel, Error}; @@ -73,7 +67,7 @@ use std::{cmp, fmt}; /// When the timer processes entries at level zero, it will notify all the /// `Delay` instances as their deadlines have been reached. For all higher /// levels, all entries will be redistributed across the wheel at the next level -/// down. Eventually, as time progresses, entries will [`Delay`][delay] instances will +/// down. Eventually, as time progresses, entries with [`Delay`][delay] instances will /// either be canceled (dropped) or their associated entries will reach level /// zero and be notified. /// @@ -87,7 +81,7 @@ pub(crate) struct Driver { inner: Arc, /// Timer wheel - wheel: wheel::Wheel, + wheel: wheel::Wheel, /// Thread parker. The `Driver` park implementation delegates to this. park: T, @@ -163,9 +157,8 @@ where self.clock.now() - self.inner.start, crate::time::Round::Down, ); - let mut poll = wheel::Poll::new(now); - while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) { + while let Some(entry) = self.wheel.poll(now) { let when = entry.when_internal().expect("invalid internal entry state"); // Fire the entry @@ -193,7 +186,7 @@ where self.clear_entry(&entry); } (None, Some(when)) => { - // Queue the entry + // Add the entry to the timer wheel self.add_entry(entry, when); } (Some(_), Some(next)) => { @@ -205,19 +198,17 @@ where } fn clear_entry(&mut self, entry: &Arc) { - self.wheel.remove(entry, &mut ()); + self.wheel.remove(entry); entry.set_when_internal(None); } /// Fires the entry if it needs to, otherwise queue it to be processed later. - /// - /// Returns `None` if the entry was fired. fn add_entry(&mut self, entry: Arc, when: u64) { use crate::time::wheel::InsertError; entry.set_when_internal(Some(when)); - match self.wheel.insert(when, entry, &mut ()) { + match self.wheel.insert(when, entry) { Ok(_) => {} Err((entry, InsertError::Elapsed)) => { // The entry's deadline has elapsed, so fire it and update the @@ -320,9 +311,9 @@ where self.inner.process.shutdown(); // Clear the wheel, using u64::MAX allows us to drain everything - let mut poll = wheel::Poll::new(u64::MAX); + let end_of_time = u64::MAX; - while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) { + while let Some(entry) = self.wheel.poll(end_of_time) { entry.error(Error::shutdown()); } @@ -387,6 +378,10 @@ impl Inner { debug_assert!(prev <= MAX_TIMEOUTS); } + /// add the entry to the "process queue". entries are not immediately + /// pushed into the timer wheel but are instead pushed into the + /// process queue and then moved from the process queue into the timer + /// wheel on next `process` fn queue(&self, entry: &Arc) -> Result<(), Error> { if self.process.push(entry)? { // The timer is notified so that it can process the timeout diff --git a/tokio/src/time/driver/registration.rs b/tokio/src/time/driver/registration.rs deleted file mode 100644 index 3a0b34501b0..00000000000 --- a/tokio/src/time/driver/registration.rs +++ /dev/null @@ -1,56 +0,0 @@ -use crate::time::driver::{Entry, Handle}; -use crate::time::{Duration, Error, Instant}; - -use std::sync::Arc; -use std::task::{self, Poll}; - -/// Registration with a timer. -/// -/// The association between a `Delay` instance and a timer is done lazily in -/// `poll` -#[derive(Debug)] -pub(crate) struct Registration { - entry: Arc, -} - -impl Registration { - pub(crate) fn new(deadline: Instant, duration: Duration) -> Registration { - let handle = Handle::current(); - - Registration { - entry: Entry::new(&handle, deadline, duration), - } - } - - pub(crate) fn deadline(&self) -> Instant { - self.entry.time_ref().deadline - } - - pub(crate) fn reset(&mut self, deadline: Instant) { - unsafe { - self.entry.time_mut().deadline = deadline; - } - - Entry::reset(&mut self.entry); - } - - pub(crate) fn is_elapsed(&self) -> bool { - self.entry.is_elapsed() - } - - pub(crate) fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll> { - // Keep track of task budget - let coop = ready!(crate::coop::poll_proceed(cx)); - - self.entry.poll_elapsed(cx).map(move |r| { - coop.made_progress(); - r - }) - } -} - -impl Drop for Registration { - fn drop(&mut self) { - Entry::cancel(&self.entry); - } -} diff --git a/tokio/src/time/driver/stack.rs b/tokio/src/time/driver/stack.rs deleted file mode 100644 index 3e2924f2653..00000000000 --- a/tokio/src/time/driver/stack.rs +++ /dev/null @@ -1,121 +0,0 @@ -use crate::time::driver::Entry; -use crate::time::wheel; - -use std::ptr; -use std::sync::Arc; - -/// A doubly linked stack -#[derive(Debug)] -pub(crate) struct Stack { - head: Option>, -} - -impl Default for Stack { - fn default() -> Stack { - Stack { head: None } - } -} - -impl wheel::Stack for Stack { - type Owned = Arc; - type Borrowed = Entry; - type Store = (); - - fn is_empty(&self) -> bool { - self.head.is_none() - } - - fn push(&mut self, entry: Self::Owned, _: &mut Self::Store) { - // Get a pointer to the entry to for the prev link - let ptr: *const Entry = &*entry as *const _; - - // Remove the old head entry - let old = self.head.take(); - - unsafe { - // Ensure the entry is not already in a stack. - debug_assert!((*entry.next_stack.get()).is_none()); - debug_assert!((*entry.prev_stack.get()).is_null()); - - if let Some(ref entry) = old.as_ref() { - debug_assert!({ - // The head is not already set to the entry - ptr != &***entry as *const _ - }); - - // Set the previous link on the old head - *entry.prev_stack.get() = ptr; - } - - // Set this entry's next pointer - *entry.next_stack.get() = old; - } - - // Update the head pointer - self.head = Some(entry); - } - - /// Pops an item from the stack - fn pop(&mut self, _: &mut ()) -> Option> { - let entry = self.head.take(); - - unsafe { - if let Some(entry) = entry.as_ref() { - self.head = (*entry.next_stack.get()).take(); - - if let Some(entry) = self.head.as_ref() { - *entry.prev_stack.get() = ptr::null(); - } - - *entry.prev_stack.get() = ptr::null(); - } - } - - entry - } - - fn remove(&mut self, entry: &Entry, _: &mut ()) { - unsafe { - // Ensure that the entry is in fact contained by the stack - debug_assert!({ - // This walks the full linked list even if an entry is found. - let mut next = self.head.as_ref(); - let mut contains = false; - - while let Some(n) = next { - if entry as *const _ == &**n as *const _ { - debug_assert!(!contains); - contains = true; - } - - next = (*n.next_stack.get()).as_ref(); - } - - contains - }); - - // Unlink `entry` from the next node - let next = (*entry.next_stack.get()).take(); - - if let Some(next) = next.as_ref() { - (*next.prev_stack.get()) = *entry.prev_stack.get(); - } - - // Unlink `entry` from the prev node - - if let Some(prev) = (*entry.prev_stack.get()).as_ref() { - *prev.next_stack.get() = next; - } else { - // It is the head - self.head = next; - } - - // Unset the prev pointer - *entry.prev_stack.get() = ptr::null(); - } - } - - fn when(item: &Entry, _: &()) -> u64 { - item.when_internal().expect("invalid internal state") - } -} diff --git a/tokio/src/time/tests/mod.rs b/tokio/src/time/tests/mod.rs index e112b8e1dc3..a043d65e19d 100644 --- a/tokio/src/time/tests/mod.rs +++ b/tokio/src/time/tests/mod.rs @@ -8,10 +8,10 @@ fn assert_sync() {} #[test] fn registration_is_send_and_sync() { - use crate::time::driver::Registration; + use crate::time::delay::Delay; - assert_send::(); - assert_sync::(); + assert_send::(); + assert_sync::(); } #[test] diff --git a/tokio/src/time/wheel/level.rs b/tokio/src/time/wheel/level.rs index 49f9bfb9cf0..d51d26a0362 100644 --- a/tokio/src/time/wheel/level.rs +++ b/tokio/src/time/wheel/level.rs @@ -1,9 +1,10 @@ +use super::{Item, OwnedItem}; use crate::time::wheel::Stack; use std::fmt; /// Wheel for a single level in the timer. This wheel contains 64 slots. -pub(crate) struct Level { +pub(crate) struct Level { level: usize, /// Bit field tracking which slots currently contain entries. @@ -16,7 +17,7 @@ pub(crate) struct Level { occupied: u64, /// Slots - slot: [T; LEVEL_MULT], + slot: [Stack; LEVEL_MULT], } /// Indicates when a slot must be processed next. @@ -37,87 +38,90 @@ pub(crate) struct Expiration { /// Being a power of 2 is very important. const LEVEL_MULT: usize = 64; -impl Level { - pub(crate) fn new(level: usize) -> Level { - // Rust's derived implementations for arrays require that the value - // contained by the array be `Copy`. So, here we have to manually - // initialize every single slot. - macro_rules! s { - () => { - T::default() - }; - }; +impl Level { + pub(crate) fn new(level: usize) -> Level { + // A value has to be Copy in order to use syntax like: + // let stack = Stack::default(); + // ... + // slots: [stack; 64], + // + // Alternatively, since Stack is Default one can + // use syntax like: + // let slots: [Stack; 64] = Default::default(); + // + // However, that is only supported for arrays of size + // 32 or fewer. So in our case we have to explicitly + // invoke the constructor for each array element. + let ctor = Stack::default; Level { level, occupied: 0, slot: [ - // It does not look like the necessary traits are - // derived for [T; 64]. - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), ], } } @@ -173,17 +177,17 @@ impl Level { Some(slot) } - pub(crate) fn add_entry(&mut self, when: u64, item: T::Owned, store: &mut T::Store) { + pub(crate) fn add_entry(&mut self, when: u64, item: OwnedItem) { let slot = slot_for(when, self.level); - self.slot[slot].push(item, store); + self.slot[slot].push(item); self.occupied |= occupied_bit(slot); } - pub(crate) fn remove_entry(&mut self, when: u64, item: &T::Borrowed, store: &mut T::Store) { + pub(crate) fn remove_entry(&mut self, when: u64, item: &Item) { let slot = slot_for(when, self.level); - self.slot[slot].remove(item, store); + self.slot[slot].remove(item); if self.slot[slot].is_empty() { // The bit is currently set @@ -194,8 +198,8 @@ impl Level { } } - pub(crate) fn pop_entry_slot(&mut self, slot: usize, store: &mut T::Store) -> Option { - let ret = self.slot[slot].pop(store); + pub(crate) fn pop_entry_slot(&mut self, slot: usize) -> Option { + let ret = self.slot[slot].pop(); if ret.is_some() && self.slot[slot].is_empty() { // The bit is currently set @@ -208,7 +212,7 @@ impl Level { } } -impl fmt::Debug for Level { +impl fmt::Debug for Level { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Level") .field("occupied", &self.occupied) diff --git a/tokio/src/time/wheel/mod.rs b/tokio/src/time/wheel/mod.rs index a2ef27fc6c5..03861240ac1 100644 --- a/tokio/src/time/wheel/mod.rs +++ b/tokio/src/time/wheel/mod.rs @@ -1,3 +1,5 @@ +use crate::time::driver::Entry; + mod level; pub(crate) use self::level::Expiration; use self::level::Level; @@ -5,9 +7,12 @@ use self::level::Level; mod stack; pub(crate) use self::stack::Stack; -use std::borrow::Borrow; +use std::sync::Arc; use std::usize; +pub(super) type Item = Entry; +pub(super) type OwnedItem = Arc; + /// Timing wheel implementation. /// /// This type provides the hashed timing wheel implementation that backs `Timer` @@ -20,7 +25,7 @@ use std::usize; /// /// See `Timer` documentation for some implementation notes. #[derive(Debug)] -pub(crate) struct Wheel { +pub(crate) struct Wheel { /// The number of milliseconds elapsed since the wheel started. elapsed: u64, @@ -34,7 +39,7 @@ pub(crate) struct Wheel { /// * ~ 4 min slots / ~ 4 hr range /// * ~ 4 hr slots / ~ 12 day range /// * ~ 12 day slots / ~ 2 yr range - levels: Vec>, + levels: Vec, } /// Number of levels. Each level has 64 slots. By using 6 levels with 64 slots @@ -51,19 +56,9 @@ pub(crate) enum InsertError { Invalid, } -/// Poll expirations from the wheel -#[derive(Debug, Default)] -pub(crate) struct Poll { - now: u64, - expiration: Option, -} - -impl Wheel -where - T: Stack, -{ +impl Wheel { /// Create a new timing wheel - pub(crate) fn new() -> Wheel { + pub(crate) fn new() -> Wheel { let levels = (0..NUM_LEVELS).map(Level::new).collect(); Wheel { elapsed: 0, levels } @@ -99,9 +94,8 @@ where pub(crate) fn insert( &mut self, when: u64, - item: T::Owned, - store: &mut T::Store, - ) -> Result<(), (T::Owned, InsertError)> { + item: OwnedItem, + ) -> Result<(), (OwnedItem, InsertError)> { if when <= self.elapsed { return Err((item, InsertError::Elapsed)); } else if when - self.elapsed > MAX_DURATION { @@ -111,7 +105,7 @@ where // Get the level at which the entry should be stored let level = self.level_for(when); - self.levels[level].add_entry(when, item, store); + self.levels[level].add_entry(when, item); debug_assert!({ self.levels[level] @@ -124,11 +118,11 @@ where } /// Remove `item` from thee timing wheel. - pub(crate) fn remove(&mut self, item: &T::Borrowed, store: &mut T::Store) { - let when = T::when(item, store); + pub(crate) fn remove(&mut self, item: &Item) { + let when = item.when(); let level = self.level_for(when); - self.levels[level].remove_entry(when, item, store); + self.levels[level].remove_entry(when, item); } /// Instant at which to poll @@ -136,33 +130,35 @@ where self.next_expiration().map(|expiration| expiration.deadline) } - pub(crate) fn poll(&mut self, poll: &mut Poll, store: &mut T::Store) -> Option { + /// Advances the timer up to the instant represented by `now`. + pub(crate) fn poll(&mut self, now: u64) -> Option { loop { - if poll.expiration.is_none() { - poll.expiration = self.next_expiration().and_then(|expiration| { - if expiration.deadline > poll.now { - None - } else { - Some(expiration) - } - }); - } + // under what circumstances is poll.expiration Some vs. None? + let expiration = self.next_expiration().and_then(|expiration| { + if expiration.deadline > now { + None + } else { + Some(expiration) + } + }); - match poll.expiration { + match expiration { Some(ref expiration) => { - if let Some(item) = self.poll_expiration(expiration, store) { + if let Some(item) = self.poll_expiration(expiration) { return Some(item); } self.set_elapsed(expiration.deadline); } None => { - self.set_elapsed(poll.now); + // in this case the poll did not indicate an expiration + // _and_ we were not able to find a next expiration in + // the current list of timers. advance to the poll's + // current time and do nothing else. + self.set_elapsed(now); return None; } } - - poll.expiration = None; } } @@ -197,22 +193,22 @@ where res } - pub(crate) fn poll_expiration( - &mut self, - expiration: &Expiration, - store: &mut T::Store, - ) -> Option { - while let Some(item) = self.pop_entry(expiration, store) { + /// iteratively find entries that are between the wheel's current + /// time and the expiration time. for each in that population either + /// return it for notification (in the case of the last level) or tier + /// it down to the next level (in all other cases). + pub(crate) fn poll_expiration(&mut self, expiration: &Expiration) -> Option { + while let Some(item) = self.pop_entry(expiration) { if expiration.level == 0 { - debug_assert_eq!(T::when(item.borrow(), store), expiration.deadline); + debug_assert_eq!(item.when(), expiration.deadline); return Some(item); } else { - let when = T::when(item.borrow(), store); + let when = item.when(); let next_level = expiration.level - 1; - self.levels[next_level].add_entry(when, item, store); + self.levels[next_level].add_entry(when, item); } } @@ -232,8 +228,8 @@ where } } - fn pop_entry(&mut self, expiration: &Expiration, store: &mut T::Store) -> Option { - self.levels[expiration.level].pop_entry_slot(expiration.slot, store) + fn pop_entry(&mut self, expiration: &Expiration) -> Option { + self.levels[expiration.level].pop_entry_slot(expiration.slot) } fn level_for(&self, when: u64) -> usize { @@ -251,15 +247,6 @@ fn level_for(elapsed: u64, when: u64) -> usize { significant / 6 } -impl Poll { - pub(crate) fn new(now: u64) -> Poll { - Poll { - now, - expiration: None, - } - } -} - #[cfg(all(test, not(loom)))] mod test { use super::*; diff --git a/tokio/src/time/wheel/stack.rs b/tokio/src/time/wheel/stack.rs index 6e55c38ccda..e7ed137f55a 100644 --- a/tokio/src/time/wheel/stack.rs +++ b/tokio/src/time/wheel/stack.rs @@ -1,26 +1,112 @@ -use std::borrow::Borrow; +use super::{Item, OwnedItem}; +use crate::time::driver::Entry; -/// Abstracts the stack operations needed to track timeouts. -pub(crate) trait Stack: Default { - /// Type of the item stored in the stack - type Owned: Borrow; +use std::ptr; - /// Borrowed item - type Borrowed; +/// A doubly linked stack +#[derive(Debug)] +pub(crate) struct Stack { + head: Option, +} + +impl Default for Stack { + fn default() -> Stack { + Stack { head: None } + } +} + +impl Stack { + pub(crate) fn is_empty(&self) -> bool { + self.head.is_none() + } + + pub(crate) fn push(&mut self, entry: OwnedItem) { + // Get a pointer to the entry to for the prev link + let ptr: *const Entry = &*entry as *const _; + + // Remove the old head entry + let old = self.head.take(); + + unsafe { + // Ensure the entry is not already in a stack. + debug_assert!((*entry.next_stack.get()).is_none()); + debug_assert!((*entry.prev_stack.get()).is_null()); + + if let Some(ref entry) = old.as_ref() { + debug_assert!({ + // The head is not already set to the entry + ptr != &***entry as *const _ + }); + + // Set the previous link on the old head + *entry.prev_stack.get() = ptr; + } + + // Set this entry's next pointer + *entry.next_stack.get() = old; + } + + // Update the head pointer + self.head = Some(entry); + } + + /// Pops an item from the stack + pub(crate) fn pop(&mut self) -> Option { + let entry = self.head.take(); + + unsafe { + if let Some(entry) = entry.as_ref() { + self.head = (*entry.next_stack.get()).take(); + + if let Some(entry) = self.head.as_ref() { + *entry.prev_stack.get() = ptr::null(); + } + + *entry.prev_stack.get() = ptr::null(); + } + } + + entry + } + + pub(crate) fn remove(&mut self, entry: &Item) { + unsafe { + // Ensure that the entry is in fact contained by the stack + debug_assert!({ + // This walks the full linked list even if an entry is found. + let mut next = self.head.as_ref(); + let mut contains = false; + + while let Some(n) = next { + if entry as *const _ == &**n as *const _ { + debug_assert!(!contains); + contains = true; + } + + next = (*n.next_stack.get()).as_ref(); + } - /// Item storage, this allows a slab to be used instead of just the heap - type Store; + contains + }); - /// Returns `true` if the stack is empty - fn is_empty(&self) -> bool; + // Unlink `entry` from the next node + let next = (*entry.next_stack.get()).take(); - /// Push an item onto the stack - fn push(&mut self, item: Self::Owned, store: &mut Self::Store); + if let Some(next) = next.as_ref() { + (*next.prev_stack.get()) = *entry.prev_stack.get(); + } - /// Pop an item from the stack - fn pop(&mut self, store: &mut Self::Store) -> Option; + // Unlink `entry` from the prev node - fn remove(&mut self, item: &Self::Borrowed, store: &mut Self::Store); + if let Some(prev) = (*entry.prev_stack.get()).as_ref() { + *prev.next_stack.get() = next; + } else { + // It is the head + self.head = next; + } - fn when(item: &Self::Borrowed, store: &Self::Store) -> u64; + // Unset the prev pointer + *entry.prev_stack.get() = ptr::null(); + } + } }