From 4748b2571fc02d5ebbfe59e457f0e8d8ef0eb5f3 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 6 May 2020 19:02:07 -0700 Subject: [PATCH] rt: simplify coop implementation (#2498) Simplifies coop implementation. Prunes unused code, create a `Budget` type to track the current budget. --- tokio/src/coop.rs | 467 +++++++++--------------------- tokio/src/macros/cfg.rs | 20 ++ tokio/src/sync/batch_semaphore.rs | 4 + tokio/src/sync/mutex.rs | 3 +- tokio/src/sync/rwlock.rs | 3 +- tokio/src/sync/semaphore.rs | 5 +- tokio/src/task/local.rs | 62 ++-- 7 files changed, 196 insertions(+), 368 deletions(-) diff --git a/tokio/src/coop.rs b/tokio/src/coop.rs index 606ba3a7395..4d57161fa22 100644 --- a/tokio/src/coop.rs +++ b/tokio/src/coop.rs @@ -1,11 +1,12 @@ //! Opt-in yield points for improved cooperative scheduling. //! -//! A single call to [`poll`] on a top-level task may potentially do a lot of work before it -//! returns `Poll::Pending`. If a task runs for a long period of time without yielding back to the -//! executor, it can starve other tasks waiting on that executor to execute them, or drive -//! underlying resources. Since Rust does not have a runtime, it is difficult to forcibly preempt a -//! long-running task. Instead, this module provides an opt-in mechanism for futures to collaborate -//! with the executor to avoid starvation. +//! A single call to [`poll`] on a top-level task may potentially do a lot of +//! work before it returns `Poll::Pending`. If a task runs for a long period of +//! time without yielding back to the executor, it can starve other tasks +//! waiting on that executor to execute them, or drive underlying resources. +//! Since Rust does not have a runtime, it is difficult to forcibly preempt a +//! long-running task. Instead, this module provides an opt-in mechanism for +//! futures to collaborate with the executor to avoid starvation. //! //! Consider a future like this one: //! @@ -16,9 +17,10 @@ //! } //! ``` //! -//! It may look harmless, but consider what happens under heavy load if the input stream is -//! _always_ ready. If we spawn `drop_all`, the task will never yield, and will starve other tasks -//! and resources on the same executor. With opt-in yield points, this problem is alleviated: +//! It may look harmless, but consider what happens under heavy load if the +//! input stream is _always_ ready. If we spawn `drop_all`, the task will never +//! yield, and will starve other tasks and resources on the same executor. With +//! opt-in yield points, this problem is alleviated: //! //! ```ignore //! # use tokio::stream::{Stream, StreamExt}; @@ -29,67 +31,89 @@ //! } //! ``` //! -//! The `proceed` future will coordinate with the executor to make sure that every so often control -//! is yielded back to the executor so it can run other tasks. +//! The `proceed` future will coordinate with the executor to make sure that +//! every so often control is yielded back to the executor so it can run other +//! tasks. //! //! # Placing yield points //! -//! Voluntary yield points should be placed _after_ at least some work has been done. If they are -//! not, a future sufficiently deep in the task hierarchy may end up _never_ getting to run because -//! of the number of yield points that inevitably appear before it is reached. In general, you will -//! want yield points to only appear in "leaf" futures -- those that do not themselves poll other -//! futures. By doing this, you avoid double-counting each iteration of the outer future against -//! the cooperating budget. +//! Voluntary yield points should be placed _after_ at least some work has been +//! done. If they are not, a future sufficiently deep in the task hierarchy may +//! end up _never_ getting to run because of the number of yield points that +//! inevitably appear before it is reached. In general, you will want yield +//! points to only appear in "leaf" futures -- those that do not themselves poll +//! other futures. By doing this, you avoid double-counting each iteration of +//! the outer future against the cooperating budget. //! -//! [`poll`]: https://doc.rust-lang.org/std/future/trait.Future.html#tymethod.poll +//! [`poll`]: https://doc.rust-lang.org/std/future/trait.Future.html#tymethod.poll // NOTE: The doctests in this module are ignored since the whole module is (currently) private. use std::cell::Cell; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; - -/// Constant used to determine how much "work" a task is allowed to do without yielding. -/// -/// The value itself is chosen somewhat arbitrarily. It needs to be high enough to amortize wakeup -/// and scheduling costs, but low enough that we do not starve other tasks for too long. The value -/// also needs to be high enough that particularly deep tasks are able to do at least some useful -/// work at all. -/// -/// Note that as more yield points are added in the ecosystem, this value will probably also have -/// to be raised. -const BUDGET: usize = 128; - -/// Constant used to determine if budgeting has been disabled. -const UNCONSTRAINED: usize = usize::max_value(); thread_local! { - static HITS: Cell = Cell::new(UNCONSTRAINED); + static CURRENT: Cell = Cell::new(Budget::unconstrained()); } -/// Run the given closure with a cooperative task budget. -/// -/// Enabling budgeting when it is already enabled is a no-op. +/// Opaque type tracking the amount of "work" a task may still do before +/// yielding back to the scheduler. +#[derive(Debug, Copy, Clone)] +pub(crate) struct Budget(Option); + +impl Budget { + /// Budget assigned to a task on each poll. + /// + /// The value itself is chosen somewhat arbitrarily. It needs to be high + /// enough to amortize wakeup and scheduling costs, but low enough that we + /// do not starve other tasks for too long. The value also needs to be high + /// enough that particularly deep tasks are able to do at least some useful + /// work at all. + /// + /// Note that as more yield points are added in the ecosystem, this value + /// will probably also have to be raised. + const fn initial() -> Budget { + Budget(Some(128)) + } + + /// Returns an unconstrained budget. Operations will not be limited. + const fn unconstrained() -> Budget { + Budget(None) + } +} + +cfg_rt_threaded! { + impl Budget { + fn has_remaining(self) -> bool { + self.0.map(|budget| budget > 0).unwrap_or(true) + } + } +} + +/// Run the given closure with a cooperative task budget. When the function +/// returns, the budget is reset to the value prior to calling the function. #[inline(always)] pub(crate) fn budget(f: F) -> R where F: FnOnce() -> R, { - HITS.with(move |hits| { - if hits.get() != UNCONSTRAINED { - // We are already being budgeted. - // - // Arguably this should be an error, but it can happen "correctly" - // such as with block_on + LocalSet, so we make it a no-op. - return f(); + struct ResetGuard<'a> { + cell: &'a Cell, + prev: Budget, + } + + impl<'a> Drop for ResetGuard<'a> { + fn drop(&mut self) { + self.cell.set(self.prev); } + } + + CURRENT.with(move |cell| { + let prev = cell.get(); + + cell.set(Budget::initial()); + + let _guard = ResetGuard { cell, prev }; - hits.set(BUDGET); - let _guard = ResetGuard { - hits, - prev: UNCONSTRAINED, - }; f() }) } @@ -97,266 +121,53 @@ where cfg_rt_threaded! { #[inline(always)] pub(crate) fn has_budget_remaining() -> bool { - HITS.with(|hits| hits.get() > 0) + CURRENT.with(|cell| cell.get().has_remaining()) } } cfg_blocking_impl! { /// Forcibly remove the budgeting constraints early. pub(crate) fn stop() { - HITS.with(|hits| { - hits.set(UNCONSTRAINED); + CURRENT.with(|cell| { + cell.set(Budget::unconstrained()); }); } } -cfg_rt_core! { - cfg_rt_util! { - /// Run the given closure with a new task budget, resetting the previous - /// budget when the closure finishes. - /// - /// This is intended for internal use by `LocalSet` and (potentially) other - /// similar schedulers which are themselves futures, and need a fresh budget - /// for each of their children. - #[inline(always)] - pub(crate) fn reset(f: F) -> R - where - F: FnOnce() -> R, - { - HITS.with(move |hits| { - let prev = hits.get(); - hits.set(UNCONSTRAINED); - let _guard = ResetGuard { - hits, - prev, - }; - f() - }) - } - } -} - -/// Invoke `f` with a subset of the remaining budget. -/// -/// This is useful if you have sub-futures that you need to poll, but that you want to restrict -/// from using up your entire budget. For example, imagine the following future: -/// -/// ```rust -/// # use std::{future::Future, pin::Pin, task::{Context, Poll}}; -/// use futures::stream::FuturesUnordered; -/// struct MyFuture { -/// big: FuturesUnordered, -/// small: F2, -/// } -/// -/// use tokio::stream::Stream; -/// impl Future for MyFuture -/// where F1: Future, F2: Future -/// # , F1: Unpin, F2: Unpin -/// { -/// type Output = F2::Output; -/// -/// // fn poll(...) -/// # fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { -/// # let this = &mut *self; -/// let mut big = // something to pin self.big -/// # Pin::new(&mut this.big); -/// let small = // something to pin self.small -/// # Pin::new(&mut this.small); -/// -/// // see if any of the big futures have finished -/// while let Some(e) = futures::ready!(big.as_mut().poll_next(cx)) { -/// // do something with e -/// # let _ = e; -/// } -/// -/// // see if the small future has finished -/// small.poll(cx) -/// } -/// # } -/// ``` -/// -/// It could be that every time `poll` gets called, `big` ends up spending the entire budget, and -/// `small` never gets polled. That would be sad. If you want to stick up for the little future, -/// that's what `limit` is for. It lets you portion out a smaller part of the yield budget to a -/// particular segment of your code. In the code above, you would write -/// -/// ```rust,ignore -/// # use std::{future::Future, pin::Pin, task::{Context, Poll}}; -/// # use futures::stream::FuturesUnordered; -/// # struct MyFuture { -/// # big: FuturesUnordered, -/// # small: F2, -/// # } -/// # -/// # use tokio::stream::Stream; -/// # impl Future for MyFuture -/// # where F1: Future, F2: Future -/// # , F1: Unpin, F2: Unpin -/// # { -/// # type Output = F2::Output; -/// # fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { -/// # let this = &mut *self; -/// # let mut big = Pin::new(&mut this.big); -/// # let small = Pin::new(&mut this.small); -/// # -/// // see if any of the big futures have finished -/// while let Some(e) = futures::ready!(tokio::coop::limit(64, || big.as_mut().poll_next(cx))) { -/// # // do something with e -/// # let _ = e; -/// # } -/// # small.poll(cx) -/// # } -/// # } -/// ``` -/// -/// Now, even if `big` spends its entire budget, `small` will likely be left with some budget left -/// to also do useful work. In particular, if the remaining budget was `N` at the start of `poll`, -/// `small` will have at least a budget of `N - 64`. It may be more if `big` did not spend its -/// entire budget. -/// -/// Note that you cannot _increase_ your budget by calling `limit`. The budget provided to the code -/// inside the buget is the _minimum_ of the _current_ budget and the bound. -/// -#[allow(unreachable_pub, dead_code)] -pub fn limit(bound: usize, f: impl FnOnce() -> R) -> R { - HITS.with(|hits| { - let budget = hits.get(); - // with_bound cannot _increase_ the remaining budget - let bound = std::cmp::min(budget, bound); - // When f() exits, how much should we add to what is left? - let floor = budget.saturating_sub(bound); - // Make sure we restore the remaining budget even on panic - struct RestoreBudget<'a>(&'a Cell, usize); - impl<'a> Drop for RestoreBudget<'a> { - fn drop(&mut self) { - let left = self.0.get(); - self.0.set(self.1 + left); +cfg_coop! { + use std::task::{Context, Poll}; + + /// Returns `Poll::Pending` if the current task has exceeded its budget and should yield. + #[inline] + pub(crate) fn poll_proceed(cx: &mut Context<'_>) -> Poll<()> { + CURRENT.with(|cell| { + let mut budget = cell.get(); + + if budget.decrement() { + cell.set(budget); + Poll::Ready(()) + } else { + cx.waker().wake_by_ref(); + Poll::Pending } - } - // Time to restrict! - hits.set(bound); - let _restore = RestoreBudget(&hits, floor); - f() - }) -} - -/// Returns `Poll::Pending` if the current task has exceeded its budget and should yield. -#[allow(unreachable_pub, dead_code)] -#[inline] -pub fn poll_proceed(cx: &mut Context<'_>) -> Poll<()> { - HITS.with(|hits| { - let n = hits.get(); - if n == UNCONSTRAINED { - // opted out of budgeting - Poll::Ready(()) - } else if n == 0 { - cx.waker().wake_by_ref(); - Poll::Pending - } else { - hits.set(n.saturating_sub(1)); - Poll::Ready(()) - } - }) -} - -/// Resolves immediately unless the current task has already exceeded its budget. -/// -/// This should be placed after at least some work has been done. Otherwise a future sufficiently -/// deep in the task hierarchy may end up never getting to run because of the number of yield -/// points that inevitably appear before it is even reached. For example: -/// -/// ```ignore -/// # use tokio::stream::{Stream, StreamExt}; -/// async fn drop_all(mut input: I) { -/// while let Some(_) = input.next().await { -/// tokio::coop::proceed().await; -/// } -/// } -/// ``` -#[allow(unreachable_pub, dead_code)] -#[inline] -pub async fn proceed() { - use crate::future::poll_fn; - poll_fn(|cx| poll_proceed(cx)).await; -} - -pin_project_lite::pin_project! { - /// A future that cooperatively yields to the task scheduler when polling, - /// if the task's budget is exhausted. - /// - /// Internally, this is simply a future combinator which calls - /// [`poll_proceed`] in its `poll` implementation before polling the wrapped - /// future. - /// - /// # Examples - /// - /// ```rust,ignore - /// # #[tokio::main] - /// # async fn main() { - /// use tokio::coop::CoopFutureExt; - /// - /// async { /* ... */ } - /// .cooperate() - /// .await; - /// # } - /// ``` - /// - /// [`poll_proceed`]: fn@poll_proceed - #[derive(Debug)] - #[allow(unreachable_pub, dead_code)] - pub struct CoopFuture { - #[pin] - future: F, + }) } -} - -struct ResetGuard<'a> { - hits: &'a Cell, - prev: usize, -} - -impl Future for CoopFuture { - type Output = F::Output; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - ready!(poll_proceed(cx)); - self.project().future.poll(cx) - } -} -impl CoopFuture { - /// Returns a new `CoopFuture` wrapping the given future. - /// - #[allow(unreachable_pub, dead_code)] - pub fn new(future: F) -> Self { - Self { future } + impl Budget { + /// Decrement the budget. Returns `true` if successful. Decrementing fails + /// when there is not enough remaining budget. + fn decrement(&mut self) -> bool { + if let Some(num) = &mut self.0 { + if *num > 0 { + *num -= 1; + true + } else { + false + } + } else { + true + } } -} - -// Currently only used by `tokio::sync`; and if we make this combinator public, -// it should probably be on the `FutureExt` trait instead. -cfg_sync! { - /// Extension trait providing `Future::cooperate` extension method. - /// - /// Note: if/when the co-op API becomes public, this method should probably be - /// provided by `FutureExt`, instead. - pub(crate) trait CoopFutureExt: Future { - /// Wrap `self` to cooperatively yield to the scheduler when polling, if the - /// task's budget is exhausted. - fn cooperate(self) -> CoopFuture - where - Self: Sized, - { - CoopFuture::new(self) - } - } - - impl CoopFutureExt for F where F: Future {} -} - -impl<'a> Drop for ResetGuard<'a> { - fn drop(&mut self) { - self.hits.set(self.prev); } } @@ -364,49 +175,53 @@ impl<'a> Drop for ResetGuard<'a> { mod test { use super::*; - fn get() -> usize { - HITS.with(|hits| hits.get()) + fn get() -> Budget { + CURRENT.with(|cell| cell.get()) } #[test] fn bugeting() { + use futures::future::poll_fn; use tokio_test::*; - assert_eq!(get(), UNCONSTRAINED); + assert!(get().0.is_none()); + assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); - assert_eq!(get(), UNCONSTRAINED); + + assert!(get().0.is_none()); + budget(|| { - assert_eq!(get(), BUDGET); + assert_eq!(get().0, Budget::initial().0); assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); - assert_eq!(get(), BUDGET - 1); + assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1); assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); - assert_eq!(get(), BUDGET - 2); + assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2); + + budget(|| { + assert_eq!(get().0, Budget::initial().0); + + assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); + assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 1); + }); + + assert_eq!(get().0.unwrap(), Budget::initial().0.unwrap() - 2); }); - assert_eq!(get(), UNCONSTRAINED); + + assert!(get().0.is_none()); budget(|| { - limit(3, || { - assert_eq!(get(), 3); - assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); - assert_eq!(get(), 2); - limit(4, || { - assert_eq!(get(), 2); - assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); - assert_eq!(get(), 1); - }); - assert_eq!(get(), 1); + let n = get().0.unwrap(); + + for _ in 0..n { assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); - assert_eq!(get(), 0); - assert_pending!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); - assert_eq!(get(), 0); - assert_pending!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); - assert_eq!(get(), 0); - }); - assert_eq!(get(), BUDGET - 3); - assert_ready!(task::spawn(()).enter(|cx, _| poll_proceed(cx))); - assert_eq!(get(), BUDGET - 4); - assert_ready!(task::spawn(proceed()).poll()); - assert_eq!(get(), BUDGET - 5); + } + + let mut task = task::spawn(poll_fn(|cx| { + ready!(poll_proceed(cx)); + Poll::Ready(()) + })); + + assert_pending!(task.poll()); }); } } diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 618b8f6667f..288f58d2f40 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -363,3 +363,23 @@ macro_rules! cfg_unstable { )* } } + +macro_rules! cfg_coop { + ($($item:item)*) => { + $( + #[cfg(any( + feature = "blocking", + feature = "dns", + feature = "fs", + feature = "io-driver", + feature = "io-std", + feature = "process", + feature = "rt-core", + feature = "sync", + feature = "stream", + feature = "time" + ))] + $item + )* + } +} diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index 436737a6709..698e908ec59 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -386,7 +386,11 @@ impl Future for Acquire<'_> { type Output = Result<(), AcquireError>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // First, ensure the current task has enough budget to proceed. + ready!(crate::coop::poll_proceed(cx)); + let (node, semaphore, needed, queued) = self.project(); + match semaphore.poll_acquire(cx, needed, node, *queued) { Pending => { *queued = true; diff --git a/tokio/src/sync/mutex.rs b/tokio/src/sync/mutex.rs index e0618a5d6e4..539488a3a49 100644 --- a/tokio/src/sync/mutex.rs +++ b/tokio/src/sync/mutex.rs @@ -1,4 +1,3 @@ -use crate::coop::CoopFutureExt; use crate::sync::batch_semaphore as semaphore; use std::cell::UnsafeCell; @@ -255,7 +254,7 @@ impl Mutex { } async fn acquire(&self) { - self.s.acquire(1).cooperate().await.unwrap_or_else(|_| { + self.s.acquire(1).await.unwrap_or_else(|_| { // The semaphore was closed. but, we never explicitly close it, and // we own it exclusively, which means that this can never happen. unreachable!() diff --git a/tokio/src/sync/rwlock.rs b/tokio/src/sync/rwlock.rs index 68cf710e84b..4e2fb74d19e 100644 --- a/tokio/src/sync/rwlock.rs +++ b/tokio/src/sync/rwlock.rs @@ -1,4 +1,3 @@ -use crate::coop::CoopFutureExt; use crate::sync::batch_semaphore::{AcquireError, Semaphore}; use std::cell::UnsafeCell; use std::ops; @@ -116,7 +115,7 @@ impl<'a, T> ReleasingPermit<'a, T> { lock: &'a RwLock, num_permits: u16, ) -> Result, AcquireError> { - lock.s.acquire(num_permits).cooperate().await?; + lock.s.acquire(num_permits).await?; Ok(Self { num_permits, lock }) } } diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index c1dd975f282..1bfeaebc95e 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -1,5 +1,4 @@ use super::batch_semaphore as ll; // low level implementation -use crate::coop::CoopFutureExt; use std::sync::Arc; /// Counting semaphore performing asynchronous permit aquisition. @@ -87,7 +86,7 @@ impl Semaphore { /// Acquires permit from the semaphore. pub async fn acquire(&self) -> SemaphorePermit<'_> { - self.ll_sem.acquire(1).cooperate().await.unwrap(); + self.ll_sem.acquire(1).await.unwrap(); SemaphorePermit { sem: &self, permits: 1, @@ -111,7 +110,7 @@ impl Semaphore { /// /// [`Arc`]: std::sync::Arc pub async fn acquire_owned(self: Arc) -> OwnedSemaphorePermit { - self.ll_sem.acquire(1).cooperate().await.unwrap(); + self.ll_sem.acquire(1).await.unwrap(); OwnedSemaphorePermit { sem: self.clone(), permits: 1, diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 346fe437f45..374671fb230 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -454,24 +454,20 @@ impl Future for LocalSet { // Register the waker before starting to work self.context.shared.waker.register_by_ref(cx.waker()); - // Reset any previous task budget while polling tasks spawned on the - // `LocalSet`, ensuring that each has its own separate budget. - crate::coop::reset(|| { - if self.with(|| self.tick()) { - // If `tick` returns true, we need to notify the local future again: - // there are still tasks remaining in the run queue. - cx.waker().wake_by_ref(); - Poll::Pending - } else if self.context.tasks.borrow().owned.is_empty() { - // If the scheduler has no remaining futures, we're done! - Poll::Ready(()) - } else { - // There are still futures in the local set, but we've polled all the - // futures in the run queue. Therefore, we can just return Pending - // since the remaining futures will be woken from somewhere else. - Poll::Pending - } - }) + if self.with(|| self.tick()) { + // If `tick` returns true, we need to notify the local future again: + // there are still tasks remaining in the run queue. + cx.waker().wake_by_ref(); + Poll::Pending + } else if self.context.tasks.borrow().owned.is_empty() { + // If the scheduler has no remaining futures, we're done! + Poll::Ready(()) + } else { + // There are still futures in the local set, but we've polled all the + // futures in the run queue. Therefore, we can just return Pending + // since the remaining futures will be woken from somewhere else. + Poll::Pending + } } } @@ -525,23 +521,19 @@ impl Future for RunUntil<'_, T> { .register_by_ref(cx.waker()); let _no_blocking = crate::runtime::enter::disallow_blocking(); - // Reset any previous task budget so that the future passed to - // `run_until` and any tasks spawned on the `LocalSet` have their - // own budgets. - crate::coop::reset(|| { - let f = me.future; - if let Poll::Ready(output) = crate::coop::budget(|| f.poll(cx)) { - return Poll::Ready(output); - } - - if me.local_set.tick() { - // If `tick` returns `true`, we need to notify the local future again: - // there are still tasks remaining in the run queue. - cx.waker().wake_by_ref(); - } - - Poll::Pending - }) + let f = me.future; + + if let Poll::Ready(output) = crate::coop::budget(|| f.poll(cx)) { + return Poll::Ready(output); + } + + if me.local_set.tick() { + // If `tick` returns `true`, we need to notify the local future again: + // there are still tasks remaining in the run queue. + cx.waker().wake_by_ref(); + } + + Poll::Pending }) } }