Skip to content

Commit

Permalink
make timeout robust against budget-depleting tasks
Browse files Browse the repository at this point in the history
Up until now, if the future that we were applying a timeout to
consistently depleted the coop budget, the timeout never got a chance to
be evaluated. In the next call to `poll`, the underlying future would be
polled and it would once again deplete the budget. In those circumstances,
timeouts would not be respected. This can be surprising to people, and
in fact it was in tokio-rs#4291 .

The solution is to make a budget exception with `timeout` if it was the
underlying future that depleted the budget.
  • Loading branch information
BraulioVM committed Dec 11, 2021
1 parent 4b6bb1d commit 2151b1e
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 15 deletions.
19 changes: 8 additions & 11 deletions tokio/src/coop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,10 @@ impl Budget {
const fn unconstrained() -> Budget {
Budget(None)
}
}

cfg_rt_multi_thread! {
impl Budget {
fn has_remaining(self) -> bool {
self.0.map(|budget| budget > 0).unwrap_or(true)
}

fn has_remaining(self) -> bool {
self.0.map(|budget| budget > 0).unwrap_or(true)
}
}

Expand Down Expand Up @@ -107,16 +104,16 @@ fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R {
})
}

#[inline(always)]
pub(crate) fn has_budget_remaining() -> bool {
CURRENT.with(|cell| cell.get().has_remaining())
}

cfg_rt_multi_thread! {
/// Sets the current task's budget.
pub(crate) fn set(budget: Budget) {
CURRENT.with(|cell| cell.set(budget))
}

#[inline(always)]
pub(crate) fn has_budget_remaining() -> bool {
CURRENT.with(|cell| cell.get().has_remaining())
}
}

cfg_rt! {
Expand Down
27 changes: 23 additions & 4 deletions tokio/src/time/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use crate::{
time::{error::Elapsed, sleep_until, Duration, Instant, Sleep},
util::trace,
coop
};

use pin_project_lite::pin_project;
Expand Down Expand Up @@ -169,15 +170,33 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let me = self.project();

let had_budget_before = coop::has_budget_remaining();

// First, try polling the future
if let Poll::Ready(v) = me.value.poll(cx) {
return Poll::Ready(Ok(v));
}

// Now check the timer
match me.delay.poll(cx) {
Poll::Ready(()) => Poll::Ready(Err(Elapsed::new())),
Poll::Pending => Poll::Pending,
let has_budget_now = coop::has_budget_remaining();

let delay = me.delay;

let poll_delay = || -> Poll<Self::Output> {
match delay.poll(cx) {
Poll::Ready(()) => Poll::Ready(Err(Elapsed::new())),
Poll::Pending => Poll::Pending,
}
};

if let (true, false) = (had_budget_before, has_budget_now) {
// if it is the underlying future that exhausted the budget, we poll
// the `delay` with an unconstrained one. This prevents pathological
// cases where the underlying future always exhausts the budget and
// we never get a chance to evaluate whether the timeout was hit or
// not.
coop::with_unconstrained(poll_delay)
} else {
poll_delay()
}
}
}
13 changes: 13 additions & 0 deletions tokio/tests/time_timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,16 @@ async fn deadline_future_elapses() {
fn ms(n: u64) -> Duration {
Duration::from_millis(n)
}

#[tokio::test]
async fn timeout_is_not_exhausted_by_future() {
assert!(
timeout(ms(1), async {
let mut buffer = [0u8; 1];
loop {
use tokio::io::AsyncReadExt;
let _ = tokio::io::empty().read(&mut buffer).await;
}
}).await.is_err()
);
}

0 comments on commit 2151b1e

Please sign in to comment.