diff --git a/tokio/src/coop.rs b/tokio/src/coop.rs index 05b2ae83bdd..e216e9a39ac 100644 --- a/tokio/src/coop.rs +++ b/tokio/src/coop.rs @@ -1,6 +1,6 @@ #![cfg_attr(not(feature = "full"), allow(dead_code))] -//! Opt-in yield points for improved cooperative scheduling. +//! Yield points for improved cooperative scheduling. //! //! A single call to [`poll`] on a top-level task may potentially do a lot of //! work before it returns `Poll::Pending`. If a task runs for a long period of @@ -21,37 +21,18 @@ //! //! It may look harmless, but consider what happens under heavy load if the //! input stream is _always_ ready. If we spawn `drop_all`, the task will never -//! yield, and will starve other tasks and resources on the same executor. With -//! opt-in yield points, this problem is alleviated: +//! yield, and will starve other tasks and resources on the same executor. //! -//! ```ignore -//! # use tokio_stream::{Stream, StreamExt}; -//! async fn drop_all(mut input: I) { -//! while let Some(_) = input.next().await { -//! tokio::coop::proceed().await; -//! } -//! } -//! ``` -//! -//! The `proceed` future will coordinate with the executor to make sure that -//! every so often control is yielded back to the executor so it can run other -//! tasks. -//! -//! # Placing yield points -//! -//! Voluntary yield points should be placed _after_ at least some work has been -//! done. If they are not, a future sufficiently deep in the task hierarchy may -//! end up _never_ getting to run because of the number of yield points that -//! inevitably appear before it is reached. In general, you will want yield -//! points to only appear in "leaf" futures -- those that do not themselves poll -//! other futures. By doing this, you avoid double-counting each iteration of -//! the outer future against the cooperating budget. +//! To account for this, Tokio has explicit yield points in a number of library +//! functions, which force tasks to retunr to the executor periodically. //! //! [`poll`]: method@std::future::Future::poll -// NOTE: The doctests in this module are ignored since the whole module is (currently) private. - +use pin_project_lite::pin_project; use std::cell::Cell; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; thread_local! { static CURRENT: Cell = Cell::new(Budget::unconstrained()); @@ -147,9 +128,45 @@ cfg_rt! { } } -cfg_coop! { - use std::task::{Context, Poll}; +pin_project! { + /// Future for the [`unconstrained`](unconstrained) method. + #[must_use = "Unconstrained does nothing unless polled"] + pub struct Unconstrained { + #[pin] + inner: F, + } +} + +impl Future for Unconstrained +where + F: Future, +{ + type Output = ::Output; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let inner = self.project().inner; + with_budget(Budget::unconstrained(), || inner.poll(cx)) + } +} + +/// Turn off cooperative scheduling for a future. The future or stream will never yield. +/// +/// # Examples +/// +/// ``` +/// # #[tokio::main] +/// # async fn main() { +/// use tokio::coop; +/// +/// let fut = async { () }; +/// coop::unconstrained(fut).await; +/// # } +/// ``` +pub fn unconstrained(inner: F) -> Unconstrained { + Unconstrained { inner } +} + +cfg_coop! { #[must_use] pub(crate) struct RestoreOnPending(Cell); diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index f6c502e56c0..eec13d86df8 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -372,7 +372,7 @@ cfg_rt! { pub mod runtime; } -pub(crate) mod coop; +pub mod coop; cfg_signal! { pub mod signal; diff --git a/tokio/tests/rt_common.rs b/tokio/tests/rt_common.rs index 9aef4b9b909..7e0e5b41404 100644 --- a/tokio/tests/rt_common.rs +++ b/tokio/tests/rt_common.rs @@ -1017,6 +1017,32 @@ rt_test! { }); } + #[test] + fn coop_unconstrained() { + use std::task::Poll::Ready; + + let rt = rt(); + + rt.block_on(async { + // Create a bunch of tasks + let mut tasks = (0..1_000).map(|_| { + tokio::spawn(async { }) + }).collect::>(); + + // Hope that all the tasks complete... + time::sleep(Duration::from_millis(100)).await; + + tokio::coop::unconstrained(poll_fn(|cx| { + // All the tasks should be ready + for task in &mut tasks { + assert!(Pin::new(task).poll(cx).is_ready()); + } + + Ready(()) + })).await; + }); + } + // Tests that the "next task" scheduler optimization is not able to starve // other tasks. #[test]