Skip to content

Commit

Permalink
macros: join! gives each task its own budget to avoid starvation.
Browse files Browse the repository at this point in the history
Fixes: #4612
  • Loading branch information
PoorlyDefinedBehaviour committed Apr 17, 2022
1 parent c43832a commit ea44be5
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 5 deletions.
2 changes: 1 addition & 1 deletion tokio/src/coop.rs
Expand Up @@ -68,7 +68,7 @@ impl Budget {
/// Runs the given closure with a cooperative task budget. When the function
/// returns, the budget is reset to the value prior to calling the function.
#[inline(always)]
pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R {
pub fn budget<R>(f: impl FnOnce() -> R) -> R {
with_budget(Budget::initial(), f)
}

Expand Down
7 changes: 6 additions & 1 deletion tokio/src/lib.rs
Expand Up @@ -423,7 +423,12 @@ cfg_rt! {
pub mod runtime;
}

pub(crate) mod coop;
// Includes the `budget` function which is used by the `join` macros.
//
// This module is not intended to be part of the public API. In general, any
// `doc(hidden)` code is not part of Tokio's public and stable API.
#[doc(hidden)]
pub mod coop;

cfg_signal! {
pub mod signal;
Expand Down
7 changes: 5 additions & 2 deletions tokio/src/macros/join.rs
Expand Up @@ -82,8 +82,11 @@ macro_rules! join {
// and never moved.
let mut fut = unsafe { Pin::new_unchecked(fut) };

// Try polling
if fut.poll(cx).is_pending() {
// Try polling.
// Give each future its own budget to avoid starvation in the case
// where some of the futures consume a resource that is always ready.
let poll = $crate::coop::budget(|| { fut.poll(cx) });
if poll.is_pending() {
is_pending = true;
}
)*
Expand Down
41 changes: 40 additions & 1 deletion tokio/tests/macros_join.rs
@@ -1,6 +1,8 @@
#![cfg(feature = "macros")]
#![allow(clippy::blacklisted_name)]

use std::{sync::Arc, time::Duration};

#[cfg(target_arch = "wasm32")]
use wasm_bindgen_test::wasm_bindgen_test as test;
#[cfg(target_arch = "wasm32")]
Expand All @@ -9,7 +11,7 @@ use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test;
#[cfg(not(target_arch = "wasm32"))]
use tokio::test as maybe_tokio_test;

use tokio::sync::oneshot;
use tokio::sync::{oneshot, Semaphore};
use tokio_test::{assert_pending, assert_ready, task};

#[maybe_tokio_test]
Expand Down Expand Up @@ -80,3 +82,40 @@ fn join_size() {
};
assert_eq!(mem::size_of_val(&fut), 28);
}

async fn non_cooperative_task(permits: Arc<Semaphore>) -> usize {
let mut exceeded_budget = 0;

for _ in 0..5 {
// Another task should run after after this task uses its whole budget
for _ in 0..128 {
let _permit = permits.clone().acquire_owned().await.unwrap();
}

exceeded_budget += 1;
}

exceeded_budget
}

async fn poor_little_task() -> usize {
let mut how_many_times_i_got_to_run = 0;

for _ in 0..5 {
tokio::time::sleep(Duration::from_millis(100)).await;
how_many_times_i_got_to_run += 1;
}

how_many_times_i_got_to_run
}

#[tokio::test]
async fn join_does_not_allow_tasks_to_starve() {
let permits = Arc::new(Semaphore::new(10));

// non_cooperative_task should yield after its budget is exceeded and then poor_little_task should run.
let (non_cooperative_result, little_task_result) = tokio::join!(non_cooperative_task(permits), poor_little_task());

assert_eq!(5, non_cooperative_result);
assert_eq!(5, little_task_result);
}

0 comments on commit ea44be5

Please sign in to comment.