From 114d8450d57a9401c79c718a1684f422c735620b Mon Sep 17 00:00:00 2001 From: Piotr Sarna Date: Tue, 15 Feb 2022 10:58:05 +0100 Subject: [PATCH] task: add consume_budget for cooperative scheduling For cpu-only computations that do not use any Tokio resources, budgeting does not really kick in in order to yield and prevent other tasks from starvation. The new mechanism - consume_budget, performs a budget check, consumes a unit of it, and yields only if the task exceeded the budget. That allows cpu-intenstive computations to define points in the program which indicate that some significant work was performed. It will yield only if the budget is gone, which is a much better alternative to unconditional yielding, which is a potentially heavy operation. --- tokio/src/task/consume_budget.rs | 45 ++++++++++++++++++++++++++++++++ tokio/src/task/mod.rs | 5 ++++ 2 files changed, 50 insertions(+) create mode 100644 tokio/src/task/consume_budget.rs diff --git a/tokio/src/task/consume_budget.rs b/tokio/src/task/consume_budget.rs new file mode 100644 index 00000000000..8b48d72ca4d --- /dev/null +++ b/tokio/src/task/consume_budget.rs @@ -0,0 +1,45 @@ +use std::task::Poll; + +/// Consumes a unit of budget and returns the execution back to the Tokio +/// runtime *if* the task's coop budget was exhausted. +/// +/// The task will only yield if its entire coop budget has been exhausted. +/// This function can can be used in order to insert optional yield points into long +/// computations that do not use Tokio resources like sockets or semaphores, +/// without redundantly yielding to the runtime each time. +/// +/// **Note**: This is an [unstable API][unstable]. The public API of this type +/// may break in 1.x releases. See [the documentation on unstable +/// features][unstable] for details. +/// +/// # Examples +/// +/// Make sure that a function which returns a sum of (potentially lots of) +/// iterated values is cooperative. +/// +/// ``` +/// async fn sum_iterator(input: &mut impl std::iter::Iterator) -> i64 { +/// let mut sum: i64 = 0; +/// while let Some(i) = input.next() { +/// sum += i; +/// tokio::task::consume_budget().await +/// } +/// sum +/// } +/// ``` +/// [unstable]: crate#unstable-features +#[cfg_attr(docsrs, doc(cfg(feature = "rt")))] +pub async fn consume_budget() { + let mut status = Poll::Pending; + + crate::future::poll_fn(move |cx| { + if status.is_ready() { + return status; + } + status = crate::coop::poll_proceed(cx).map(|restore| { + restore.made_progress(); + }); + status + }) + .await +} diff --git a/tokio/src/task/mod.rs b/tokio/src/task/mod.rs index cebc269bb40..4057d535218 100644 --- a/tokio/src/task/mod.rs +++ b/tokio/src/task/mod.rs @@ -291,6 +291,11 @@ cfg_rt! { mod yield_now; pub use yield_now::yield_now; + cfg_unstable! { + mod consume_budget; + pub use consume_budget::consume_budget; + } + mod local; pub use local::{spawn_local, LocalSet};