Fix panics on updating DelayQueue entries #3270

Merged
merged 6 commits into from Jan 21, 2021
10 changes: 6 additions & 4 deletions tokio-util/src/time/delay_queue.rs
@@ -36,14 +36,14 @@ use std::task::{self, Poll, Waker};
/// # `Stream` implementation
///
/// Items are retrieved from the queue via [`DelayQueue::poll_expired`]. If no delays have
/// expired, no items are returned. In this case, `NotReady` is returned and the
/// expired, no items are returned. In this case, `Pending` is returned and the
/// current task is registered to be notified once the next item's delay has
/// expired.
///
/// If no items are in the queue, i.e. `is_empty()` returns `true`, then `poll`
/// returns `Ready(None)`. This indicates that the stream has reached an end.
/// However, if a new item is inserted *after*, `poll` will once again start
/// returning items or `NotReady.
/// returning items or `Pending`.
///
/// Items are returned ordered by their expirations. Items that are configured
/// to expire first will be returned first. There are no ordering guarantees
@@ -538,7 +538,7 @@ impl<T> DelayQueue<T> {
///
/// delay_queue.reset_at(&key, Instant::now() + Duration::from_secs(10));
///
/// // "foo"is now scheduled to be returned in 10 seconds
/// // "foo" is now scheduled to be returned in 10 seconds
/// # }
/// ```
pub fn reset_at(&mut self, key: &Key, when: Instant) {
@@ -548,6 +548,8 @@ impl<T> DelayQueue<T> {
let when = self.normalize_deadline(when);

self.slab[key.index].when = when;
self.slab[key.index].expired = false;
Contributor

So, just to confirm, the reason this is correct is that the call to `insert_idx` will set it back to `true` if it is expired?

Contributor Author

@wabain wabain Dec 27, 2020

Yes, `remove_key` combined with these assignments should leave the slab entry in the same state as if it was a newly created `Data` value in `insert_at`. Then `insert_idx` is responsible for placing the entry in the wheel or expired queue and setting the flag.
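
For readers following the thread, the flow described above can be modeled with a small toy. This is only a sketch under stated assumptions: `ToyQueue`, `Entry`, the `Vec`-based wheel, and the integer timestamps are hypothetical stand-ins, not the types in `delay_queue.rs`. It just shows why clearing `expired` before calling `insert_idx` keeps the flag and the entry's actual location in agreement.

```rust
use std::collections::VecDeque;

/// Hypothetical, simplified stand-in for a slab entry.
struct Entry {
    when: u64,
    expired: bool,
}

/// Toy model: `wheel` and `expired` both hold slab indices.
struct ToyQueue {
    slab: Vec<Entry>,
    wheel: Vec<usize>,
    expired: VecDeque<usize>,
    elapsed: u64,
}

impl ToyQueue {
    fn new(elapsed: u64) -> Self {
        ToyQueue { slab: Vec::new(), wheel: Vec::new(), expired: VecDeque::new(), elapsed }
    }

    /// Places the entry in the wheel or the expired queue and sets the flag.
    fn insert_idx(&mut self, when: u64, idx: usize) {
        if when <= self.elapsed {
            self.slab[idx].expired = true;
            self.expired.push_back(idx);
        } else {
            self.wheel.push(idx);
        }
    }

    /// Trusts the `expired` flag to decide where the entry currently lives.
    fn remove_key(&mut self, idx: usize) {
        if self.slab[idx].expired {
            self.expired.retain(|&i| i != idx);
        } else {
            let pos = self
                .wheel
                .iter()
                .position(|&i| i == idx)
                .expect("flag says 'in wheel' but the entry is not there");
            self.wheel.remove(pos);
        }
    }

    fn insert_at(&mut self, when: u64) -> usize {
        self.slab.push(Entry { when, expired: false });
        let idx = self.slab.len() - 1;
        self.insert_idx(when, idx);
        idx
    }

    fn reset_at(&mut self, idx: usize, when: u64) {
        self.remove_key(idx);
        // Restore the "freshly created" state before re-inserting.
        self.slab[idx].when = when;
        self.slab[idx].expired = false;
        self.insert_idx(when, idx);
    }
}
```

In this toy, inserting an entry with a deadline in the past and then resetting it twice to future deadlines leaves exactly one copy of the entry in `wheel`, an empty `expired` queue, and a cleared flag. Without the `expired = false` assignment, the second `remove_key` would look in the wrong container.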


self.insert_idx(when, key.index);

let next_deadline = self.next_deadline();
@@ -711,7 +713,7 @@ impl<T> DelayQueue<T> {
/// Returns `true` if there are no items in the queue.
///
/// Note that this function returns `false` even if all items have not yet
/// expired and a call to `poll` will return `NotReady`.
/// expired and a call to `poll` will return `Pending`.
///
/// # Examples
///
4 changes: 1 addition & 3 deletions tokio-util/src/time/wheel/level.rs
@@ -233,14 +233,13 @@ fn slot_for(duration: u64, level: usize) -> usize {
((duration >> (level * 6)) % LEVEL_MULT as u64) as usize
}

/*
#[cfg(all(test, not(loom)))]
mod test {
use super::*;

#[test]
fn test_slot_for() {
for pos in 1..64 {
for pos in 0..64 {
assert_eq!(pos as usize, slot_for(pos, 0));
}

@@ -252,4 +251,3 @@ mod test {
}
}
}
*/
18 changes: 14 additions & 4 deletions tokio-util/src/time/wheel/mod.rs
@@ -116,9 +116,17 @@ where
Ok(())
}

/// Remove `item` from thee timing wheel.
/// Remove `item` from the timing wheel.
pub(crate) fn remove(&mut self, item: &T::Borrowed, store: &mut T::Store) {
let when = T::when(item, store);

assert!(
Contributor Author

@wabain wabain Dec 27, 2020

By the way, I added this assert because while the assert in `level_for` wasn't valid in general, it turned out to be helpful in surfacing some forms of inconsistent state; that's how issue #2473 (looking up entries in the wheel which had been moved to the expired queue) manifested. Asserting this invariant can catch at least some cases of invalid API use or delay queue corruption.

I added the same condition as a `debug_assert` in the main Tokio crate, where I don't think it could be hit unless internal state were corrupted.
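
To illustrate why the old assert in `level_for` was not valid in general, here is a standalone sketch comparing the two calculations. The names `level_for_old` and `level_for_new` are hypothetical, the final `significant / 6` step is an assumption (that part of the function is elided in this diff), and the old version returns `None` where the removed `assert!` would have panicked.

```rust
const SLOT_MASK: u64 = (1 << 6) - 1;

/// Pre-change shape of the calculation. Returns `None` in the case where
/// the removed `assert!(masked != 0, ...)` would have panicked.
fn level_for_old(elapsed: u64, when: u64) -> Option<usize> {
    let masked = elapsed ^ when;
    if masked == 0 {
        return None;
    }
    let leading_zeros = masked.leading_zeros() as usize;
    let significant = 63 - leading_zeros;
    // Assumed final step: six bits per level.
    Some(significant / 6)
}

/// Post-change shape: OR-ing in the low six bits caps the leading zeros at
/// 58, so `when == elapsed` maps to level 0 instead of hitting an assert.
fn level_for_new(elapsed: u64, when: u64) -> usize {
    // `^` binds tighter than `|` in Rust, so this is `(elapsed ^ when) | SLOT_MASK`.
    let masked = elapsed ^ when | SLOT_MASK;
    let leading_zeros = masked.leading_zeros() as usize;
    let significant = 63 - leading_zeros;
    significant / 6
}
```

Under these assumptions, `level_for_old(64, 64)` is `None` while `level_for_new(64, 64)` is `0`, and the two agree wherever the old version was defined for the inputs checked here (for example, both put `(0, 64)` on level 1). Since a deadline equal to the current elapsed time is a legitimate state for an entry sitting in slot 0, the check that still catches corruption is `elapsed <= when` at the call site rather than `masked != 0`.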

self.elapsed <= when,
"elapsed={}; when={}",
self.elapsed,
when
);

let level = self.level_for(when);

self.levels[level].remove_entry(when, item, store);
@@ -240,9 +248,11 @@ where
}

fn level_for(elapsed: u64, when: u64) -> usize {
let masked = elapsed ^ when;
const SLOT_MASK: u64 = (1 << 6) - 1;

assert!(masked != 0, "elapsed={}; when={}", elapsed, when);
// Mask in the trailing bits ignored by the level calculation in order to cap
// the possible leading zeros
let masked = elapsed ^ when | SLOT_MASK;

let leading_zeros = masked.leading_zeros() as usize;
let significant = 63 - leading_zeros;
@@ -255,7 +265,7 @@ mod test {

#[test]
fn test_level_for() {
for pos in 1..64 {
for pos in 0..64 {
assert_eq!(
0,
level_for(0, pos),
61 changes: 61 additions & 0 deletions tokio-util/tests/time_delay_queue.rs
@@ -245,6 +245,35 @@ async fn reset_twice() {
assert!(queue.is_woken());
}

/// Regression test: Given an entry inserted with a deadline in the past, so
/// that it is placed directly on the expired queue, reset the entry to a
/// deadline in the future. Validate that this leaves the entry and queue in an
/// internally consistent state by running an additional reset on the entry
/// before polling it to completion.
#[tokio::test]
async fn repeatedly_reset_entry_inserted_as_expired() {
time::pause();
let mut queue = task::spawn(DelayQueue::new());
let now = Instant::now();

let key = queue.insert_at("foo", now - ms(100));

queue.reset_at(&key, now + ms(100));
queue.reset_at(&key, now + ms(50));

assert_pending!(poll!(queue));

time::sleep_until(now + ms(60)).await;

assert!(queue.is_woken());

let entry = assert_ready_ok!(poll!(queue)).into_inner();
assert_eq!(entry, "foo");

let entry = assert_ready!(poll!(queue));
assert!(entry.is_none());
}

#[tokio::test]
async fn remove_expired_item() {
time::pause();
@@ -261,6 +290,38 @@ async fn remove_expired_item() {
assert_eq!(entry.into_inner(), "foo");
}

/// Regression test: it should be possible to remove entries which fall in the
/// 0th slot of the internal timer wheel — that is, entries whose expiration
/// (a) falls at the beginning of one of the wheel's hierarchical levels and (b)
/// is equal to the wheel's current elapsed time.
#[tokio::test]
async fn remove_at_timer_wheel_threshold() {
time::pause();

let mut queue = task::spawn(DelayQueue::new());

let now = Instant::now();

let key1 = queue.insert_at("foo", now + ms(64));
let key2 = queue.insert_at("bar", now + ms(64));

sleep(ms(80)).await;

let entry = assert_ready_ok!(poll!(queue)).into_inner();

match entry {
"foo" => {
let entry = queue.remove(&key2).into_inner();
assert_eq!(entry, "bar");
}
"bar" => {
let entry = queue.remove(&key1).into_inner();
assert_eq!(entry, "foo");
}
other => panic!("other: {:?}", other),
}
}

#[tokio::test]
async fn expires_before_last_insert() {
time::pause();
4 changes: 1 addition & 3 deletions tokio/src/time/driver/wheel/level.rs
@@ -255,14 +255,13 @@ fn slot_for(duration: u64, level: usize) -> usize {
((duration >> (level * 6)) % LEVEL_MULT as u64) as usize
}

/*
#[cfg(all(test, not(loom)))]
mod test {
use super::*;

#[test]
fn test_slot_for() {
for pos in 1..64 {
for pos in 0..64 {
assert_eq!(pos as usize, slot_for(pos, 0));
}

@@ -274,4 +273,3 @@ mod test {
}
}
}
*/
17 changes: 13 additions & 4 deletions tokio/src/time/driver/wheel/mod.rs
@@ -122,6 +122,13 @@ impl Wheel {
if when == u64::max_value() {
self.pending.remove(item);
} else {
debug_assert!(
self.elapsed <= when,
"elapsed={}; when={}",
self.elapsed,
when
);

let level = self.level_for(when);

self.levels[level].remove_entry(item);
@@ -281,15 +288,17 @@ impl Wheel {
}

fn level_for(elapsed: u64, when: u64) -> usize {
let mut masked = elapsed ^ when;
const SLOT_MASK: u64 = (1 << 6) - 1;

// Mask in the trailing bits ignored by the level calculation in order to cap
// the possible leading zeros
let mut masked = elapsed ^ when | SLOT_MASK;

if masked >= MAX_DURATION {
// Fudge the timer into the top level
masked = MAX_DURATION - 1;
}

assert!(masked != 0, "elapsed={}; when={}", elapsed, when);

let leading_zeros = masked.leading_zeros() as usize;
let significant = 63 - leading_zeros;

@@ -302,7 +311,7 @@ mod test {

#[test]
fn test_level_for() {
for pos in 1..64 {
for pos in 0..64 {
assert_eq!(
0,
level_for(0, pos),