
FuturesUnordered can block the executor #2047

Closed
jonhoo opened this issue Jan 23, 2020 · 5 comments · Fixed by #2049

Comments

jonhoo (Contributor) commented Jan 23, 2020

The implementation of FuturesUnordered::poll_next never yields as long as at least one future inside of it is ready. This is problematic if it manages enough futures that polling all the ready ones takes longer than the time before any given future becomes ready again. FuturesUnordered::poll_next then enters what is effectively an infinite loop, holding up the executor and starving other tasks in the system.

For a simple example of this, consider a FuturesUnordered that manages futures that run on retry timers. If polling all the ready futures takes longer than the retry time of any one future, then by the time it finishes polling all the futures, the first future it polled will be ready again since its retry timer expired.

The easiest way to fix this is to place a limit on how many calls to Future::poll a given call to FuturesUnordered::poll_next can make. If the limit is exceeded, yield by calling cx.waker().wake_by_ref() and return Poll::Pending.
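As a sketch of that limit (the constant and all names here are hypothetical, not the actual futures-rs implementation, and child futures are modeled as boxed closures so the control flow stands alone):

```rust
use std::collections::VecDeque;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Illustrative cap on the number of child polls per call to poll_next.
const POLL_LIMIT: usize = 32;

type Child = Box<dyn FnMut(&mut Context<'_>) -> Poll<Option<u32>>>;

fn poll_next_sketch(
    cx: &mut Context<'_>,
    ready: &mut VecDeque<Child>,
) -> Poll<Option<u32>> {
    let mut polled = 0;
    while let Some(mut fut) = ready.pop_front() {
        if polled == POLL_LIMIT {
            // Budget spent: put the child back, make sure we get polled
            // again, and yield so other tasks on the executor can run.
            ready.push_front(fut);
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        polled += 1;
        match fut(cx) {
            Poll::Ready(v) => return Poll::Ready(v),
            // A real implementation re-enqueues the child when it is woken.
            Poll::Pending => {}
        }
    }
    Poll::Pending
}

// Minimal no-op waker so the sketch can be exercised without an executor.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn nop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, nop, nop, nop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}
```

With 40 always-pending children queued, a single call polls 32 of them, then forces a yield instead of draining the queue.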

It is not inherently an issue to yield Some many times in a row, so counting only the calls inside of poll_next should be sufficient. That does place the burden on callers to implement this kind of limiting themselves, though, so there is an argument for doing a "forced yield" every N calls to Future::poll, resetting the counter whenever we return Poll::Pending (but not when we return Poll::Ready(Some(_))). This would help other Future implementors that wrap FuturesUnordered, since they would get protection from accidentally blocking the executor by not yielding for too long "for free".

jonhoo (Contributor, Author) commented Jan 23, 2020

One other option, though I'm not sure it is sufficient, is to do a "forced yield" if we detect that we are about to poll a future that already returned Pending in this call to FuturesUnordered::poll_next (but has become ready again since we first polled it). That would at least fix the retry timer case above, but I'm not sure it solves the more general case of poll_next potentially never yielding in degenerate cases.

jonhoo (Contributor, Author) commented Jan 23, 2020

This is related to #869 and #1957. I think the problem is particularly bad for FuturesUnordered though, because it manages potentially thousands of child futures, and so it runs into the issue even if the underlying futures return Pending.

cramertj (Member) commented

I think the only real solution to this, as per @carllerche's comment in #2049, is to prevent leaf futures from surfacing an unbounded amount of work before yielding. I don't think it's reasonable or practical to have every future, stream, async fn etc. keep a counter around every loop to trigger a yield, especially when such loop counters stack exponentially and therefore become basically useless in a large chain of nested futures.

I'll leave this issue open for discussion, but I consider this "working as intended" from the futures-rs perspective.

jonhoo (Contributor, Author) commented Jan 24, 2020

Hmm, I may have phrased the above poorly. Preventing leaf futures from surfacing unbounded work (e.g., through a mechanism as proposed in tokio-rs/tokio#2160) does help immensely, but there is one way in which FuturesUnordered is "special": it will continue to poll futures even after they return Pending.

Consider a "perfect" implementation of the leaf-tracking scheme that @carllerche proposed (and that is partially implemented in tokio-rs/tokio#2160), where after some number of polls, all leaf futures begin returning Poll::Pending. In order to do so, they must wake the current task since nothing else will wake it. That is why the code for that kind of "volunteer yielding" looks like

```rust
if volunteer {
    // Wake the current task before yielding, since nothing else will.
    cx.waker().wake_by_ref();
    return Poll::Pending;
}
```

Now consider what happens if you have a FuturesUnordered in such a system. When FuturesUnordered::poll_next is called, it calls poll on any child futures that have been awoken. Imagine that those child futures all return Pending: some because they genuinely aren't finished yet, and some because they are "volunteering" as above. Since they immediately wake themselves, they are immediately added back onto the FuturesUnordered's run queue. Since FuturesUnordered::poll_next does not return Poll::Pending until the run queue is empty, it will poll all of the futures that volunteered a second time. They will volunteer again and wake themselves, leading to an infinite loop.

One option here is for FuturesUnordered to also integrate with the "volunteering" mechanism offered by the executor (if it has one). If it did, then it would volunteer (and return) the moment any of its child futures volunteered, and all would be well. Since we don't yet have a "common" volunteering mechanism across the ecosystem though, there is no such thing for FuturesUnordered to integrate with. That means we have to have some (hopefully temporary) stand-in.

One option for that is what #2049 implements. Another is to keep a "generation" counter for each future, increment the generation on each poll_next, update the generation for a future whenever we poll it, and then yield if we are ever about to poll a future that we have already polled this generation.
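The generation-counter alternative could look roughly like this (hypothetical names, not the code #2049 merged; children are reduced to the single field the scheme needs, and every child is modeled as volunteering, i.e. returning Pending but immediately waking itself):

```rust
use std::collections::VecDeque;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Each child records the poll_next "generation" in which it was last polled.
struct Child {
    last_polled: u64,
}

struct GenerationQueue {
    generation: u64,
    ready: VecDeque<Child>,
}

impl GenerationQueue {
    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
        self.generation += 1;
        while let Some(mut child) = self.ready.pop_front() {
            if child.last_polled == self.generation {
                // We already polled this child during this call, so it must
                // have woken itself; yield instead of looping forever.
                self.ready.push_front(child);
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
            child.last_polled = self.generation;
            // Model a child that returns Pending but immediately wakes
            // itself ("volunteers"), landing back on the ready queue.
            self.ready.push_back(child);
        }
        Poll::Pending
    }
}

// Minimal no-op waker so the sketch can be exercised without an executor.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn nop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, nop, nop, nop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}
```

Even though every child re-enqueues itself, each call to poll_next makes exactly one pass and then yields, because the second encounter with any child in the same generation triggers the forced yield.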

cramertj (Member) commented

Ah, that makes a lot more sense to me, thanks for clarifying! It's still unfortunate in that multiple nested FuturesUnordereds will stack multiplicatively. However, that use case is certainly more rare and targeted than the "limit looping in literally every combinator" case, so I'm much more amenable to #2049. I've merged that PR, and I've opened #2053 to discuss more.

jonhoo added a commit to tokio-rs/tokio that referenced this issue Mar 16, 2020
A single call to `poll` on a top-level task may potentially do a lot of
work before it returns `Poll::Pending`. If a task runs for a long period
of time without yielding back to the executor, it can starve other tasks
waiting on that executor to execute them, or drive underlying resources.
See for example rust-lang/futures-rs#2047, rust-lang/futures-rs#1957,
and rust-lang/futures-rs#869. Since Rust does not have a runtime, it is
difficult to forcibly preempt a long-running task.

Consider a future like this one:

```rust
use tokio::stream::{Stream, StreamExt};

async fn drop_all<I: Stream + Unpin>(mut input: I) {
    while let Some(_) = input.next().await {}
}
```

It may look harmless, but consider what happens under heavy load if the
input stream is _always_ ready. If we spawn `drop_all`, the task will
never yield, and will starve other tasks and resources on the same
executor.

This patch adds a `coop` module that provides an opt-in mechanism for
futures to cooperate with the executor to avoid starvation. This
alleviates the problem above:

```rust
use tokio::stream::{Stream, StreamExt};

async fn drop_all<I: Stream + Unpin>(mut input: I) {
    while let Some(_) = input.next().await {
        // Yield control back to the executor every so often.
        tokio::coop::proceed().await;
    }
}
```

The call to `proceed` will coordinate with the executor to make sure
that every so often control is yielded back to the executor so it can
run other tasks.

The implementation uses a thread-local counter that simply counts how
many "cooperation points" we have passed since the task was first
polled. Once the "budget" has been spent, any subsequent points will
return `Poll::Pending`, eventually making the top-level task yield. When
it finally does yield, the executor resets the budget before
running the next task.

The budget per task poll is currently hard-coded to 128. Eventually, we
may want to make it dynamic as more cooperation points are added. The
number 128 was chosen more or less arbitrarily to balance the cost of
yielding unnecessarily against the time an executor may be "held up".

At the moment, all the tokio leaf futures ("resources") call into coop,
but external futures have no way of doing so. We probably want to
continue limiting coop points to leaf futures in the future, but may
want to also enable third-party leaf futures to cooperate to benefit the
ecosystem as a whole. This is reflected in the methods marked as `pub`
in `mod coop` (even though the module is only `pub(crate)`). We will
likely also eventually want to expose `coop::limit`, which enables
sub-executors and manual `impl Future` blocks to keep one sub-task
from spending the entire poll budget.

Benchmarks (see #2160) suggest that the overhead of `coop`
is marginal.
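The thread-local budget this commit message describes can be sketched as follows; the real tokio `coop` module differs in detail, and all names here are illustrative:

```rust
use std::cell::Cell;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// Budget granted to a task each time the executor polls it (128 per the
// commit message above).
const BUDGET_PER_POLL: u32 = 128;

thread_local! {
    static BUDGET: Cell<u32> = Cell::new(0);
}

// Called by the executor before polling a top-level task.
fn reset_budget() {
    BUDGET.with(|b| b.set(BUDGET_PER_POLL));
}

// Called by leaf futures at each "cooperation point".
fn cooperate(cx: &mut Context<'_>) -> Poll<()> {
    BUDGET.with(|b| {
        let left = b.get();
        if left == 0 {
            // Budget spent: make sure the task is polled again, then yield.
            cx.waker().wake_by_ref();
            Poll::Pending
        } else {
            b.set(left - 1);
            Poll::Ready(())
        }
    })
}

// Minimal no-op waker so the sketch can be exercised without an executor.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn nop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, nop, nop, nop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}
```

The first 128 cooperation points succeed; the 129th returns Pending, which bubbles up through every combinator and makes the top-level task yield, at which point the executor resets the budget.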