Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

coop: expose an unconstrained() opt-out #3547

Merged
merged 5 commits into from Mar 9, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
60 changes: 35 additions & 25 deletions tokio/src/coop.rs
@@ -1,6 +1,6 @@
#![cfg_attr(not(feature = "full"), allow(dead_code))]

//! Opt-in yield points for improved cooperative scheduling.
//! Yield points for improved cooperative scheduling.
//!
//! A single call to [`poll`] on a top-level task may potentially do a lot of
//! work before it returns `Poll::Pending`. If a task runs for a long period of
Expand All @@ -21,35 +21,38 @@
//!
//! It may look harmless, but consider what happens under heavy load if the
//! input stream is _always_ ready. If we spawn `drop_all`, the task will never
//! yield, and will starve other tasks and resources on the same executor. With
//! opt-in yield points, this problem is alleviated:
//! yield, and will starve other tasks and resources on the same executor.
//!
//! ```ignore
krallin marked this conversation as resolved.
Show resolved Hide resolved
//! # use tokio_stream::{Stream, StreamExt};
//! async fn drop_all<I: Stream + Unpin>(mut input: I) {
//! while let Some(_) = input.next().await {
//! tokio::coop::proceed().await;
//! }
//! }
//! ```
//!
//! The `proceed` future will coordinate with the executor to make sure that
//! every so often control is yielded back to the executor so it can run other
//! tasks.
//! To account for this, Tokio has explicit yield points in a number of library
//! functions, which force tasks to return to the executor periodically.
//!
//! # Placing yield points
//!
//! Voluntary yield points should be placed _after_ at least some work has been
//! done. If they are not, a future sufficiently deep in the task hierarchy may
//! end up _never_ getting to run because of the number of yield points that
//! inevitably appear before it is reached. In general, you will want yield
//! points to only appear in "leaf" futures -- those that do not themselves poll
//! other futures. By doing this, you avoid double-counting each iteration of
//! the outer future against the cooperating budget.
//! If necessary, you may use [`task::unconstrained`][crate::task::unconstrained] to opt out
//! specific futures of Tokio's cooperative scheduling.
//!
//! [`poll`]: method@std::future::Future::poll

// NOTE: The doctests in this module are ignored since the whole module is (currently) private.
// ```ignore
// # use tokio_stream::{Stream, StreamExt};
// async fn drop_all<I: Stream + Unpin>(mut input: I) {
// while let Some(_) = input.next().await {
// tokio::coop::proceed().await;
// }
// }
// ```
//
// The `proceed` future will coordinate with the executor to make sure that
// every so often control is yielded back to the executor so it can run other
// tasks.
//
// # Placing yield points
//
// Voluntary yield points should be placed _after_ at least some work has been
// done. If they are not, a future sufficiently deep in the task hierarchy may
// end up _never_ getting to run because of the number of yield points that
// inevitably appear before it is reached. In general, you will want yield
// points to only appear in "leaf" futures -- those that do not themselves poll
// other futures. By doing this, you avoid double-counting each iteration of
// the outer future against the cooperating budget.

use std::cell::Cell;

Expand Down Expand Up @@ -98,6 +101,13 @@ pub(crate) fn budget<R>(f: impl FnOnce() -> R) -> R {
with_budget(Budget::initial(), f)
}

/// Run the given closure with an unconstrained task budget. When the function returns, the budget
/// is reset to the value prior to calling the function.
#[inline(always)]
pub(crate) fn with_unconstrained<R>(f: impl FnOnce() -> R) -> R {
with_budget(Budget::unconstrained(), f)
}

#[inline(always)]
fn with_budget<R>(budget: Budget, f: impl FnOnce() -> R) -> R {
struct ResetGuard<'a> {
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/lib.rs
Expand Up @@ -372,7 +372,7 @@ cfg_rt! {
pub mod runtime;
}

pub(crate) mod coop;
pub mod coop;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we moved the function to tokio::task, this shouldn't be made public.

Copy link
Contributor Author

@krallin krallin Mar 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept it public so we could have access to some documentation for the coop module. Otherwise, there isn't much to explain what the wrapper does, and documenting the coop mechanism in the wrapper to disable it seems a bit counter-intuitive.

Thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, good point. I think I would still prefer to put it on the module docs of an existing module.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mind — where should it live then?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should probably go on tokio::task or tokio::runtime is my thought.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this into tokio::task and made tokio::coop pub(crate) again.


cfg_signal! {
pub mod signal;
Expand Down
18 changes: 18 additions & 0 deletions tokio/src/macros/cfg.rs
Expand Up @@ -357,3 +357,21 @@ macro_rules! cfg_coop {
)*
}
}

