From 448fa3739896b7703e5ad529d5286cc58debbf52 Mon Sep 17 00:00:00 2001 From: Jon Gjengset Date: Thu, 5 Mar 2020 15:42:44 -0500 Subject: [PATCH] Add cooperative task yielding --- tokio/src/coop.rs | 292 ++++++++++++++++++++++++ tokio/src/io/registration.rs | 6 + tokio/src/lib.rs | 2 + tokio/src/process/mod.rs | 3 + tokio/src/runtime/basic_scheduler.rs | 2 +- tokio/src/runtime/enter.rs | 4 +- tokio/src/runtime/shell.rs | 2 +- tokio/src/runtime/task/core.rs | 2 +- tokio/src/runtime/task/join.rs | 3 + tokio/src/runtime/thread_pool/worker.rs | 7 +- tokio/src/sync/mpsc/chan.rs | 6 + tokio/src/sync/mutex.rs | 19 +- tokio/src/sync/oneshot.rs | 6 + tokio/src/sync/rwlock.rs | 3 + tokio/src/sync/semaphore.rs | 11 +- tokio/src/time/driver/registration.rs | 3 + 16 files changed, 355 insertions(+), 16 deletions(-) create mode 100644 tokio/src/coop.rs diff --git a/tokio/src/coop.rs b/tokio/src/coop.rs new file mode 100644 index 00000000000..8325970ae43 --- /dev/null +++ b/tokio/src/coop.rs @@ -0,0 +1,292 @@ +//! Opt-in yield points for improved cooperative scheduling. +//! +//! A single call to [`poll`] on a top-level task may potentially do a lot of work before it +//! returns `Poll::Pending`. If a task runs for a long period of time without yielding back to the +//! executor, it can starve other tasks waiting on that executor to execute them, or drive +//! underlying resources. Since Rust does not have a runtime, it is difficult to forcibly preempt a +//! long-running task. Instead, this module provides an opt-in mechanism for futures to collaborate +//! with the executor to avoid starvation. +//! +//! Consider a future like this one: +//! +//! ``` +//! # use tokio::stream::{Stream, StreamExt}; +//! async fn drop_all(mut input: I) { +//! while let Some(_) = input.next().await {} +//! } +//! ``` +//! +//! It may look harmless, but consider what happens under heavy load if the input stream is +//! _always_ ready. If we spawn `drop_all`, the task will never yield, and will starve other tasks +//! and resources on the same executor. With opt-in yield points, this problem is alleviated: +//! +//! ```ignore +//! # use tokio::stream::{Stream, StreamExt}; +//! async fn drop_all(mut input: I) { +//! while let Some(_) = input.next().await { +//! tokio::coop::proceed().await; +//! } +//! } +//! ``` +//! +//! The `proceed` future will coordinate with the executor to make sure that every so often control +//! is yielded back to the executor so it can run other tasks. +//! +//! # Placing yield points +//! +//! Voluntary yield points should be placed _after_ at least some work has been done. If they are +//! not, a future sufficiently deep in the task hierarchy may end up _never_ getting to run because +//! of the number of yield points that inevitably appear before it is reached. In general, you will +//! want yield points to only appear in "leaf" futures -- those that do not themselves poll other +//! futures. By doing this, you avoid double-counting each iteration of the outer future against +//! the cooperating budget. +//! +//! [`poll`]: https://doc.rust-lang.org/std/future/trait.Future.html#tymethod.poll + +// NOTE: The doctests in this module are ignored since the whole module is (currently) private. + +use std::cell::Cell; +use std::task::{Context, Poll}; + +/// Constant used to determine how much "work" a task is allowed to do without yielding. +/// +/// The value itself is chosen somewhat arbitrarily. It needs to be high enough to amortize wakeup +/// and scheduling costs, but low enough that we do not starve other tasks for too long. The value +/// also needs to be high enough that particularly deep tasks are able to do at least some useful +/// work at all. +/// +/// Note that as more yield points are added in the ecosystem, this value will probably also have +/// to be raised. +const BUDGET: usize = 128; + +/// Constant used to determine if budgeting has been disabled. +const UNCONSTRAINED: usize = usize::max_value(); + +thread_local! { + static HITS: Cell = Cell::new(UNCONSTRAINED); +} + +/// Run the given closure with a cooperative task budget. +pub(crate) fn budget(f: F) -> R +where + F: FnOnce() -> R, +{ + struct Guard; + impl Drop for Guard { + fn drop(&mut self) { + HITS.with(|hits| { + hits.set(UNCONSTRAINED); + }); + } + } + + let _guard = Guard; + HITS.with(|hits| { + hits.set(BUDGET); + }); + f() +} + +cfg_rt_threaded! { + /// Forcibly remove the budgeting constraints early. + pub(crate) fn stop() { + HITS.with(|hits| { + hits.set(UNCONSTRAINED); + }); + } +} + +/// Invoke `f` with a subset of the remaining budget. +/// +/// This is useful if you have sub-futures that you need to poll, but that you want to restrict +/// from using up your entire budget. For example, imagine the following future: +/// +/// ```rust +/// # use std::{future::Future, pin::Pin, task::{Context, Poll}}; +/// use futures::stream::FuturesUnordered; +/// struct MyFuture { +/// big: FuturesUnordered, +/// small: F2, +/// } +/// +/// use tokio::stream::Stream; +/// impl Future for MyFuture +/// where F1: Future, F2: Future +/// # , F1: Unpin, F2: Unpin +/// { +/// type Output = F2::Output; +/// +/// // fn poll(...) +/// # fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { +/// # let this = &mut *self; +/// let mut big = // something to pin self.big +/// # Pin::new(&mut this.big); +/// let small = // something to pin self.small +/// # Pin::new(&mut this.small); +/// +/// // see if any of the big futures have finished +/// while let Some(e) = futures::ready!(big.as_mut().poll_next(cx)) { +/// // do something with e +/// # let _ = e; +/// } +/// +/// // see if the small future has finished +/// small.poll(cx) +/// } +/// # } +/// ``` +/// +/// It could be that every time `poll` gets called, `big` ends up spending the entire budget, and +/// `small` never gets polled. That would be sad. If you want to stick up for the little future, +/// that's what `limit` is for. It lets you portion out a smaller part of the yield budget to a +/// particular segment of your code. In the code above, you would write +/// +/// ```rust,ignore +/// # use std::{future::Future, pin::Pin, task::{Context, Poll}}; +/// # use futures::stream::FuturesUnordered; +/// # struct MyFuture { +/// # big: FuturesUnordered, +/// # small: F2, +/// # } +/// # +/// # use tokio::stream::Stream; +/// # impl Future for MyFuture +/// # where F1: Future, F2: Future +/// # , F1: Unpin, F2: Unpin +/// # { +/// # type Output = F2::Output; +/// # fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { +/// # let this = &mut *self; +/// # let mut big = Pin::new(&mut this.big); +/// # let small = Pin::new(&mut this.small); +/// # +/// // see if any of the big futures have finished +/// while let Some(e) = futures::ready!(tokio::coop::limit(64, || big.as_mut().poll_next(cx))) { +/// # // do something with e +/// # let _ = e; +/// # } +/// # small.poll(cx) +/// # } +/// # } +/// ``` +/// +/// Now, even if `big` spends its entire budget, `small` will likely be left with some budget left +/// to also do useful work. In particular, if the remaining budget was `N` at the start of `poll`, +/// `small` will have at least a budget of `N - 64`. It may be more if `big` did not spend its +/// entire budget. +/// +/// Note that you cannot _increase_ your budget by calling `limit`. The budget provided to the code +/// inside the buget is the _minimum_ of the _current_ budget and the bound. +/// +#[allow(unreachable_pub, dead_code)] +pub fn limit(bound: usize, f: impl FnOnce() -> R) -> R { + HITS.with(|hits| { + let budget = hits.get(); + // with_bound cannot _increase_ the remaining budget + let bound = std::cmp::min(budget, bound); + // When f() exits, how much should we add to what is left? + let floor = budget.saturating_sub(bound); + // Make sure we restore the remaining budget even on panic + struct RestoreBudget<'a>(&'a Cell, usize); + impl<'a> Drop for RestoreBudget<'a> { + fn drop(&mut self) { + let left = self.0.get(); + self.0.set(self.1 + left); + } + } + // Time to restrict! + hits.set(bound); + let _restore = RestoreBudget(&hits, floor); + f() + }) +} + +/// Returns `Poll::Pending` if the current task has exceeded its budget and should yield. +#[allow(unreachable_pub, dead_code)] +#[inline] +pub fn poll_proceed(cx: &mut Context<'_>) -> Poll<()> { + HITS.with(|hits| { + let n = hits.get(); + if n == UNCONSTRAINED { + // opted out of budgeting + Poll::Ready(()) + } else if n == 0 { + cx.waker().wake_by_ref(); + Poll::Pending + } else { + hits.set(n.saturating_sub(1)); + Poll::Ready(()) + } + }) +} + +/// Resolves immediately unless the current task has already exceeded its budget. +/// +/// This should be placed after at least some work has been done. Otherwise a future sufficiently +/// deep in the task hierarchy may end up never getting to run because of the number of yield +/// points that inevitably appear before it is even reached. For example: +/// +/// ```ignore +/// # use tokio::stream::{Stream, StreamExt}; +/// async fn drop_all(mut input: I) { +/// while let Some(_) = input.next().await { +/// tokio::coop::proceed().await; +/// } +/// } +/// ``` +#[allow(unreachable_pub, dead_code)] +#[inline] +pub async fn proceed() { + use crate::future::poll_fn; + poll_fn(|cx| poll_proceed(cx)).await; +} + +#[cfg(all(test, not(loom)))] +mod test { + use super::*; + + fn get() -> usize { + HITS.with(|hits| hits.get()) + } + + #[test] + fn bugeting() { + use tokio_test::*; + + assert_eq!(get(), UNCONSTRAINED); + assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); + assert_eq!(get(), UNCONSTRAINED); + opt_in(); + assert_eq!(get(), BUDGET); + opt_in(); + assert_eq!(get(), BUDGET); + assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); + assert_eq!(get(), BUDGET - 1); + assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); + assert_eq!(get(), BUDGET - 2); + executor_tick(); + assert_eq!(get(), BUDGET); + limit(3, || { + assert_eq!(get(), 3); + assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); + assert_eq!(get(), 2); + limit(4, || { + assert_eq!(get(), 2); + assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); + assert_eq!(get(), 1); + }); + assert_eq!(get(), 1); + assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); + assert_eq!(get(), 0); + assert_pending!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); + assert_eq!(get(), 0); + assert_pending!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); + assert_eq!(get(), 0); + }); + assert_eq!(get(), BUDGET - 3); + assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); + assert_eq!(get(), BUDGET - 4); + assert_ready!(task::spawn(proceed()).poll()); + assert_eq!(get(), BUDGET - 5); + } +} diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs index 52d1564beee..4df11999f5e 100644 --- a/tokio/src/io/registration.rs +++ b/tokio/src/io/registration.rs @@ -139,6 +139,9 @@ impl Registration { /// /// This function will panic if called from outside of a task context. pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll> { + // Keep track of task budget + ready!(crate::coop::poll_proceed(cx)); + let v = self.poll_ready(Direction::Read, Some(cx))?; match v { Some(v) => Poll::Ready(Ok(v)), @@ -190,6 +193,9 @@ impl Registration { /// /// This function will panic if called from outside of a task context. pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll> { + // Keep track of task budget + ready!(crate::coop::poll_proceed(cx)); + let v = self.poll_ready(Direction::Write, Some(cx))?; match v { Some(v) => Poll::Ready(Ok(v)), diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 1498c2b5f00..87a1d0c458b 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -321,6 +321,8 @@ cfg_process! { pub mod runtime; +pub(crate) mod coop; + cfg_signal! { pub mod signal; } diff --git a/tokio/src/process/mod.rs b/tokio/src/process/mod.rs index d704347c234..7231511235e 100644 --- a/tokio/src/process/mod.rs +++ b/tokio/src/process/mod.rs @@ -700,6 +700,9 @@ where type Output = Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // Keep track of task budget + ready!(crate::coop::poll_proceed(cx)); + let ret = Pin::new(&mut self.inner).poll(cx); if let Poll::Ready(Ok(_)) = ret { diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index a494f9e3d7c..0419c209d8a 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -128,7 +128,7 @@ where pin!(future); 'outer: loop { - if let Ready(v) = future.as_mut().poll(&mut cx) { + if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) { return v; } diff --git a/tokio/src/runtime/enter.rs b/tokio/src/runtime/enter.rs index 64648b71dc8..afdb67a3b7c 100644 --- a/tokio/src/runtime/enter.rs +++ b/tokio/src/runtime/enter.rs @@ -98,7 +98,7 @@ cfg_blocking_impl! { let mut f = unsafe { Pin::new_unchecked(&mut f) }; loop { - if let Ready(v) = f.as_mut().poll(&mut cx) { + if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) { return Ok(v); } @@ -130,7 +130,7 @@ cfg_blocking_impl! { let when = Instant::now() + timeout; loop { - if let Ready(v) = f.as_mut().poll(&mut cx) { + if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) { return Ok(v); } diff --git a/tokio/src/runtime/shell.rs b/tokio/src/runtime/shell.rs index 992244dea0e..efe77f20d13 100644 --- a/tokio/src/runtime/shell.rs +++ b/tokio/src/runtime/shell.rs @@ -46,7 +46,7 @@ impl Shell { let mut cx = Context::from_waker(&self.waker); loop { - if let Ready(v) = f.as_mut().poll(&mut cx) { + if let Ready(v) = crate::coop::budget(|| f.as_mut().poll(&mut cx)) { return v; } diff --git a/tokio/src/runtime/task/core.rs b/tokio/src/runtime/task/core.rs index 43e6b4716a1..dee55a54f10 100644 --- a/tokio/src/runtime/task/core.rs +++ b/tokio/src/runtime/task/core.rs @@ -161,7 +161,7 @@ impl Core { let waker_ref = waker_ref::(header); let mut cx = Context::from_waker(&*waker_ref); - future.poll(&mut cx) + crate::coop::budget(|| future.poll(&mut cx)) }) }; diff --git a/tokio/src/runtime/task/join.rs b/tokio/src/runtime/task/join.rs index ed893a35c19..fdcc346e5c1 100644 --- a/tokio/src/runtime/task/join.rs +++ b/tokio/src/runtime/task/join.rs @@ -101,6 +101,9 @@ impl Future for JoinHandle { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut ret = Poll::Pending; + // Keep track of task budget + ready!(crate::coop::poll_proceed(cx)); + // Raw should always be set. If it is not, this is due to polling after // completion let raw = self diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index cc79530a96b..53c7e5b616e 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -187,7 +187,12 @@ cfg_blocking! { // Get the worker core. If none is set, then blocking is fine! let core = match cx.core.borrow_mut().take() { - Some(core) => core, + Some(core) => { + // We are effectively leaving the executor, so we need to + // forcibly end budgeting. + crate::coop::stop(); + core + }, None => return, }; diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 2fc915d0e83..dc02dae2e19 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -265,6 +265,9 @@ where pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll> { use super::block::Read::*; + // Keep track of task budget + ready!(crate::coop::poll_proceed(cx)); + self.inner.rx_fields.with_mut(|rx_fields_ptr| { let rx_fields = unsafe { &mut *rx_fields_ptr }; @@ -424,6 +427,9 @@ impl Semaphore for (crate::sync::semaphore_ll::Semaphore, usize) { cx: &mut Context<'_>, permit: &mut Permit, ) -> Poll> { + // Keep track of task budget + ready!(crate::coop::poll_proceed(cx)); + permit .poll_acquire(cx, 1, &self.0) .map_err(|_| ClosedError::new()) diff --git a/tokio/src/sync/mutex.rs b/tokio/src/sync/mutex.rs index 92218cbc445..4aceb000d1f 100644 --- a/tokio/src/sync/mutex.rs +++ b/tokio/src/sync/mutex.rs @@ -156,13 +156,18 @@ impl Mutex { lock: self, permit: semaphore::Permit::new(), }; - poll_fn(|cx| guard.permit.poll_acquire(cx, 1, &self.s)) - .await - .unwrap_or_else(|_| { - // The semaphore was closed. but, we never explicitly close it, and we have a - // handle to it through the Arc, which means that this can never happen. - unreachable!() - }); + poll_fn(|cx| { + // Keep track of task budget + ready!(crate::coop::poll_proceed(cx)); + + guard.permit.poll_acquire(cx, 1, &self.s) + }) + .await + .unwrap_or_else(|_| { + // The semaphore was closed. but, we never explicitly close it, and we have a + // handle to it through the Arc, which means that this can never happen. + unreachable!() + }); guard } diff --git a/tokio/src/sync/oneshot.rs b/tokio/src/sync/oneshot.rs index 6c7b97cfa92..163a708d11a 100644 --- a/tokio/src/sync/oneshot.rs +++ b/tokio/src/sync/oneshot.rs @@ -196,6 +196,9 @@ impl Sender { #[doc(hidden)] // TODO: remove pub fn poll_closed(&mut self, cx: &mut Context<'_>) -> Poll<()> { + // Keep track of task budget + ready!(crate::coop::poll_proceed(cx)); + let inner = self.inner.as_ref().unwrap(); let mut state = State::load(&inner.state, Acquire); @@ -544,6 +547,9 @@ impl Inner { } fn poll_recv(&self, cx: &mut Context<'_>) -> Poll> { + // Keep track of task budget + ready!(crate::coop::poll_proceed(cx)); + // Load the state let mut state = State::load(&self.state, Acquire); diff --git a/tokio/src/sync/rwlock.rs b/tokio/src/sync/rwlock.rs index bd52f6d7f4e..ccd0e884740 100644 --- a/tokio/src/sync/rwlock.rs +++ b/tokio/src/sync/rwlock.rs @@ -119,6 +119,9 @@ impl<'a, T> ReleasingPermit<'a, T> { cx: &mut Context<'_>, s: &Semaphore, ) -> Poll> { + // Keep track of task budget + ready!(crate::coop::poll_proceed(cx)); + self.permit.poll_acquire(cx, self.num_permits, s) } } diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 7721e01f52c..ec43bc522b0 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -60,9 +60,14 @@ impl Semaphore { sem: &self, ll_permit: ll::Permit::new(), }; - poll_fn(|cx| permit.ll_permit.poll_acquire(cx, 1, &self.ll_sem)) - .await - .unwrap(); + poll_fn(|cx| { + // Keep track of task budget + ready!(crate::coop::poll_proceed(cx)); + + permit.ll_permit.poll_acquire(cx, 1, &self.ll_sem) + }) + .await + .unwrap(); permit } diff --git a/tokio/src/time/driver/registration.rs b/tokio/src/time/driver/registration.rs index 141ca0154c1..b77357e7353 100644 --- a/tokio/src/time/driver/registration.rs +++ b/tokio/src/time/driver/registration.rs @@ -39,6 +39,9 @@ impl Registration { } pub(crate) fn poll_elapsed(&self, cx: &mut task::Context<'_>) -> Poll> { + // Keep track of task budget + ready!(crate::coop::poll_proceed(cx)); + self.entry.poll_elapsed(cx) } }