ForEach, Fold, and similar stream combinators can run saturated without returning from poll #1957

Open
mzabaluev opened this issue Nov 6, 2019 · 6 comments

@mzabaluev
Contributor

The combinators that poll a stream to exhaustion in a loop have a problem that was already raised in #869: if the upstream returns `Ready` many times in a row, the loop never breaks and the combinator's `poll` does not return for that whole time, preventing other pending operations in the task from being polled.

To illustrate how this can be a problem for other code, consider this simple adapter for making futures cancellable:

```rust
use futures::channel::oneshot::{self, Canceled};
use futures::prelude::*;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::ready;
use pin_utils::unsafe_pinned;

// Sending on (or dropping) the handle completes `stop_rx`.
struct CancelHandle(oneshot::Sender<()>);

#[derive(Debug)]
struct AlreadyDropped;

impl CancelHandle {
    fn cancel(self) -> Result<(), AlreadyDropped> {
        self.0.send(()).map_err(|()| AlreadyDropped)
    }
}

struct Cancelable<F> {
    op: F,
    stop_rx: oneshot::Receiver<()>,
}

impl<F> Cancelable<F> {
    unsafe_pinned!(op: F);
    unsafe_pinned!(stop_rx: oneshot::Receiver<()>);
}

impl<F: Unpin> Unpin for Cancelable<F> {}

impl<F> Future for Cancelable<F>
where
    F: Future,
{
    type Output = Result<F::Output, Canceled>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Check for a stop signal first; poll the wrapped future only if
        // none has arrived. This relies on `op.poll` returning regularly.
        match self.as_mut().stop_rx().poll(cx) {
            Poll::Pending => {
                let output = ready!(self.as_mut().op().poll(cx));
                Ok(output).into()
            }
            Poll::Ready(_res) => Err(Canceled).into(),
        }
    }
}

fn make_cancelable<F>(op: F) -> (Cancelable<F>, CancelHandle) {
    let (stop_tx, stop_rx) = oneshot::channel();
    let fut = Cancelable { op, stop_rx };
    let handle = CancelHandle(stop_tx);
    (fut, handle)
}
```

It looks rather useful and intuitive, but this contrived example hangs with a busy-looping thread rather than canceling the task:

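```rust
// `ThreadPool` needs the "thread-pool" feature of the `futures` crate.
use futures::executor::{block_on, ThreadPool};
use futures::task::SpawnExt;
use std::thread;
use std::time::Duration;

fn main() {
    let mut a = 0;
    let (fut, stop_handle) = make_cancelable(
        // An always-ready stream: `for_each` never observes `Pending`.
        stream::repeat(1).for_each(move |n| {
            a += n;
            future::ready(())
        }),
    );
    let executor = ThreadPool::new().unwrap();
    let res_handle = executor.spawn_with_handle(fut).unwrap();
    thread::sleep(Duration::from_millis(1));
    stop_handle.cancel().unwrap();
    // Never completes: the spawned task is stuck inside `ForEach::poll`
    // and never gets back to polling `stop_rx`.
    let res = block_on(res_handle);
    assert!(res.is_err());
}
```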

In non-contrived usage with real streams, too, a `ForEach` with an always-ready processing closure will delay cancellation for as long as the stream keeps yielding items without returning `Pending`.

@mzabaluev
Contributor Author

mzabaluev commented Nov 6, 2019

An easy fix is to add a counter to such polling loops and, after a certain number of iterations, wake the task immediately and return `Pending`. To make this tunable, the combinators equipped with loop guards could offer a method:

```rust
impl<St, Fut, F> ForEach<St, Fut, F> {
    pub fn yield_after_every(mut self, iterations: u32) -> Self {
        self.yield_after_every = iterations;
        self
    }
}
```
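
A minimal sketch of the counter idea, written against `std` only so it is self-contained. It assumes a stand-in `Stream` trait instead of `futures::Stream`, and `BudgetedForEach`, `RepeatN` and `noop_waker` are hypothetical names, not futures-rs API. The closure is synchronous here to keep things short, unlike the real `ForEach`, whose closure returns a future.

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Stand-in for `futures::Stream`, so this sketch needs only `std`.
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hypothetical `for_each`-like future with a loop guard.
pub struct BudgetedForEach<St, F> {
    stream: St,
    f: F,
    yield_after_every: u32,
}

impl<St, F> BudgetedForEach<St, F> {
    pub fn new(stream: St, f: F) -> Self {
        // The default of 32 is an arbitrary choice for this sketch.
        BudgetedForEach { stream, f, yield_after_every: 32 }
    }

    /// Mirrors the proposed `yield_after_every` tuning method.
    pub fn yield_after_every(mut self, iterations: u32) -> Self {
        // Clamp to at least 1 so every poll can make progress.
        self.yield_after_every = iterations.max(1);
        self
    }
}

impl<St, F> Future for BudgetedForEach<St, F>
where
    St: Stream + Unpin,
    F: FnMut(St::Item) + Unpin,
{
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let this = &mut *self;
        // Consume at most `yield_after_every` ready items in this poll.
        for _ in 0..this.yield_after_every {
            match Pin::new(&mut this.stream).poll_next(cx) {
                Poll::Ready(Some(item)) => (this.f)(item),
                Poll::Ready(None) => return Poll::Ready(()),
                Poll::Pending => return Poll::Pending,
            }
        }
        // Budget spent while the stream was still ready: schedule an
        // immediate re-poll, then return so the executor (and any sibling
        // futures in the same task) get a turn.
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Always-ready stream yielding `value` exactly `remaining` times.
pub struct RepeatN {
    pub value: u32,
    pub remaining: u32,
}

impl Stream for RepeatN {
    type Item = u32;
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        let this = self.get_mut();
        if this.remaining == 0 {
            return Poll::Ready(None);
        }
        this.remaining -= 1;
        Poll::Ready(Some(this.value))
    }
}

/// Waker that does nothing; enough for driving futures by hand.
pub fn noop_waker() -> Waker {
    fn clone_raw(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone_raw, noop, noop, noop);
    // SAFETY: every vtable function ignores the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}
```

Driven by hand over 100 always-ready items with `yield_after_every(32)`, this future returns `Pending` three times before completing, and each of those returns gives a wrapper such as `Cancelable` a chance to poll its stop channel.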

@mzabaluev
Contributor Author

The resolution of #869 (comment) proposes a dedicated stream combinator that forces a yield after N iterations, but I think that is a poor solution because of the human factor: it is very easy to write complex async code that disregards it, and such code will work most of the time until a stream happens to be saturated somewhere critical.
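
For comparison, a rough sketch of the dedicated-combinator approach, under the same kind of assumptions: a `std`-only stand-in `Stream` trait, and hypothetical names `YieldEvery`, `Repeat` and `noop_waker` that are not futures-rs API. It only helps where someone remembers to insert it into the pipeline, which is the concern raised above.

```rust
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Stand-in for `futures::Stream`, so this sketch needs only `std`.
pub trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// Hypothetical adapter: after `every` consecutive ready items it wakes its
/// task and reports `Pending` once, so a consuming loop gives up control.
pub struct YieldEvery<St> {
    inner: St,
    every: u32,
    count: u32,
}

impl<St> YieldEvery<St> {
    pub fn new(inner: St, every: u32) -> Self {
        YieldEvery { inner, every: every.max(1), count: 0 }
    }
}

impl<St: Stream + Unpin> Stream for YieldEvery<St> {
    type Item = St::Item;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
        let this = &mut *self;
        if this.count >= this.every {
            // Forced yield: reset the counter and ask to be polled again soon.
            this.count = 0;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        match Pin::new(&mut this.inner).poll_next(cx) {
            Poll::Ready(Some(item)) => {
                this.count += 1;
                Poll::Ready(Some(item))
            }
            // The inner stream ended or went pending on its own: start afresh.
            other => {
                this.count = 0;
                other
            }
        }
    }
}

/// Infinite, always-ready stream, in the spirit of `stream::repeat`.
pub struct Repeat<T>(pub T);

impl<T: Clone> Stream for Repeat<T> {
    type Item = T;
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<T>> {
        Poll::Ready(Some(self.0.clone()))
    }
}

/// Waker that does nothing; enough for polling by hand.
pub fn noop_waker() -> Waker {
    fn clone_raw(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone_raw, noop, noop, noop);
    // SAFETY: every vtable function ignores the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}
```

With `every = 3` over an always-ready inner stream, every fourth poll returns `Pending`.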

@mzabaluev
Contributor Author

mzabaluev commented Nov 6, 2019

For programmers who are sure they don't need any silly busy-looping guards, and instead want the extra performance when polling something that, in principle, is likely to go pending on I/O, alternative combinators with cautionary names could be added: `.for_each_uninterrupted(...)` or something like that.

jonhoo added a commit to tokio-rs/tokio that referenced this issue Mar 16, 2020
A single call to `poll` on a top-level task may potentially do a lot of
work before it returns `Poll::Pending`. If a task runs for a long period
of time without yielding back to the executor, it can starve other tasks
waiting on that executor to execute them, or drive underlying resources.
See for example rust-lang/futures-rs#2047, rust-lang/futures-rs#1957,
and rust-lang/futures-rs#869. Since Rust does not have a runtime, it is
difficult to forcibly preempt a long-running task.

Consider a future like this one:

```rust
use tokio::stream::{Stream, StreamExt};
async fn drop_all<I: Stream + Unpin>(mut input: I) {
    while let Some(_) = input.next().await {}
}
```

It may look harmless, but consider what happens under heavy load if the
input stream is _always_ ready. If we spawn `drop_all`, the task will
never yield, and will starve other tasks and resources on the same
executor.

This patch adds a `coop` module that provides an opt-in mechanism for
futures to cooperate with the executor to avoid starvation. This
alleviates the problem above:

```rust
use tokio::stream::{Stream, StreamExt};
async fn drop_all<I: Stream + Unpin>(mut input: I) {
    while let Some(_) = input.next().await {
        tokio::coop::proceed().await;
    }
}
```

The call to [`proceed`] will coordinate with the executor to make sure
that every so often control is yielded back to the executor so it can
run other tasks.

The implementation uses a thread-local counter that simply counts how
many "cooperation points" we have passed since the task was first
polled. Once the "budget" has been spent, any subsequent points will
return `Poll::Pending`, eventually making the top-level task yield. When
it finally does yield, the executor resets the budget before
running the next task.

The budget per task poll is currently hard-coded to 128. Eventually, we
may want to make it dynamic as more cooperation points are added. The
number 128 was chosen more or less arbitrarily to balance the cost of
yielding unnecessarily against the time an executor may be "held up".

At the moment, all the tokio leaf futures ("resources") call into coop,
but external futures have no way of doing so. We probably want to
continue limiting coop points to leaf futures in the future, but may
want to also enable third-party leaf futures to cooperate to benefit the
ecosystem as a whole. This is reflected in the methods marked as `pub`
in `mod coop` (even though the module is only `pub(crate)`). We will
likely also eventually want to expose `coop::limit`, which enables
sub-executors and manual `impl Future` blocks to avoid one sub-task
spending all of their poll budget.

Benchmarks (see #2160) suggest that the overhead of `coop`
is marginal.
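
The thread-local budget described in the commit message can be sketched in a few lines. This is a simplified illustration, not tokio's actual code: `INITIAL_BUDGET`, `with_budget` and `poll_proceed` are hypothetical names, and `with_budget` stands in for the executor resetting the budget before each task poll.

```rust
use std::cell::Cell;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

/// Budget per task poll; the commit message uses 128.
const INITIAL_BUDGET: u32 = 128;

thread_local! {
    // Remaining cooperation points for the task being polled on this thread.
    static BUDGET: Cell<u32> = Cell::new(INITIAL_BUDGET);
}

/// What an executor would do around each top-level task poll:
/// reset the budget, then poll the task.
pub fn with_budget<R>(poll_task: impl FnOnce() -> R) -> R {
    BUDGET.with(|b| b.set(INITIAL_BUDGET));
    poll_task()
}

/// A cooperation point for leaf futures: spend one unit of budget or,
/// once it is exhausted, wake the task and report `Pending` so the task
/// unwinds back to the executor.
pub fn poll_proceed(cx: &mut Context<'_>) -> Poll<()> {
    BUDGET.with(|b| {
        let left = b.get();
        if left == 0 {
            cx.waker().wake_by_ref();
            Poll::Pending
        } else {
            b.set(left - 1);
            Poll::Ready(())
        }
    })
}

/// Waker that does nothing; enough for polling by hand.
pub fn noop_waker() -> Waker {
    fn clone_raw(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone_raw, noop, noop, noop);
    // SAFETY: every vtable function ignores the (null) data pointer.
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}
```

Because the counter is per thread and reset per task poll, a leaf future that calls `poll_proceed` before doing work will eventually return `Pending` even when its underlying resource is always ready, which is what forces a busy task like `drop_all` back to the executor.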
@mzabaluev
Contributor Author

mzabaluev commented Apr 2, 2020

Has this been solved by tokio-rs/tokio#2160?

@jonhoo
Contributor

jonhoo commented Apr 2, 2020

It's solved only in the context of tokio, I'm afraid, but yes, in that context it should be solved. If we wanted a solution for all executors, we'd need to either wait for the coop stuff to evolve beyond tokio or implement something like #2049 (which solved #2047).

@mzabaluev
Contributor Author

I like the cooperative budget approach outlined in rust-lang/rust#74335 (comment). Is there an RFC for that?
