Skip to content

Commit

Permalink
util: remove error case from the infallible DelayQueue::poll_elapsed (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Ralith committed Feb 9, 2022
1 parent 0b05ef6 commit cf38ba6
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 54 deletions.
45 changes: 19 additions & 26 deletions tokio-util/src/time/delay_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use crate::time::wheel::{self, Wheel};

use futures_core::ready;
use tokio::time::{error::Error, sleep_until, Duration, Instant, Sleep};
use tokio::time::{sleep_until, Duration, Instant, Sleep};

use core::ops::{Index, IndexMut};
use slab::Slab;
Expand Down Expand Up @@ -72,7 +72,6 @@ use std::task::{self, Poll, Waker};
/// Using `DelayQueue` to manage cache entries.
///
/// ```rust,no_run
/// use tokio::time::error::Error;
/// use tokio_util::time::{DelayQueue, delay_queue};
///
/// use futures::ready;
Expand Down Expand Up @@ -108,13 +107,12 @@ use std::task::{self, Poll, Waker};
/// }
/// }
///
/// fn poll_purge(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
/// while let Some(res) = ready!(self.expirations.poll_expired(cx)) {
/// let entry = res?;
/// fn poll_purge(&mut self, cx: &mut Context<'_>) -> Poll<()> {
/// while let Some(entry) = ready!(self.expirations.poll_expired(cx)) {
/// self.entries.remove(entry.get_ref());
/// }
///
/// Poll::Ready(Ok(()))
/// Poll::Ready(())
/// }
/// }
/// ```
Expand Down Expand Up @@ -577,10 +575,7 @@ impl<T> DelayQueue<T> {
/// Attempts to pull out the next value of the delay queue, registering the
/// current task for wakeup if the value is not yet available, and returning
/// `None` if the queue is exhausted.
pub fn poll_expired(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Option<Result<Expired<T>, Error>>> {
pub fn poll_expired(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Expired<T>>> {
if !self
.waker
.as_ref()
Expand All @@ -591,18 +586,16 @@ impl<T> DelayQueue<T> {
}

let item = ready!(self.poll_idx(cx));
Poll::Ready(item.map(|result| {
result.map(|key| {
let data = self.slab.remove(&key);
debug_assert!(data.next.is_none());
debug_assert!(data.prev.is_none());

Expired {
key,
data: data.inner,
deadline: self.start + Duration::from_millis(data.when),
}
})
Poll::Ready(item.map(|key| {
let data = self.slab.remove(&key);
debug_assert!(data.next.is_none());
debug_assert!(data.prev.is_none());

Expired {
key,
data: data.inner,
deadline: self.start + Duration::from_millis(data.when),
}
}))
}

Expand Down Expand Up @@ -1017,13 +1010,13 @@ impl<T> DelayQueue<T> {
/// should be returned.
///
/// A slot should be returned when the associated deadline has been reached.
fn poll_idx(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Result<Key, Error>>> {
fn poll_idx(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Key>> {
use self::wheel::Stack;

let expired = self.expired.pop(&mut self.slab);

if expired.is_some() {
return Poll::Ready(expired.map(Ok));
return Poll::Ready(expired);
}

loop {
Expand All @@ -1043,7 +1036,7 @@ impl<T> DelayQueue<T> {
self.delay = self.next_deadline().map(|when| Box::pin(sleep_until(when)));

if let Some(idx) = wheel_idx {
return Poll::Ready(Some(Ok(idx)));
return Poll::Ready(Some(idx));
}

if self.delay.is_none() {
Expand Down Expand Up @@ -1075,7 +1068,7 @@ impl<T> Default for DelayQueue<T> {
impl<T> futures_core::Stream for DelayQueue<T> {
// DelayQueue seems much more specific, where a user may care that it
// has reached capacity, so return those errors instead of panicking.
type Item = Result<Expired<T>, Error>;
type Item = Expired<T>;

fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
DelayQueue::poll_expired(self.get_mut(), cx)
Expand Down
56 changes: 28 additions & 28 deletions tokio-util/tests/time_delay_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#![cfg(feature = "full")]

use tokio::time::{self, sleep, sleep_until, Duration, Instant};
use tokio_test::{assert_ok, assert_pending, assert_ready, task};
use tokio_test::{assert_pending, assert_ready, task};
use tokio_util::time::DelayQueue;

macro_rules! poll {
Expand All @@ -12,12 +12,12 @@ macro_rules! poll {
};
}

macro_rules! assert_ready_ok {
macro_rules! assert_ready_some {
($e:expr) => {{
assert_ok!(match assert_ready!($e) {
match assert_ready!($e) {
Some(v) => v,
None => panic!("None"),
})
}
}};
}

Expand All @@ -31,7 +31,7 @@ async fn single_immediate_delay() {
// Advance time by 1ms to handle thee rounding
sleep(ms(1)).await;

assert_ready_ok!(poll!(queue));
assert_ready_some!(poll!(queue));

let entry = assert_ready!(poll!(queue));
assert!(entry.is_none())
Expand All @@ -52,7 +52,7 @@ async fn multi_immediate_delays() {
let mut res = vec![];

while res.len() < 3 {
let entry = assert_ready_ok!(poll!(queue));
let entry = assert_ready_some!(poll!(queue));
res.push(entry.into_inner());
}

Expand Down Expand Up @@ -83,7 +83,7 @@ async fn single_short_delay() {

assert!(queue.is_woken());

let entry = assert_ready_ok!(poll!(queue));
let entry = assert_ready_some!(poll!(queue));
assert_eq!(*entry.get_ref(), "foo");

let entry = assert_ready!(poll!(queue));
Expand Down Expand Up @@ -193,7 +193,7 @@ async fn reset_entry() {

assert!(queue.is_woken());

let entry = assert_ready_ok!(poll!(queue));
let entry = assert_ready_some!(poll!(queue));
assert_eq!(*entry.get_ref(), "foo");

let entry = assert_ready!(poll!(queue));
Expand Down Expand Up @@ -271,7 +271,7 @@ async fn repeatedly_reset_entry_inserted_as_expired() {

assert!(queue.is_woken());

let entry = assert_ready_ok!(poll!(queue)).into_inner();
let entry = assert_ready_some!(poll!(queue)).into_inner();
assert_eq!(entry, "foo");

let entry = assert_ready!(poll!(queue));
Expand Down Expand Up @@ -311,7 +311,7 @@ async fn remove_at_timer_wheel_threshold() {

sleep(ms(80)).await;

let entry = assert_ready_ok!(poll!(queue)).into_inner();
let entry = assert_ready_some!(poll!(queue)).into_inner();

match entry {
"foo" => {
Expand Down Expand Up @@ -348,7 +348,7 @@ async fn expires_before_last_insert() {

assert!(queue.is_woken());

let entry = assert_ready_ok!(poll!(queue)).into_inner();
let entry = assert_ready_some!(poll!(queue)).into_inner();
assert_eq!(entry, "bar");
}

Expand All @@ -375,14 +375,14 @@ async fn multi_reset() {

sleep(ms(50)).await;

let entry = assert_ready_ok!(poll!(queue));
let entry = assert_ready_some!(poll!(queue));
assert_eq!(*entry.get_ref(), "two");

assert_pending!(poll!(queue));

sleep(ms(50)).await;

let entry = assert_ready_ok!(poll!(queue));
let entry = assert_ready_some!(poll!(queue));
assert_eq!(*entry.get_ref(), "one");

let entry = assert_ready!(poll!(queue));
Expand All @@ -408,7 +408,7 @@ async fn expire_first_key_when_reset_to_expire_earlier() {

assert!(queue.is_woken());

let entry = assert_ready_ok!(poll!(queue)).into_inner();
let entry = assert_ready_some!(poll!(queue)).into_inner();
assert_eq!(entry, "one");
}

Expand All @@ -431,7 +431,7 @@ async fn expire_second_key_when_reset_to_expire_earlier() {

assert!(queue.is_woken());

let entry = assert_ready_ok!(poll!(queue)).into_inner();
let entry = assert_ready_some!(poll!(queue)).into_inner();
assert_eq!(entry, "two");
}

Expand All @@ -453,7 +453,7 @@ async fn reset_first_expiring_item_to_expire_later() {

assert!(queue.is_woken());

let entry = assert_ready_ok!(poll!(queue)).into_inner();
let entry = assert_ready_some!(poll!(queue)).into_inner();
assert_eq!(entry, "two");
}

Expand All @@ -479,7 +479,7 @@ async fn insert_before_first_after_poll() {

assert!(queue.is_woken());

let entry = assert_ready_ok!(poll!(queue)).into_inner();
let entry = assert_ready_some!(poll!(queue)).into_inner();
assert_eq!(entry, "two");
}

Expand All @@ -504,7 +504,7 @@ async fn insert_after_ready_poll() {
let mut res = vec![];

while res.len() < 3 {
let entry = assert_ready_ok!(poll!(queue));
let entry = assert_ready_some!(poll!(queue));
res.push(entry.into_inner());
queue.insert_at("foo", now + ms(500));
}
Expand Down Expand Up @@ -549,7 +549,7 @@ async fn reset_later_after_slot_starts() {
sleep(ms(1)).await;
assert!(queue.is_woken());

let entry = assert_ready_ok!(poll!(queue)).into_inner();
let entry = assert_ready_some!(poll!(queue)).into_inner();
assert_eq!(entry, "foo");
}

Expand All @@ -568,7 +568,7 @@ async fn reset_inserted_expired() {

sleep(ms(200)).await;

let entry = assert_ready_ok!(poll!(queue)).into_inner();
let entry = assert_ready_some!(poll!(queue)).into_inner();
assert_eq!(entry, "foo");

assert_eq!(queue.len(), 0);
Expand Down Expand Up @@ -607,7 +607,7 @@ async fn reset_earlier_after_slot_starts() {
sleep(ms(1)).await;
assert!(queue.is_woken());

let entry = assert_ready_ok!(poll!(queue)).into_inner();
let entry = assert_ready_some!(poll!(queue)).into_inner();
assert_eq!(entry, "foo");
}

Expand All @@ -630,7 +630,7 @@ async fn insert_in_past_after_poll_fires_immediately() {

assert!(queue.is_woken());

let entry = assert_ready_ok!(poll!(queue)).into_inner();
let entry = assert_ready_some!(poll!(queue)).into_inner();
assert_eq!(entry, "bar");
}

Expand All @@ -657,7 +657,7 @@ async fn compact_expire_empty() {

let mut res = vec![];
while res.len() < 2 {
let entry = assert_ready_ok!(poll!(queue));
let entry = assert_ready_some!(poll!(queue));
res.push(entry.into_inner());
}

Expand Down Expand Up @@ -704,7 +704,7 @@ async fn compact_remove_remapped_keys() {

let mut res = vec![];
while res.len() < 2 {
let entry = assert_ready_ok!(poll!(queue));
let entry = assert_ready_some!(poll!(queue));
res.push(entry.into_inner());
}

Expand Down Expand Up @@ -743,7 +743,7 @@ async fn compact_change_deadline() {

let mut res = vec![];
while res.len() < 2 {
let entry = assert_ready_ok!(poll!(queue));
let entry = assert_ready_some!(poll!(queue));
res.push(entry.into_inner());
}

Expand All @@ -763,14 +763,14 @@ async fn compact_change_deadline() {
sleep(ms(10)).await;

while res.len() < 4 {
let entry = assert_ready_ok!(poll!(queue));
let entry = assert_ready_some!(poll!(queue));
res.push(entry.into_inner());
}

sleep(ms(10)).await;

while res.len() < 6 {
let entry = assert_ready_ok!(poll!(queue));
let entry = assert_ready_some!(poll!(queue));
res.push(entry.into_inner());
}

Expand Down Expand Up @@ -803,7 +803,7 @@ async fn remove_after_compact_poll() {
queue.insert_at("bar", now + ms(20));

sleep(ms(10)).await;
assert_eq!(assert_ready_ok!(poll!(queue)).key(), foo_key);
assert_eq!(assert_ready_some!(poll!(queue)).key(), foo_key);

queue.compact();

Expand Down

0 comments on commit cf38ba6

Please sign in to comment.