Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make timeout robust against budget-depleting tasks #4314

Merged
merged 2 commits into from Dec 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 7 additions & 11 deletions tokio/src/coop.rs
Expand Up @@ -59,13 +59,9 @@ impl Budget {
const fn unconstrained() -> Budget {
Budget(None)
}
}

cfg_rt_multi_thread! {
impl Budget {
fn has_remaining(self) -> bool {
self.0.map(|budget| budget > 0).unwrap_or(true)
}
fn has_remaining(self) -> bool {
self.0.map(|budget| budget > 0).unwrap_or(true)
}
}

Expand Down Expand Up @@ -107,16 +103,16 @@ fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R {
})
}

#[inline(always)]
pub(crate) fn has_budget_remaining() -> bool {
CURRENT.with(|cell| cell.get().has_remaining())
}

cfg_rt_multi_thread! {
/// Sets the current task's budget.
pub(crate) fn set(budget: Budget) {
CURRENT.with(|cell| cell.set(budget))
}

#[inline(always)]
pub(crate) fn has_budget_remaining() -> bool {
CURRENT.with(|cell| cell.get().has_remaining())
}
}

cfg_rt! {
Expand Down
27 changes: 23 additions & 4 deletions tokio/src/time/timeout.rs
Expand Up @@ -5,6 +5,7 @@
//! [`Timeout`]: struct@Timeout

use crate::{
coop,
time::{error::Elapsed, sleep_until, Duration, Instant, Sleep},
util::trace,
};
Expand Down Expand Up @@ -169,15 +170,33 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
let me = self.project();

let had_budget_before = coop::has_budget_remaining();

// First, try polling the future
if let Poll::Ready(v) = me.value.poll(cx) {
return Poll::Ready(Ok(v));
}

// Now check the timer
match me.delay.poll(cx) {
Poll::Ready(()) => Poll::Ready(Err(Elapsed::new())),
Poll::Pending => Poll::Pending,
let has_budget_now = coop::has_budget_remaining();

let delay = me.delay;

let poll_delay = || -> Poll<Self::Output> {
match delay.poll(cx) {
Poll::Ready(()) => Poll::Ready(Err(Elapsed::new())),
Poll::Pending => Poll::Pending,
}
};

if let (true, false) = (had_budget_before, has_budget_now) {
// if it is the underlying future that exhausted the budget, we poll
// the `delay` with an unconstrained one. This prevents pathological
// cases where the underlying future always exhausts the budget and
// we never get a chance to evaluate whether the timeout was hit or
// not.
coop::with_unconstrained(poll_delay)
} else {
poll_delay()
}
}
}
13 changes: 13 additions & 0 deletions tokio/tests/time_timeout.rs
Expand Up @@ -135,3 +135,16 @@ async fn deadline_future_elapses() {
fn ms(n: u64) -> Duration {
Duration::from_millis(n)
}

#[tokio::test]
async fn timeout_is_not_exhausted_by_future() {
let fut = timeout(ms(1), async {
let mut buffer = [0u8; 1];
loop {
use tokio::io::AsyncReadExt;
let _ = tokio::io::empty().read(&mut buffer).await;
}
});

assert!(fut.await.is_err());
}