diff --git a/tokio-util/src/time/delay_queue.rs b/tokio-util/src/time/delay_queue.rs index b23c24e6149..92c922b89d4 100644 --- a/tokio-util/src/time/delay_queue.rs +++ b/tokio-util/src/time/delay_queue.rs @@ -141,7 +141,7 @@ pub struct DelayQueue { delay: Option, /// Wheel polling state - poll: wheel::Poll, + wheel_now: u64, /// Instant at which the timer starts start: Instant, @@ -251,7 +251,7 @@ impl DelayQueue { slab: Slab::with_capacity(capacity), expired: Stack::default(), delay: None, - poll: wheel::Poll::new(0), + wheel_now: 0, start: Instant::now(), } } @@ -733,11 +733,11 @@ impl DelayQueue { let now = crate::time::ms(delay.deadline() - self.start, crate::time::Round::Down); - self.poll = wheel::Poll::new(now); + self.wheel_now = now; } // We poll the wheel to get the next value out before finding the next deadline. - let wheel_idx = self.wheel.poll(&mut self.poll, &mut self.slab); + let wheel_idx = self.wheel.poll(self.wheel_now, &mut self.slab); self.delay = self.next_deadline().map(sleep_until); diff --git a/tokio-util/src/time/wheel/mod.rs b/tokio-util/src/time/wheel/mod.rs index a2ef27fc6c5..478037a3098 100644 --- a/tokio-util/src/time/wheel/mod.rs +++ b/tokio-util/src/time/wheel/mod.rs @@ -51,13 +51,6 @@ pub(crate) enum InsertError { Invalid, } -/// Poll expirations from the wheel -#[derive(Debug, Default)] -pub(crate) struct Poll { - now: u64, - expiration: Option, -} - impl Wheel where T: Stack, @@ -136,19 +129,18 @@ where self.next_expiration().map(|expiration| expiration.deadline) } - pub(crate) fn poll(&mut self, poll: &mut Poll, store: &mut T::Store) -> Option { + /// Advances the timer up to the instant represented by `now`. + pub(crate) fn poll(&mut self, now: u64, store: &mut T::Store) -> Option { loop { - if poll.expiration.is_none() { - poll.expiration = self.next_expiration().and_then(|expiration| { - if expiration.deadline > poll.now { - None - } else { - Some(expiration) - } - }); - } + let expiration = self.next_expiration().and_then(|expiration| { + if expiration.deadline > now { + None + } else { + Some(expiration) + } + }); - match poll.expiration { + match expiration { Some(ref expiration) => { if let Some(item) = self.poll_expiration(expiration, store) { return Some(item); @@ -157,12 +149,14 @@ where self.set_elapsed(expiration.deadline); } None => { - self.set_elapsed(poll.now); + // in this case the poll did not indicate an expiration + // _and_ we were not able to find a next expiration in + // the current list of timers. advance to the poll's + // current time and do nothing else. + self.set_elapsed(now); return None; } } - - poll.expiration = None; } } @@ -197,6 +191,10 @@ where res } + /// iteratively find entries that are between the wheel's current + /// time and the expiration time. for each in that population either + /// return it for notification (in the case of the last level) or tier + /// it down to the next level (in all other cases). pub(crate) fn poll_expiration( &mut self, expiration: &Expiration, @@ -251,15 +249,6 @@ fn level_for(elapsed: u64, when: u64) -> usize { significant / 6 } -impl Poll { - pub(crate) fn new(now: u64) -> Poll { - Poll { - now, - expiration: None, - } - } -} - #[cfg(all(test, not(loom)))] mod test { use super::*; diff --git a/tokio/src/time/delay.rs b/tokio/src/time/delay.rs index 42ae4b08431..9364860d806 100644 --- a/tokio/src/time/delay.rs +++ b/tokio/src/time/delay.rs @@ -1,8 +1,9 @@ -use crate::time::driver::Registration; -use crate::time::{Duration, Instant}; +use crate::time::driver::{Entry, Handle}; +use crate::time::{Duration, Error, Instant}; use std::future::Future; use std::pin::Pin; +use std::sync::Arc; use std::task::{self, Poll}; /// Waits until `deadline` is reached. @@ -16,8 +17,7 @@ use std::task::{self, Poll}; /// Canceling a delay is done by dropping the returned future. No additional /// cleanup work is required. pub fn sleep_until(deadline: Instant) -> Delay { - let registration = Registration::new(deadline, Duration::from_millis(0)); - Delay { registration } + Delay::new_timeout(deadline, Duration::from_millis(0)) } /// Waits until `duration` has elapsed. @@ -63,25 +63,27 @@ pub struct Delay { /// The link between the `Delay` instance and the timer that drives it. /// /// This also stores the `deadline` value. - registration: Registration, + entry: Arc, } impl Delay { pub(crate) fn new_timeout(deadline: Instant, duration: Duration) -> Delay { - let registration = Registration::new(deadline, duration); - Delay { registration } + let handle = Handle::current(); + let entry = Entry::new(&handle, deadline, duration); + + Delay { entry } } /// Returns the instant at which the future will complete. pub fn deadline(&self) -> Instant { - self.registration.deadline() + self.entry.time_ref().deadline } /// Returns `true` if the `Delay` has elapsed /// /// A `Delay` is elapsed when the requested duration has elapsed. pub fn is_elapsed(&self) -> bool { - self.registration.is_elapsed() + self.entry.is_elapsed() } /// Resets the `Delay` instance to a new deadline. @@ -92,7 +94,21 @@ impl Delay { /// This function can be called both before and after the future has /// completed. pub fn reset(&mut self, deadline: Instant) { - self.registration.reset(deadline); + unsafe { + self.entry.time_mut().deadline = deadline; + } + + Entry::reset(&mut self.entry); + } + + fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll> { + // Keep track of task budget + let coop = ready!(crate::coop::poll_proceed(cx)); + + self.entry.poll_elapsed(cx).map(move |r| { + coop.made_progress(); + r + }) } } @@ -109,9 +125,15 @@ impl Future for Delay { // Both cases are extremely rare, and pretty accurately fit into // "logic errors", so we just panic in this case. A user couldn't // really do much better if we passed the error onwards. - match ready!(self.registration.poll_elapsed(cx)) { + match ready!(self.poll_elapsed(cx)) { Ok(()) => Poll::Ready(()), Err(e) => panic!("timer error: {}", e), } } } + +impl Drop for Delay { + fn drop(&mut self) { + Entry::cancel(&self.entry); + } +} diff --git a/tokio/src/time/driver/entry.rs b/tokio/src/time/driver/entry.rs index 974465c19be..20f8e1c6dce 100644 --- a/tokio/src/time/driver/entry.rs +++ b/tokio/src/time/driver/entry.rs @@ -83,7 +83,7 @@ pub(crate) struct Entry { /// Next entry in the State's linked list. /// /// This is only accessed by the timer - pub(super) next_stack: UnsafeCell>>, + pub(crate) next_stack: UnsafeCell>>, /// Previous entry in the State's linked list. /// @@ -91,7 +91,7 @@ pub(crate) struct Entry { /// entry. /// /// This is a weak reference. - pub(super) prev_stack: UnsafeCell<*const Entry>, + pub(crate) prev_stack: UnsafeCell<*const Entry>, } /// Stores the info for `Delay`. @@ -112,12 +112,12 @@ const ERROR: u64 = u64::MAX; impl Entry { pub(crate) fn new(handle: &Handle, deadline: Instant, duration: Duration) -> Arc { let inner = handle.inner().unwrap(); - let entry: Entry; - // Increment the number of active timeouts - if let Err(err) = inner.increment() { - entry = Entry::new2(deadline, duration, Weak::new(), ERROR); + // Attempt to increment the number of active timeouts + let entry = if let Err(err) = inner.increment() { + let entry = Entry::new2(deadline, duration, Weak::new(), ERROR); entry.error(err); + entry } else { let when = inner.normalize_deadline(deadline); let state = if when <= inner.elapsed() { @@ -125,8 +125,8 @@ impl Entry { } else { when }; - entry = Entry::new2(deadline, duration, Arc::downgrade(&inner), state); - } + Entry::new2(deadline, duration, Arc::downgrade(&inner), state) + }; let entry = Arc::new(entry); if let Err(err) = inner.queue(&entry) { @@ -147,6 +147,10 @@ impl Entry { &mut *self.time.0.get() } + pub(crate) fn when(&self) -> u64 { + self.when_internal().expect("invalid internal state") + } + /// The current entry state as known by the timer. This is not the value of /// `state`, but lets the timer know how to converge its state to `state`. pub(crate) fn when_internal(&self) -> Option { diff --git a/tokio/src/time/driver/mod.rs b/tokio/src/time/driver/mod.rs index bb6c28b34de..5ece7c72d74 100644 --- a/tokio/src/time/driver/mod.rs +++ b/tokio/src/time/driver/mod.rs @@ -9,12 +9,6 @@ pub(super) use self::entry::Entry; mod handle; pub(crate) use self::handle::Handle; -mod registration; -pub(crate) use self::registration::Registration; - -mod stack; -use self::stack::Stack; - use crate::loom::sync::atomic::{AtomicU64, AtomicUsize}; use crate::park::{Park, Unpark}; use crate::time::{wheel, Error}; @@ -73,7 +67,7 @@ use std::{cmp, fmt}; /// When the timer processes entries at level zero, it will notify all the /// `Delay` instances as their deadlines have been reached. For all higher /// levels, all entries will be redistributed across the wheel at the next level -/// down. Eventually, as time progresses, entries will [`Delay`][delay] instances will +/// down. Eventually, as time progresses, entries with [`Delay`][delay] instances will /// either be canceled (dropped) or their associated entries will reach level /// zero and be notified. /// @@ -87,7 +81,7 @@ pub(crate) struct Driver { inner: Arc, /// Timer wheel - wheel: wheel::Wheel, + wheel: wheel::Wheel, /// Thread parker. The `Driver` park implementation delegates to this. park: T, @@ -163,9 +157,8 @@ where self.clock.now() - self.inner.start, crate::time::Round::Down, ); - let mut poll = wheel::Poll::new(now); - while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) { + while let Some(entry) = self.wheel.poll(now) { let when = entry.when_internal().expect("invalid internal entry state"); // Fire the entry @@ -193,7 +186,7 @@ where self.clear_entry(&entry); } (None, Some(when)) => { - // Queue the entry + // Add the entry to the timer wheel self.add_entry(entry, when); } (Some(_), Some(next)) => { @@ -205,19 +198,17 @@ where } fn clear_entry(&mut self, entry: &Arc) { - self.wheel.remove(entry, &mut ()); + self.wheel.remove(entry); entry.set_when_internal(None); } /// Fires the entry if it needs to, otherwise queue it to be processed later. - /// - /// Returns `None` if the entry was fired. fn add_entry(&mut self, entry: Arc, when: u64) { use crate::time::wheel::InsertError; entry.set_when_internal(Some(when)); - match self.wheel.insert(when, entry, &mut ()) { + match self.wheel.insert(when, entry) { Ok(_) => {} Err((entry, InsertError::Elapsed)) => { // The entry's deadline has elapsed, so fire it and update the @@ -320,9 +311,9 @@ where self.inner.process.shutdown(); // Clear the wheel, using u64::MAX allows us to drain everything - let mut poll = wheel::Poll::new(u64::MAX); + let end_of_time = u64::MAX; - while let Some(entry) = self.wheel.poll(&mut poll, &mut ()) { + while let Some(entry) = self.wheel.poll(end_of_time) { entry.error(Error::shutdown()); } @@ -387,6 +378,10 @@ impl Inner { debug_assert!(prev <= MAX_TIMEOUTS); } + /// add the entry to the "process queue". entries are not immediately + /// pushed into the timer wheel but are instead pushed into the + /// process queue and then moved from the process queue into the timer + /// wheel on next `process` fn queue(&self, entry: &Arc) -> Result<(), Error> { if self.process.push(entry)? { // The timer is notified so that it can process the timeout diff --git a/tokio/src/time/driver/registration.rs b/tokio/src/time/driver/registration.rs deleted file mode 100644 index 3a0b34501b0..00000000000 --- a/tokio/src/time/driver/registration.rs +++ /dev/null @@ -1,56 +0,0 @@ -use crate::time::driver::{Entry, Handle}; -use crate::time::{Duration, Error, Instant}; - -use std::sync::Arc; -use std::task::{self, Poll}; - -/// Registration with a timer. -/// -/// The association between a `Delay` instance and a timer is done lazily in -/// `poll` -#[derive(Debug)] -pub(crate) struct Registration { - entry: Arc, -} - -impl Registration { - pub(crate) fn new(deadline: Instant, duration: Duration) -> Registration { - let handle = Handle::current(); - - Registration { - entry: Entry::new(&handle, deadline, duration), - } - } - - pub(crate) fn deadline(&self) -> Instant { - self.entry.time_ref().deadline - } - - pub(crate) fn reset(&mut self, deadline: Instant) { - unsafe { - self.entry.time_mut().deadline = deadline; - } - - Entry::reset(&mut self.entry); - } - - pub(crate) fn is_elapsed(&self) -> bool { - self.entry.is_elapsed() - } - - pub(crate) fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll> { - // Keep track of task budget - let coop = ready!(crate::coop::poll_proceed(cx)); - - self.entry.poll_elapsed(cx).map(move |r| { - coop.made_progress(); - r - }) - } -} - -impl Drop for Registration { - fn drop(&mut self) { - Entry::cancel(&self.entry); - } -} diff --git a/tokio/src/time/driver/stack.rs b/tokio/src/time/driver/stack.rs deleted file mode 100644 index 3e2924f2653..00000000000 --- a/tokio/src/time/driver/stack.rs +++ /dev/null @@ -1,121 +0,0 @@ -use crate::time::driver::Entry; -use crate::time::wheel; - -use std::ptr; -use std::sync::Arc; - -/// A doubly linked stack -#[derive(Debug)] -pub(crate) struct Stack { - head: Option>, -} - -impl Default for Stack { - fn default() -> Stack { - Stack { head: None } - } -} - -impl wheel::Stack for Stack { - type Owned = Arc; - type Borrowed = Entry; - type Store = (); - - fn is_empty(&self) -> bool { - self.head.is_none() - } - - fn push(&mut self, entry: Self::Owned, _: &mut Self::Store) { - // Get a pointer to the entry to for the prev link - let ptr: *const Entry = &*entry as *const _; - - // Remove the old head entry - let old = self.head.take(); - - unsafe { - // Ensure the entry is not already in a stack. - debug_assert!((*entry.next_stack.get()).is_none()); - debug_assert!((*entry.prev_stack.get()).is_null()); - - if let Some(ref entry) = old.as_ref() { - debug_assert!({ - // The head is not already set to the entry - ptr != &***entry as *const _ - }); - - // Set the previous link on the old head - *entry.prev_stack.get() = ptr; - } - - // Set this entry's next pointer - *entry.next_stack.get() = old; - } - - // Update the head pointer - self.head = Some(entry); - } - - /// Pops an item from the stack - fn pop(&mut self, _: &mut ()) -> Option> { - let entry = self.head.take(); - - unsafe { - if let Some(entry) = entry.as_ref() { - self.head = (*entry.next_stack.get()).take(); - - if let Some(entry) = self.head.as_ref() { - *entry.prev_stack.get() = ptr::null(); - } - - *entry.prev_stack.get() = ptr::null(); - } - } - - entry - } - - fn remove(&mut self, entry: &Entry, _: &mut ()) { - unsafe { - // Ensure that the entry is in fact contained by the stack - debug_assert!({ - // This walks the full linked list even if an entry is found. - let mut next = self.head.as_ref(); - let mut contains = false; - - while let Some(n) = next { - if entry as *const _ == &**n as *const _ { - debug_assert!(!contains); - contains = true; - } - - next = (*n.next_stack.get()).as_ref(); - } - - contains - }); - - // Unlink `entry` from the next node - let next = (*entry.next_stack.get()).take(); - - if let Some(next) = next.as_ref() { - (*next.prev_stack.get()) = *entry.prev_stack.get(); - } - - // Unlink `entry` from the prev node - - if let Some(prev) = (*entry.prev_stack.get()).as_ref() { - *prev.next_stack.get() = next; - } else { - // It is the head - self.head = next; - } - - // Unset the prev pointer - *entry.prev_stack.get() = ptr::null(); - } - } - - fn when(item: &Entry, _: &()) -> u64 { - item.when_internal().expect("invalid internal state") - } -} diff --git a/tokio/src/time/tests/mod.rs b/tokio/src/time/tests/mod.rs index e112b8e1dc3..a043d65e19d 100644 --- a/tokio/src/time/tests/mod.rs +++ b/tokio/src/time/tests/mod.rs @@ -8,10 +8,10 @@ fn assert_sync() {} #[test] fn registration_is_send_and_sync() { - use crate::time::driver::Registration; + use crate::time::delay::Delay; - assert_send::(); - assert_sync::(); + assert_send::(); + assert_sync::(); } #[test] diff --git a/tokio/src/time/wheel/level.rs b/tokio/src/time/wheel/level.rs index 49f9bfb9cf0..d51d26a0362 100644 --- a/tokio/src/time/wheel/level.rs +++ b/tokio/src/time/wheel/level.rs @@ -1,9 +1,10 @@ +use super::{Item, OwnedItem}; use crate::time::wheel::Stack; use std::fmt; /// Wheel for a single level in the timer. This wheel contains 64 slots. -pub(crate) struct Level { +pub(crate) struct Level { level: usize, /// Bit field tracking which slots currently contain entries. @@ -16,7 +17,7 @@ pub(crate) struct Level { occupied: u64, /// Slots - slot: [T; LEVEL_MULT], + slot: [Stack; LEVEL_MULT], } /// Indicates when a slot must be processed next. @@ -37,87 +38,90 @@ pub(crate) struct Expiration { /// Being a power of 2 is very important. const LEVEL_MULT: usize = 64; -impl Level { - pub(crate) fn new(level: usize) -> Level { - // Rust's derived implementations for arrays require that the value - // contained by the array be `Copy`. So, here we have to manually - // initialize every single slot. - macro_rules! s { - () => { - T::default() - }; - }; +impl Level { + pub(crate) fn new(level: usize) -> Level { + // A value has to be Copy in order to use syntax like: + // let stack = Stack::default(); + // ... + // slots: [stack; 64], + // + // Alternatively, since Stack is Default one can + // use syntax like: + // let slots: [Stack; 64] = Default::default(); + // + // However, that is only supported for arrays of size + // 32 or fewer. So in our case we have to explicitly + // invoke the constructor for each array element. + let ctor = Stack::default; Level { level, occupied: 0, slot: [ - // It does not look like the necessary traits are - // derived for [T; 64]. - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), - s!(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), + ctor(), ], } } @@ -173,17 +177,17 @@ impl Level { Some(slot) } - pub(crate) fn add_entry(&mut self, when: u64, item: T::Owned, store: &mut T::Store) { + pub(crate) fn add_entry(&mut self, when: u64, item: OwnedItem) { let slot = slot_for(when, self.level); - self.slot[slot].push(item, store); + self.slot[slot].push(item); self.occupied |= occupied_bit(slot); } - pub(crate) fn remove_entry(&mut self, when: u64, item: &T::Borrowed, store: &mut T::Store) { + pub(crate) fn remove_entry(&mut self, when: u64, item: &Item) { let slot = slot_for(when, self.level); - self.slot[slot].remove(item, store); + self.slot[slot].remove(item); if self.slot[slot].is_empty() { // The bit is currently set @@ -194,8 +198,8 @@ impl Level { } } - pub(crate) fn pop_entry_slot(&mut self, slot: usize, store: &mut T::Store) -> Option { - let ret = self.slot[slot].pop(store); + pub(crate) fn pop_entry_slot(&mut self, slot: usize) -> Option { + let ret = self.slot[slot].pop(); if ret.is_some() && self.slot[slot].is_empty() { // The bit is currently set @@ -208,7 +212,7 @@ impl Level { } } -impl fmt::Debug for Level { +impl fmt::Debug for Level { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Level") .field("occupied", &self.occupied) diff --git a/tokio/src/time/wheel/mod.rs b/tokio/src/time/wheel/mod.rs index a2ef27fc6c5..03861240ac1 100644 --- a/tokio/src/time/wheel/mod.rs +++ b/tokio/src/time/wheel/mod.rs @@ -1,3 +1,5 @@ +use crate::time::driver::Entry; + mod level; pub(crate) use self::level::Expiration; use self::level::Level; @@ -5,9 +7,12 @@ use self::level::Level; mod stack; pub(crate) use self::stack::Stack; -use std::borrow::Borrow; +use std::sync::Arc; use std::usize; +pub(super) type Item = Entry; +pub(super) type OwnedItem = Arc; + /// Timing wheel implementation. /// /// This type provides the hashed timing wheel implementation that backs `Timer` @@ -20,7 +25,7 @@ use std::usize; /// /// See `Timer` documentation for some implementation notes. #[derive(Debug)] -pub(crate) struct Wheel { +pub(crate) struct Wheel { /// The number of milliseconds elapsed since the wheel started. elapsed: u64, @@ -34,7 +39,7 @@ pub(crate) struct Wheel { /// * ~ 4 min slots / ~ 4 hr range /// * ~ 4 hr slots / ~ 12 day range /// * ~ 12 day slots / ~ 2 yr range - levels: Vec>, + levels: Vec, } /// Number of levels. Each level has 64 slots. By using 6 levels with 64 slots @@ -51,19 +56,9 @@ pub(crate) enum InsertError { Invalid, } -/// Poll expirations from the wheel -#[derive(Debug, Default)] -pub(crate) struct Poll { - now: u64, - expiration: Option, -} - -impl Wheel -where - T: Stack, -{ +impl Wheel { /// Create a new timing wheel - pub(crate) fn new() -> Wheel { + pub(crate) fn new() -> Wheel { let levels = (0..NUM_LEVELS).map(Level::new).collect(); Wheel { elapsed: 0, levels } @@ -99,9 +94,8 @@ where pub(crate) fn insert( &mut self, when: u64, - item: T::Owned, - store: &mut T::Store, - ) -> Result<(), (T::Owned, InsertError)> { + item: OwnedItem, + ) -> Result<(), (OwnedItem, InsertError)> { if when <= self.elapsed { return Err((item, InsertError::Elapsed)); } else if when - self.elapsed > MAX_DURATION { @@ -111,7 +105,7 @@ where // Get the level at which the entry should be stored let level = self.level_for(when); - self.levels[level].add_entry(when, item, store); + self.levels[level].add_entry(when, item); debug_assert!({ self.levels[level] @@ -124,11 +118,11 @@ where } /// Remove `item` from thee timing wheel. - pub(crate) fn remove(&mut self, item: &T::Borrowed, store: &mut T::Store) { - let when = T::when(item, store); + pub(crate) fn remove(&mut self, item: &Item) { + let when = item.when(); let level = self.level_for(when); - self.levels[level].remove_entry(when, item, store); + self.levels[level].remove_entry(when, item); } /// Instant at which to poll @@ -136,33 +130,35 @@ where self.next_expiration().map(|expiration| expiration.deadline) } - pub(crate) fn poll(&mut self, poll: &mut Poll, store: &mut T::Store) -> Option { + /// Advances the timer up to the instant represented by `now`. + pub(crate) fn poll(&mut self, now: u64) -> Option { loop { - if poll.expiration.is_none() { - poll.expiration = self.next_expiration().and_then(|expiration| { - if expiration.deadline > poll.now { - None - } else { - Some(expiration) - } - }); - } + // under what circumstances is poll.expiration Some vs. None? + let expiration = self.next_expiration().and_then(|expiration| { + if expiration.deadline > now { + None + } else { + Some(expiration) + } + }); - match poll.expiration { + match expiration { Some(ref expiration) => { - if let Some(item) = self.poll_expiration(expiration, store) { + if let Some(item) = self.poll_expiration(expiration) { return Some(item); } self.set_elapsed(expiration.deadline); } None => { - self.set_elapsed(poll.now); + // in this case the poll did not indicate an expiration + // _and_ we were not able to find a next expiration in + // the current list of timers. advance to the poll's + // current time and do nothing else. + self.set_elapsed(now); return None; } } - - poll.expiration = None; } } @@ -197,22 +193,22 @@ where res } - pub(crate) fn poll_expiration( - &mut self, - expiration: &Expiration, - store: &mut T::Store, - ) -> Option { - while let Some(item) = self.pop_entry(expiration, store) { + /// iteratively find entries that are between the wheel's current + /// time and the expiration time. for each in that population either + /// return it for notification (in the case of the last level) or tier + /// it down to the next level (in all other cases). + pub(crate) fn poll_expiration(&mut self, expiration: &Expiration) -> Option { + while let Some(item) = self.pop_entry(expiration) { if expiration.level == 0 { - debug_assert_eq!(T::when(item.borrow(), store), expiration.deadline); + debug_assert_eq!(item.when(), expiration.deadline); return Some(item); } else { - let when = T::when(item.borrow(), store); + let when = item.when(); let next_level = expiration.level - 1; - self.levels[next_level].add_entry(when, item, store); + self.levels[next_level].add_entry(when, item); } } @@ -232,8 +228,8 @@ where } } - fn pop_entry(&mut self, expiration: &Expiration, store: &mut T::Store) -> Option { - self.levels[expiration.level].pop_entry_slot(expiration.slot, store) + fn pop_entry(&mut self, expiration: &Expiration) -> Option { + self.levels[expiration.level].pop_entry_slot(expiration.slot) } fn level_for(&self, when: u64) -> usize { @@ -251,15 +247,6 @@ fn level_for(elapsed: u64, when: u64) -> usize { significant / 6 } -impl Poll { - pub(crate) fn new(now: u64) -> Poll { - Poll { - now, - expiration: None, - } - } -} - #[cfg(all(test, not(loom)))] mod test { use super::*; diff --git a/tokio/src/time/wheel/stack.rs b/tokio/src/time/wheel/stack.rs index 6e55c38ccda..e7ed137f55a 100644 --- a/tokio/src/time/wheel/stack.rs +++ b/tokio/src/time/wheel/stack.rs @@ -1,26 +1,112 @@ -use std::borrow::Borrow; +use super::{Item, OwnedItem}; +use crate::time::driver::Entry; -/// Abstracts the stack operations needed to track timeouts. -pub(crate) trait Stack: Default { - /// Type of the item stored in the stack - type Owned: Borrow; +use std::ptr; - /// Borrowed item - type Borrowed; +/// A doubly linked stack +#[derive(Debug)] +pub(crate) struct Stack { + head: Option, +} + +impl Default for Stack { + fn default() -> Stack { + Stack { head: None } + } +} + +impl Stack { + pub(crate) fn is_empty(&self) -> bool { + self.head.is_none() + } + + pub(crate) fn push(&mut self, entry: OwnedItem) { + // Get a pointer to the entry to for the prev link + let ptr: *const Entry = &*entry as *const _; + + // Remove the old head entry + let old = self.head.take(); + + unsafe { + // Ensure the entry is not already in a stack. + debug_assert!((*entry.next_stack.get()).is_none()); + debug_assert!((*entry.prev_stack.get()).is_null()); + + if let Some(ref entry) = old.as_ref() { + debug_assert!({ + // The head is not already set to the entry + ptr != &***entry as *const _ + }); + + // Set the previous link on the old head + *entry.prev_stack.get() = ptr; + } + + // Set this entry's next pointer + *entry.next_stack.get() = old; + } + + // Update the head pointer + self.head = Some(entry); + } + + /// Pops an item from the stack + pub(crate) fn pop(&mut self) -> Option { + let entry = self.head.take(); + + unsafe { + if let Some(entry) = entry.as_ref() { + self.head = (*entry.next_stack.get()).take(); + + if let Some(entry) = self.head.as_ref() { + *entry.prev_stack.get() = ptr::null(); + } + + *entry.prev_stack.get() = ptr::null(); + } + } + + entry + } + + pub(crate) fn remove(&mut self, entry: &Item) { + unsafe { + // Ensure that the entry is in fact contained by the stack + debug_assert!({ + // This walks the full linked list even if an entry is found. + let mut next = self.head.as_ref(); + let mut contains = false; + + while let Some(n) = next { + if entry as *const _ == &**n as *const _ { + debug_assert!(!contains); + contains = true; + } + + next = (*n.next_stack.get()).as_ref(); + } - /// Item storage, this allows a slab to be used instead of just the heap - type Store; + contains + }); - /// Returns `true` if the stack is empty - fn is_empty(&self) -> bool; + // Unlink `entry` from the next node + let next = (*entry.next_stack.get()).take(); - /// Push an item onto the stack - fn push(&mut self, item: Self::Owned, store: &mut Self::Store); + if let Some(next) = next.as_ref() { + (*next.prev_stack.get()) = *entry.prev_stack.get(); + } - /// Pop an item from the stack - fn pop(&mut self, store: &mut Self::Store) -> Option; + // Unlink `entry` from the prev node - fn remove(&mut self, item: &Self::Borrowed, store: &mut Self::Store); + if let Some(prev) = (*entry.prev_stack.get()).as_ref() { + *prev.next_stack.get() = next; + } else { + // It is the head + self.head = next; + } - fn when(item: &Self::Borrowed, store: &Self::Store) -> u64; + // Unset the prev pointer + *entry.prev_stack.get() = ptr::null(); + } + } }