diff --git a/tokio/src/coop.rs b/tokio/src/coop.rs index 05b2ae83bdd..16d93fb7516 100644 --- a/tokio/src/coop.rs +++ b/tokio/src/coop.rs @@ -1,55 +1,33 @@ #![cfg_attr(not(feature = "full"), allow(dead_code))] -//! Opt-in yield points for improved cooperative scheduling. +//! Yield points for improved cooperative scheduling. //! -//! A single call to [`poll`] on a top-level task may potentially do a lot of -//! work before it returns `Poll::Pending`. If a task runs for a long period of -//! time without yielding back to the executor, it can starve other tasks -//! waiting on that executor to execute them, or drive underlying resources. -//! Since Rust does not have a runtime, it is difficult to forcibly preempt a -//! long-running task. Instead, this module provides an opt-in mechanism for -//! futures to collaborate with the executor to avoid starvation. +//! Documentation for this can be found in the [`tokio::task`] module. //! -//! Consider a future like this one: -//! -//! ``` -//! # use tokio_stream::{Stream, StreamExt}; -//! async fn drop_all(mut input: I) { -//! while let Some(_) = input.next().await {} -//! } -//! ``` -//! -//! It may look harmless, but consider what happens under heavy load if the -//! input stream is _always_ ready. If we spawn `drop_all`, the task will never -//! yield, and will starve other tasks and resources on the same executor. With -//! opt-in yield points, this problem is alleviated: -//! -//! ```ignore -//! # use tokio_stream::{Stream, StreamExt}; -//! async fn drop_all(mut input: I) { -//! while let Some(_) = input.next().await { -//! tokio::coop::proceed().await; -//! } -//! } -//! ``` -//! -//! The `proceed` future will coordinate with the executor to make sure that -//! every so often control is yielded back to the executor so it can run other -//! tasks. -//! -//! # Placing yield points -//! -//! Voluntary yield points should be placed _after_ at least some work has been -//! done. If they are not, a future sufficiently deep in the task hierarchy may -//! end up _never_ getting to run because of the number of yield points that -//! inevitably appear before it is reached. In general, you will want yield -//! points to only appear in "leaf" futures -- those that do not themselves poll -//! other futures. By doing this, you avoid double-counting each iteration of -//! the outer future against the cooperating budget. -//! -//! [`poll`]: method@std::future::Future::poll - -// NOTE: The doctests in this module are ignored since the whole module is (currently) private. +//! [`tokio::task`]: crate::task. + +// ```ignore +// # use tokio_stream::{Stream, StreamExt}; +// async fn drop_all(mut input: I) { +// while let Some(_) = input.next().await { +// tokio::coop::proceed().await; +// } +// } +// ``` +// +// The `proceed` future will coordinate with the executor to make sure that +// every so often control is yielded back to the executor so it can run other +// tasks. +// +// # Placing yield points +// +// Voluntary yield points should be placed _after_ at least some work has been +// done. If they are not, a future sufficiently deep in the task hierarchy may +// end up _never_ getting to run because of the number of yield points that +// inevitably appear before it is reached. In general, you will want yield +// points to only appear in "leaf" futures -- those that do not themselves poll +// other futures. By doing this, you avoid double-counting each iteration of +// the outer future against the cooperating budget. use std::cell::Cell; @@ -98,6 +76,13 @@ pub(crate) fn budget(f: impl FnOnce() -> R) -> R { with_budget(Budget::initial(), f) } +/// Run the given closure with an unconstrained task budget. When the function returns, the budget +/// is reset to the value prior to calling the function. +#[inline(always)] +pub(crate) fn with_unconstrained(f: impl FnOnce() -> R) -> R { + with_budget(Budget::unconstrained(), f) +} + #[inline(always)] fn with_budget(budget: Budget, f: impl FnOnce() -> R) -> R { struct ResetGuard<'a> { diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 9ae098fb072..3442612938b 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -357,3 +357,21 @@ macro_rules! cfg_coop { )* } } + +macro_rules! cfg_not_coop { + ($($item:item)*) => { + $( + #[cfg(not(any( + feature = "fs", + feature = "io-std", + feature = "net", + feature = "process", + feature = "rt", + feature = "signal", + feature = "sync", + feature = "time", + )))] + $item + )* + } +} diff --git a/tokio/src/task/mod.rs b/tokio/src/task/mod.rs index 5dc5e72c01e..abae818b62d 100644 --- a/tokio/src/task/mod.rs +++ b/tokio/src/task/mod.rs @@ -209,11 +209,66 @@ //! # } //! ``` //! +//! ### Cooperative scheduling +//! +//! A single call to [`poll`] on a top-level task may potentially do a lot of +//! work before it returns `Poll::Pending`. If a task runs for a long period of +//! time without yielding back to the executor, it can starve other tasks +//! waiting on that executor to execute them, or drive underlying resources. +//! Since Rust does not have a runtime, it is difficult to forcibly preempt a +//! long-running task. Instead, this module provides an opt-in mechanism for +//! futures to collaborate with the executor to avoid starvation. +//! +//! Consider a future like this one: +//! +//! ``` +//! # use tokio_stream::{Stream, StreamExt}; +//! async fn drop_all(mut input: I) { +//! while let Some(_) = input.next().await {} +//! } +//! ``` +//! +//! It may look harmless, but consider what happens under heavy load if the +//! input stream is _always_ ready. If we spawn `drop_all`, the task will never +//! yield, and will starve other tasks and resources on the same executor. +//! +//! To account for this, Tokio has explicit yield points in a number of library +//! functions, which force tasks to return to the executor periodically. +//! +//! +//! #### unconstrained +//! +//! If necessary, [`task::unconstrained`] lets you opt out a future of Tokio's cooperative +//! scheduling. When a future is wrapped with `unconstrained`, it will never be forced to yield to +//! Tokio. For example: +//! +//! ``` +//! # #[tokio::main] +//! # async fn main() { +//! use tokio::{task, sync::mpsc}; +//! +//! let fut = async { +//! let (tx, mut rx) = mpsc::unbounded_channel(); +//! +//! for i in 0..1000 { +//! let _ = tx.send(()); +//! // This will always be ready. If coop was in effect, this code would be forced to yield +//! // periodically. However, if left unconstrained, then this code will never yield. +//! rx.recv().await; +//! } +//! }; +//! +//! task::unconstrained(fut).await; +//! # } +//! ``` +//! //! [`task::spawn_blocking`]: crate::task::spawn_blocking //! [`task::block_in_place`]: crate::task::block_in_place //! [rt-multi-thread]: ../runtime/index.html#threaded-scheduler //! [`task::yield_now`]: crate::task::yield_now() //! [`thread::yield_now`]: std::thread::yield_now +//! [`task::unconstrained`]: crate::task::unconstrained() +//! [`poll`]: method@std::future::Future::poll cfg_rt! { pub use crate::runtime::task::{JoinError, JoinHandle}; @@ -236,4 +291,7 @@ cfg_rt! { mod task_local; pub use task_local::LocalKey; + + mod unconstrained; + pub use unconstrained::{unconstrained, Unconstrained}; } diff --git a/tokio/src/task/unconstrained.rs b/tokio/src/task/unconstrained.rs new file mode 100644 index 00000000000..4a62f819ffa --- /dev/null +++ b/tokio/src/task/unconstrained.rs @@ -0,0 +1,43 @@ +use pin_project_lite::pin_project; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pin_project! { + /// Future for the [`unconstrained`](unconstrained) method. + #[must_use = "Unconstrained does nothing unless polled"] + pub struct Unconstrained { + #[pin] + inner: F, + } +} + +impl Future for Unconstrained +where + F: Future, +{ + type Output = ::Output; + + cfg_coop! { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let inner = self.project().inner; + crate::coop::with_unconstrained(|| inner.poll(cx)) + } + } + + cfg_not_coop! { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let inner = self.project().inner; + inner.poll(cx) + } + } +} + +/// Turn off cooperative scheduling for a future. The future will never be forced to yield by +/// Tokio. Using this exposes your service to starvation if the unconstrained future never yields +/// otherwise. +/// +/// See also the usage example in the [task module](index.html#unconstrained). +pub fn unconstrained(inner: F) -> Unconstrained { + Unconstrained { inner } +} diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs index 9aef4b9b909..cb1d0f66152 100644 --- a/tokio/tests/rt_common.rs +++ b/tokio/tests/rt_common.rs @@ -1017,6 +1017,32 @@ rt_test! { }); } + #[test] + fn coop_unconstrained() { + use std::task::Poll::Ready; + + let rt = rt(); + + rt.block_on(async { + // Create a bunch of tasks + let mut tasks = (0..1_000).map(|_| { + tokio::spawn(async { }) + }).collect::>(); + + // Hope that all the tasks complete... + time::sleep(Duration::from_millis(100)).await; + + tokio::task::unconstrained(poll_fn(|cx| { + // All the tasks should be ready + for task in &mut tasks { + assert!(Pin::new(task).poll(cx).is_ready()); + } + + Ready(()) + })).await; + }); + } + // Tests that the "next task" scheduler optimization is not able to starve // other tasks. #[test]