macro_rules! cfg_not_coop {
($($item:item)*) => {
$(
#[cfg(not(any(
feature = "fs",
feature = "io-std",
feature = "net",
feature = "process",
feature = "rt",
feature = "signal",
feature = "sync",
feature = "time",
)))]
$item
)*
}
}
29 changes: 29 additions & 0 deletions tokio/src/task/mod.rs
Expand Up @@ -208,12 +208,38 @@
//! # .await;
//! # }
//! ```
//! #### unconstrained
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't love this being lowercase.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pattern-matched from the other functions in there, e.g. yield_now:

tokio/tokio/src/task/mod.rs

Lines 185 to 192 in 704de8c

//! #### yield_now
//!
//! In addition, this module provides a [`task::yield_now`] async function
//! that is analogous to the standard library's [`thread::yield_now`]. Calling
//! and `await`ing this function will cause the current task to yield to the
//! Tokio runtime's scheduler, allowing other tasks to be
//! scheduled. Eventually, the yielding task will be polled again, allowing it
//! to execute. For example:

Should we change them all?

//!
//! Finally, this module provides [`task::unconstrained`], which lets you opt out a future of
//! tokio's [cooperative scheduling][crate::coop]. When a future is crapped with `unconstrained`,
//! it will never be forced to yield to Tokio. For example:
//!
//! ```
//! # #[tokio::main]
//! # async fn main() {
//! use tokio::{task, sync::mpsc};
//!
//! let fut = async {
//! let (tx, mut rx) = mpsc::unbounded_channel();
//!
//! for i in 0..1000 {
//! let _ = tx.send(());
//! // This will always be ready. If coop was in effect, this code would be forced to yield
//! // periodically. However, if left unconstrained, then this code will never yield.
//! rx.recv().await;
//! }
//! };
//!
//! task::unconstrained(fut).await;
//! # }
//! ```
//!
//! [`task::spawn_blocking`]: crate::task::spawn_blocking
//! [`task::block_in_place`]: crate::task::block_in_place
//! [rt-multi-thread]: ../runtime/index.html#threaded-scheduler
//! [`task::yield_now`]: crate::task::yield_now()
//! [`thread::yield_now`]: std::thread::yield_now
//! [`task::unconstrained`]: crate::task::unconstrained()

cfg_rt! {
pub use crate::runtime::task::{JoinError, JoinHandle};
Expand All @@ -236,4 +262,7 @@ cfg_rt! {

mod task_local;
pub use task_local::LocalKey;

mod unconstrained;
pub use unconstrained::{unconstrained, Unconstrained};
}
43 changes: 43 additions & 0 deletions tokio/src/task/unconstrained.rs
@@ -0,0 +1,43 @@
use pin_project_lite::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

pin_project! {
/// Future for the [`unconstrained`](unconstrained) method.
#[must_use = "Unconstrained does nothing unless polled"]
pub struct Unconstrained<F> {
#[pin]
inner: F,
}
}

impl<F> Future for Unconstrained<F>
where
F: Future,
{
type Output = <F as Future>::Output;

cfg_coop! {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let inner = self.project().inner;
crate::coop::with_unconstrained(|| inner.poll(cx))
}
}

cfg_not_coop! {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let inner = self.project().inner;
inner.poll(cx)
}
}
}

/// Turn off cooperative scheduling for a future. The future will never be forced to yield by
/// Tokio. Using this exposes your service to starvation if the unconstrained future never yields
/// otherwise.
///
/// See also the usage example in the [task module](index.html#unconstrained).
pub fn unconstrained<F>(inner: F) -> Unconstrained<F> {
Unconstrained { inner }
}
26 changes: 26 additions & 0 deletions tokio/tests/rt_common.rs
Expand Up @@ -1017,6 +1017,32 @@ rt_test! {
});
}

#[test]
fn coop_unconstrained() {
use std::task::Poll::Ready;

let rt = rt();

rt.block_on(async {
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
// Create a bunch of tasks
let mut tasks = (0..1_000).map(|_| {
tokio::spawn(async { })
}).collect::<Vec<_>>();

// Hope that all the tasks complete...
time::sleep(Duration::from_millis(100)).await;

tokio::task::unconstrained(poll_fn(|cx| {
// All the tasks should be ready
for task in &mut tasks {
assert!(Pin::new(task).poll(cx).is_ready());
}

Ready(())
})).await;
});
}

// Tests that the "next task" scheduler optimization is not able to starve
// other tasks.
#[test]
Expand Down