From 180ecce7516f44d7383630e12d0b7f7e88ba6e14 Mon Sep 17 00:00:00 2001 From: Braulio Valdivielso Date: Sat, 11 Dec 2021 13:06:20 +0000 Subject: [PATCH 1/2] make timeout robust against budget-depleting tasks Up until now, if the future that we were applying a timeout to consistently depleted the coop budget, the timeout never got a chance to be evaluated. In the next call to `poll`, the underlying future would be polled and it would once again deplete the budget. In those circumstances, timeouts would not be respected. This can be surprising to people, and in fact it was in #4291 . The solution is to make a budget exception with `timeout` if it was the underlying future that depleted the budget. Refs: #4291 , #4300 --- tokio/src/coop.rs | 19 ++++++++----------- tokio/src/time/timeout.rs | 27 +++++++++++++++++++++++---- tokio/tests/time_timeout.rs | 13 +++++++++++++ 3 files changed, 44 insertions(+), 15 deletions(-) diff --git a/tokio/src/coop.rs b/tokio/src/coop.rs index 256e9620e75..fd447dd5a41 100644 --- a/tokio/src/coop.rs +++ b/tokio/src/coop.rs @@ -59,13 +59,10 @@ impl Budget { const fn unconstrained() -> Budget { Budget(None) } -} -cfg_rt_multi_thread! { - impl Budget { - fn has_remaining(self) -> bool { - self.0.map(|budget| budget > 0).unwrap_or(true) - } + + fn has_remaining(self) -> bool { + self.0.map(|budget| budget > 0).unwrap_or(true) } } @@ -107,16 +104,16 @@ fn with_budget(budget: Budget, f: impl FnOnce() -> R) -> R { }) } +#[inline(always)] +pub(crate) fn has_budget_remaining() -> bool { + CURRENT.with(|cell| cell.get().has_remaining()) +} + cfg_rt_multi_thread! { /// Sets the current task's budget. pub(crate) fn set(budget: Budget) { CURRENT.with(|cell| cell.set(budget)) } - - #[inline(always)] - pub(crate) fn has_budget_remaining() -> bool { - CURRENT.with(|cell| cell.get().has_remaining()) - } } cfg_rt! { diff --git a/tokio/src/time/timeout.rs b/tokio/src/time/timeout.rs index cc299161633..ecf4eb3d8b5 100644 --- a/tokio/src/time/timeout.rs +++ b/tokio/src/time/timeout.rs @@ -7,6 +7,7 @@ use crate::{ time::{error::Elapsed, sleep_until, Duration, Instant, Sleep}, util::trace, + coop }; use pin_project_lite::pin_project; @@ -169,15 +170,33 @@ where fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { let me = self.project(); + let had_budget_before = coop::has_budget_remaining(); + // First, try polling the future if let Poll::Ready(v) = me.value.poll(cx) { return Poll::Ready(Ok(v)); } - // Now check the timer - match me.delay.poll(cx) { - Poll::Ready(()) => Poll::Ready(Err(Elapsed::new())), - Poll::Pending => Poll::Pending, + let has_budget_now = coop::has_budget_remaining(); + + let delay = me.delay; + + let poll_delay = || -> Poll { + match delay.poll(cx) { + Poll::Ready(()) => Poll::Ready(Err(Elapsed::new())), + Poll::Pending => Poll::Pending, + } + }; + + if let (true, false) = (had_budget_before, has_budget_now) { + // if it is the underlying future that exhausted the budget, we poll + // the `delay` with an unconstrained one. This prevents pathological + // cases where the underlying future always exhausts the budget and + // we never get a chance to evaluate whether the timeout was hit or + // not. + coop::with_unconstrained(poll_delay) + } else { + poll_delay() } } } diff --git a/tokio/tests/time_timeout.rs b/tokio/tests/time_timeout.rs index dbd80eb8a6a..481e089df33 100644 --- a/tokio/tests/time_timeout.rs +++ b/tokio/tests/time_timeout.rs @@ -135,3 +135,16 @@ async fn deadline_future_elapses() { fn ms(n: u64) -> Duration { Duration::from_millis(n) } + +#[tokio::test] +async fn timeout_is_not_exhausted_by_future() { + assert!( + timeout(ms(1), async { + let mut buffer = [0u8; 1]; + loop { + use tokio::io::AsyncReadExt; + let _ = tokio::io::empty().read(&mut buffer).await; + } + }).await.is_err() + ); +} From b3a53005d65bd79020f68176defc7228fc04b5e3 Mon Sep 17 00:00:00 2001 From: Braulio Valdivielso Date: Sat, 11 Dec 2021 13:26:06 +0000 Subject: [PATCH 2/2] fix style issues --- tokio/src/coop.rs | 1 - tokio/src/time/timeout.rs | 2 +- tokio/tests/time_timeout.rs | 18 +++++++++--------- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/tokio/src/coop.rs b/tokio/src/coop.rs index fd447dd5a41..145e703971b 100644 --- a/tokio/src/coop.rs +++ b/tokio/src/coop.rs @@ -60,7 +60,6 @@ impl Budget { Budget(None) } - fn has_remaining(self) -> bool { self.0.map(|budget| budget > 0).unwrap_or(true) } diff --git a/tokio/src/time/timeout.rs b/tokio/src/time/timeout.rs index ecf4eb3d8b5..4a93089e8e8 100644 --- a/tokio/src/time/timeout.rs +++ b/tokio/src/time/timeout.rs @@ -5,9 +5,9 @@ //! [`Timeout`]: struct@Timeout use crate::{ + coop, time::{error::Elapsed, sleep_until, Duration, Instant, Sleep}, util::trace, - coop }; use pin_project_lite::pin_project; diff --git a/tokio/tests/time_timeout.rs b/tokio/tests/time_timeout.rs index 481e089df33..a1ff51e7d27 100644 --- a/tokio/tests/time_timeout.rs +++ b/tokio/tests/time_timeout.rs @@ -138,13 +138,13 @@ fn ms(n: u64) -> Duration { #[tokio::test] async fn timeout_is_not_exhausted_by_future() { - assert!( - timeout(ms(1), async { - let mut buffer = [0u8; 1]; - loop { - use tokio::io::AsyncReadExt; - let _ = tokio::io::empty().read(&mut buffer).await; - } - }).await.is_err() - ); + let fut = timeout(ms(1), async { + let mut buffer = [0u8; 1]; + loop { + use tokio::io::AsyncReadExt; + let _ = tokio::io::empty().read(&mut buffer).await; + } + }); + + assert!(fut.await.is_err()); }