Skip to content

Commit

Permalink
fix: Shared must relinquish control to the executor if repolled
Browse files Browse the repository at this point in the history
If the wrapped future requires that another future runs before it stops
waking itself while returning pending the current Shared will loop
forever. This removes the `REPOLL` case and returns pending immediately,
since the current task is recorded and therefore woken through
`Notifier`'s `wake_by_ref` the `Shared` future will still be polled
again

Fixes rust-lang#2130
  • Loading branch information
Markus Westerlind authored and cramertj committed May 6, 2020
1 parent 5c83e08 commit f698fdf
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 18 deletions.
25 changes: 7 additions & 18 deletions futures-util/src/future/future/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,8 @@ where

const IDLE: usize = 0;
const POLLING: usize = 1;
const REPOLL: usize = 2;
const COMPLETE: usize = 3;
const POISONED: usize = 4;
const COMPLETE: usize = 2;
const POISONED: usize = 3;

const NULL_WAKER_KEY: usize = usize::max_value();

Expand Down Expand Up @@ -190,7 +189,7 @@ where
IDLE => {
// Lock acquired, fall through
}
POLLING | REPOLL => {
POLLING => {
// Another task is currently polling, at this point we just want
// to ensure that the waker for this task is registered
this.inner = Some(inner);
Expand Down Expand Up @@ -221,35 +220,27 @@ where

let _reset = Reset(&inner.notifier.state);

let output = loop {
let output = {
let future = unsafe {
match &mut *inner.future_or_output.get() {
FutureOrOutput::Future(fut) => Pin::new_unchecked(fut),
_ => unreachable!(),
}
};

let poll = future.poll(&mut cx);

match poll {
match future.poll(&mut cx) {
Poll::Pending => {
let state = &inner.notifier.state;
match state.compare_and_swap(POLLING, IDLE, SeqCst) {
match inner.notifier.state.compare_and_swap(POLLING, IDLE, SeqCst) {
POLLING => {
// Success
drop(_reset);
this.inner = Some(inner);
return Poll::Pending;
}
REPOLL => {
// Was woken since: Gotta poll again!
let prev = state.swap(POLLING, SeqCst);
assert_eq!(prev, REPOLL);
}
_ => unreachable!(),
}
}
Poll::Ready(output) => break output,
Poll::Ready(output) => output,
}
};

Expand Down Expand Up @@ -304,8 +295,6 @@ where

impl ArcWake for Notifier {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.state.compare_and_swap(POLLING, REPOLL, SeqCst);

let wakers = &mut *arc_self.wakers.lock().unwrap();
if let Some(wakers) = wakers.as_mut() {
for (_key, opt_waker) in wakers {
Expand Down
24 changes: 24 additions & 0 deletions futures/tests/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,3 +191,27 @@ fn dont_do_unnecessary_clones_on_output() {
assert_eq!(block_on(rx.clone()).unwrap().0.get(), 2);
assert_eq!(block_on(rx).unwrap().0.get(), 2);
}

#[cfg(all(feature = "alloc", feature = "executor"))] // channel:: + executor::
#[test]
fn shared_future_that_wakes_itself_until_pending_is_returned() {
use futures::executor::block_on;
use futures::future::FutureExt;
use std::cell::Cell;
use std::task::Poll;

let proceed = Cell::new(false);
let fut = futures::future::poll_fn(|cx| {
if proceed.get() {
Poll::Ready(())
} else {
cx.waker().wake_by_ref();
Poll::Pending
}
})
.shared();

// The join future can only complete if the second future gets a chance to run after the first
// has returned pending
assert_eq!(block_on(futures::future::join(fut, async { proceed.set(true) })), ((), ()));
}

0 comments on commit f698fdf

Please sign in to comment.