Shared combinator can block the executor #2130

Closed
Darksonn opened this issue Apr 20, 2020 · 3 comments · Fixed by #2136

Comments

@Darksonn
Contributor

In a similar vein to #2047, the FutureExt::shared combinator can block the executor by repeatedly polling the future in an infinite loop. Originally discovered in tokio#2418.

The issue is the REPOLL case in the match below, which unconditionally polls the future again without yielding.

let output = loop {
    let future = unsafe {
        match &mut *inner.future_or_output.get() {
            FutureOrOutput::Future(fut) => Pin::new_unchecked(fut),
            _ => unreachable!(),
        }
    };
    let poll = future.poll(&mut cx);
    match poll {
        Poll::Pending => {
            let state = &inner.notifier.state;
            match state.compare_and_swap(POLLING, IDLE, SeqCst) {
                POLLING => {
                    // Success
                    drop(_reset);
                    this.inner = Some(inner);
                    return Poll::Pending;
                }
                REPOLL => {
                    // Was woken since: Gotta poll again!
                    let prev = state.swap(POLLING, SeqCst);
                    assert_eq!(prev, REPOLL);
                }
                _ => unreachable!(),
            }
        }
        Poll::Ready(output) => break output,
    }
};

@Nemo157
Member

Nemo157 commented Apr 20, 2020

Interesting, this is sort of the exact opposite of #2047: it's a bug triggered by wrapping a future that is never ready (or cannot become ready while the current thread is blocked) but still immediately requests a wakeup when polled. (EDIT: Ah, reading further in #2047, this same issue is brought up in the context of FuturesUnordered as well.)

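A quick test case (playground):

use futures::future::{poll_fn, FutureExt};
use futures::task::{noop_waker_ref, Context, Poll};

fn main() {
    // The inner future wakes itself on every poll but never completes.
    let mut fut = poll_fn::<(), _>(|ctx| {
        ctx.waker().wake_by_ref();
        Poll::Pending
    })
    .shared();
    let mut ctx = Context::from_waker(noop_waker_ref());
    // With the REPOLL loop in place, this poll spins forever instead of returning.
    assert_eq!(fut.poll_unpin(&mut ctx), Poll::Pending);
}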

Maybe the REPOLL case should not be handled internally and instead be left up to the executor to repoll this Shared? (though I have only skimmed the highlighted snippet above and I don't really know how shared works internally).

@Darksonn
Contributor Author

If I understand the implementation correctly, it would be correct to simply return Poll::Pending in that match case. The waker we gave to the inner future should have woken whoever polled the Shared through the custom waker, and the compare_and_swap left the atomic unchanged as REPOLL, which is what I think it should be.

Would love to hear from someone familiar with the implementation though.
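
The suggestion above can be sketched with std-only types. This is a simplified, single-task model and not the actual `Shared` implementation: `Notifier`, `CountingWaker`, `poll_without_repoll`, and `demo` are hypothetical names introduced for illustration, and the real combinator tracks many wakers and also stores the output. It only shows that returning `Poll::Pending` in the `REPOLL` case still leaves the outer task woken.

```rust
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

const IDLE: usize = 0;
const POLLING: usize = 1;
const REPOLL: usize = 2;

/// Hypothetical stand-in for `Shared`'s notifier: it notes wake-ups that
/// arrive while a poll is in progress and forwards them to the outer task.
struct Notifier {
    state: AtomicUsize,
    outer: Waker,
}

impl Wake for Notifier {
    fn wake(self: Arc<Self>) {
        // POLLING -> REPOLL if the wake-up arrives during a poll.
        let _ = self.state.compare_exchange(POLLING, REPOLL, SeqCst, SeqCst);
        self.outer.wake_by_ref();
    }
}

/// Outer-task waker that only counts how often it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, SeqCst);
    }
}

/// Polls `fut` once. On `Pending` it returns immediately, even if the
/// future woke itself during the poll, instead of looping.
fn poll_without_repoll<F: Future + Unpin>(
    fut: &mut F,
    notifier: &Arc<Notifier>,
) -> Poll<F::Output> {
    notifier.state.store(POLLING, SeqCst);
    let waker = Waker::from(notifier.clone());
    let mut cx = Context::from_waker(&waker);
    match Pin::new(fut).poll(&mut cx) {
        Poll::Ready(output) => Poll::Ready(output),
        Poll::Pending => {
            // Either POLLING -> IDLE succeeds, or the state stays REPOLL
            // because a wake-up arrived; the outer task was already woken.
            let _ = notifier.state.compare_exchange(POLLING, IDLE, SeqCst, SeqCst);
            Poll::Pending
        }
    }
}

/// Drives a self-waking, never-ready future once and reports
/// (poll was pending, number of outer wake-ups, final notifier state).
fn demo() -> (bool, usize, usize) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let notifier = Arc::new(Notifier {
        state: AtomicUsize::new(IDLE),
        outer: Waker::from(counter.clone()),
    });
    let mut fut = poll_fn::<(), _>(|cx| {
        cx.waker().wake_by_ref();
        Poll::Pending
    });
    let poll = poll_without_repoll(&mut fut, &notifier);
    (poll.is_pending(), counter.0.load(SeqCst), notifier.state.load(SeqCst))
}
```

Calling `demo()` from a `main` function returns control after a single poll: the outer waker has been notified once, so it is up to the executor to decide when to poll again.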

@Marwes
Contributor

Marwes commented Apr 24, 2020

Depending on what exactly the expectations are between executors and wakers, this might be better fixed on tokio's side. What tokio does here is tell the executor that this task should be woken immediately, which works OK as long as it is tokio's executor that gets woken directly. Here, however, Shared intercepts that wake and immediately polls again, since it was told it should wake the future, and so it ends up in this loop.

So an alternative "fix" could be to alter tokio's poll_proceed so that it only wakes the task the first time the counter reaches zero. Any later calls just return Pending until tokio gets control again and can reset the counter.

https://github.com/tokio-rs/tokio/blob/f39c15334e74b07a44efaa0f7201262e17e4f062/tokio/src/coop.rs#L225-L239

pub fn poll_proceed(cx: &mut Context<'_>) -> Poll<()> {
    HITS.with(|hits| {
        let n = hits.get();
        if n == UNCONSTRAINED {
            // opted out of budgeting
            Poll::Ready(())
        } else if n == 1 { // Need to adjust the BUDGET + 1 also
            cx.waker().wake_by_ref();
            Poll::Pending
        } else if n == 0 {
            Poll::Pending
        } else {
            hits.set(n.saturating_sub(1));
            Poll::Ready(())
        }
    })
}
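
A std-only sketch of this "wake only once" idea follows, so the behaviour can be checked outside tokio. Several things here are assumptions rather than tokio's code: the value of `UNCONSTRAINED`, the thread-local `HITS` cell, and the helpers `CountingWaker` and `run` are all hypothetical. The sketch also sets the counter to 0 in the `n == 1` branch, which the snippet above does not do; that is a guess at the intent, since otherwise the `n == 0` branch would not be reached.

```rust
use std::cell::Cell;
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical sentinel; tokio's real constant may differ.
const UNCONSTRAINED: usize = usize::MAX;

thread_local! {
    // Remaining budget for the current thread (assumed layout).
    static HITS: Cell<usize> = Cell::new(UNCONSTRAINED);
}

fn poll_proceed(cx: &mut Context<'_>) -> Poll<()> {
    HITS.with(|hits| {
        let n = hits.get();
        if n == UNCONSTRAINED {
            // Opted out of budgeting.
            Poll::Ready(())
        } else if n == 1 {
            // Budget just ran out: wake once and record that we did.
            // The `set(0)` is this sketch's assumption.
            hits.set(0);
            cx.waker().wake_by_ref();
            Poll::Pending
        } else if n == 0 {
            // Already woken for this exhaustion: stay pending quietly.
            Poll::Pending
        } else {
            hits.set(n - 1);
            Poll::Ready(())
        }
    })
}

/// Waker that only counts how often it was woken.
struct CountingWaker(AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, SeqCst);
    }
}

/// Grants `budget` units, calls `poll_proceed` `calls` times, and
/// returns (number of `Ready` results, number of wake-ups).
fn run(budget: usize, calls: usize) -> (usize, usize) {
    let counter = Arc::new(CountingWaker(AtomicUsize::new(0)));
    let waker = Waker::from(counter.clone());
    let mut cx = Context::from_waker(&waker);
    // One extra unit, because 1 now means "exhausted".
    HITS.with(|hits| hits.set(budget + 1));
    let ready = (0..calls)
        .filter(|_| poll_proceed(&mut cx).is_ready())
        .count();
    (ready, counter.0.load(SeqCst))
}
```

With a budget of 3 and 10 calls, `run(3, 10)` yields three `Ready` results and a single wake-up, so a wrapper that re-polls on every wake would stop after one extra poll instead of spinning.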

Marwes pushed a commit to Marwes/futures-rs that referenced this issue Apr 24, 2020
If the wrapped future requires that another future runs before it stops
waking itself while returning pending the current Shared will loop
forever. This removes the `REPOLL` case and returns pending immediately,
since the current task is recorded and therefore woken through
`Notifier`'s `wake_by_ref` the `Shared` future will still be polled
again

Fixes rust-lang#2130
cramertj pushed a commit that referenced this issue May 6, 2020
exrook pushed a commit to exrook/futures-rs that referenced this issue Apr 5, 2021