Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Early exit stream selection #1

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 15 additions & 15 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
- windows-latest
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Install Rust
# --no-self-update is necessary because the windows environment cannot self-update rustup.exe.
run: rustup update nightly --no-self-update && rustup default nightly
Expand All @@ -53,7 +53,7 @@ jobs:
- aarch64-unknown-linux-gnu
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Install Rust
run: rustup update nightly && rustup default nightly
- name: Install cross
Expand All @@ -73,7 +73,7 @@ jobs:
- 1.36
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Install Rust
run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }}
# cargo does not support for --features/--no-default-features with workspace, so use cargo-hack instead.
Expand Down Expand Up @@ -105,7 +105,7 @@ jobs:
- 1.45
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Install Rust
run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }}
- name: Install cargo-hack
Expand Down Expand Up @@ -136,7 +136,7 @@ jobs:
- nightly
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Install Rust
run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }}
- name: Install cargo-hack
Expand All @@ -148,7 +148,7 @@ jobs:
name: cargo build -Z minimal-versions
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Install Rust
run: rustup update nightly && rustup default nightly
- name: Install cargo-hack
Expand All @@ -170,7 +170,7 @@ jobs:
- thumbv6m-none-eabi
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Install Rust
run: rustup update nightly && rustup default nightly
- run: rustup target add ${{ matrix.target }}
Expand Down Expand Up @@ -202,7 +202,7 @@ jobs:
name: cargo bench
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Install Rust
run: rustup update nightly && rustup default nightly
- run: cargo bench --workspace
Expand All @@ -212,7 +212,7 @@ jobs:
name: cargo hack check --feature-powerset
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Install Rust
run: rustup update nightly && rustup default nightly
- name: Install cargo-hack
Expand All @@ -237,7 +237,7 @@ jobs:
contents: write
pull-requests: write
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Install Rust
run: rustup update nightly && rustup default nightly
- run: ci/no_atomic_cas.sh
Expand Down Expand Up @@ -270,7 +270,7 @@ jobs:
name: cargo miri test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Install Rust
run: rustup toolchain install nightly --component miri && rustup default nightly
- run: cargo miri test --workspace --all-features
Expand All @@ -289,7 +289,7 @@ jobs:
- thread
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Install Rust
run: rustup update nightly && rustup default nightly
- run: rustup component add rust-src
Expand All @@ -304,7 +304,7 @@ jobs:
name: cargo clippy
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Install Rust
run: rustup toolchain install nightly --component clippy && rustup default nightly
- run: cargo clippy --workspace --all-features --all-targets
Expand All @@ -313,7 +313,7 @@ jobs:
name: cargo fmt
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Install Rust
run: rustup update stable
- run: cargo fmt --all -- --check
Expand All @@ -322,7 +322,7 @@ jobs:
name: cargo doc
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Install Rust
run: rustup update nightly && rustup default nightly
- run: RUSTDOCFLAGS="-D warnings --cfg docsrs" cargo doc --workspace --no-deps --all-features
2 changes: 1 addition & 1 deletion futures-executor/src/enter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ impl std::error::Error for EnterError {}
/// executor.
///
/// Executor implementations should call this function before beginning to
/// execute a tasks, and drop the returned [`Enter`](Enter) value after
/// execute a task, and drop the returned [`Enter`](Enter) value after
/// completing task execution:
///
/// ```
Expand Down
94 changes: 50 additions & 44 deletions futures-executor/src/local_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,9 @@ fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
})
}

fn poll_executor<T, F: FnMut(&mut Context<'_>) -> T>(mut f: F) -> T {
let _enter = enter().expect(
"cannot execute `LocalPool` executor from within \
another executor",
);

CURRENT_THREAD_NOTIFY.with(|thread_notify| {
let waker = waker_ref(thread_notify);
let mut cx = Context::from_waker(&waker);
f(&mut cx)
})
/// Check for a wakeup, but don't consume it.
fn woken() -> bool {
CURRENT_THREAD_NOTIFY.with(|thread_notify| thread_notify.unparked.load(Ordering::SeqCst))
}

impl LocalPool {
Expand Down Expand Up @@ -212,20 +204,26 @@ impl LocalPool {
/// further use of one of the pool's run or poll methods.
/// Though only one task will be completed, progress may be made on multiple tasks.
pub fn try_run_one(&mut self) -> bool {
poll_executor(|ctx| {
run_executor(|cx| {
loop {
let ret = self.poll_pool_once(ctx);

// return if we have executed a future
if let Poll::Ready(Some(_)) = ret {
return true;
self.drain_incoming();

match self.pool.poll_next_unpin(cx) {
// Success!
Poll::Ready(Some(())) => return Poll::Ready(true),
// The pool was empty.
Poll::Ready(None) => return Poll::Ready(false),
Poll::Pending => (),
}

// if there are no new incoming futures
// then there is no feature that can make progress
// and we can return without having completed a single future
if self.incoming.borrow().is_empty() {
return false;
if !self.incoming.borrow().is_empty() {
// New tasks were spawned; try again.
continue;
} else if woken() {
// The pool yielded to us, but there's more progress to be made.
return Poll::Pending;
} else {
return Poll::Ready(false);
}
}
})
Expand Down Expand Up @@ -257,44 +255,52 @@ impl LocalPool {
/// of the pool's run or poll methods. While the function is running, all tasks
/// in the pool will try to make progress.
pub fn run_until_stalled(&mut self) {
poll_executor(|ctx| {
let _ = self.poll_pool(ctx);
run_executor(|cx| match self.poll_pool(cx) {
// The pool is empty.
Poll::Ready(()) => Poll::Ready(()),
Poll::Pending => {
if woken() {
Poll::Pending
} else {
// We're stalled for now.
Poll::Ready(())
}
}
});
}

// Make maximal progress on the entire pool of spawned task, returning `Ready`
// if the pool is empty and `Pending` if no further progress can be made.
/// Poll `self.pool`, re-filling it with any newly-spawned tasks.
/// Repeat until either the pool is empty, or it returns `Pending`.
///
/// Returns `Ready` if the pool was empty, and `Pending` otherwise.
///
/// NOTE: the pool may call `wake`, so `Pending` doesn't necessarily
/// mean that the pool can't make progress.
fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> {
// state for the FuturesUnordered, which will never be used
loop {
let ret = self.poll_pool_once(cx);
self.drain_incoming();

// we queued up some new tasks; add them and poll again
let pool_ret = self.pool.poll_next_unpin(cx);

// We queued up some new tasks; add them and poll again.
if !self.incoming.borrow().is_empty() {
continue;
}

// no queued tasks; we may be done
match ret {
Poll::Pending => return Poll::Pending,
match pool_ret {
Poll::Ready(Some(())) => continue,
Poll::Ready(None) => return Poll::Ready(()),
_ => {}
Poll::Pending => return Poll::Pending,
}
}
}

// Try make minimal progress on the pool of spawned tasks
fn poll_pool_once(&mut self, cx: &mut Context<'_>) -> Poll<Option<()>> {
// empty the incoming queue of newly-spawned tasks
{
let mut incoming = self.incoming.borrow_mut();
for task in incoming.drain(..) {
self.pool.push(task)
}
/// Empty the incoming queue of newly-spawned tasks.
fn drain_incoming(&mut self) {
let mut incoming = self.incoming.borrow_mut();
for task in incoming.drain(..) {
self.pool.push(task)
}

// try to execute the next ready future
self.pool.poll_next_unpin(cx)
}
}

Expand Down
64 changes: 63 additions & 1 deletion futures-executor/tests/local_pool.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use futures::channel::oneshot;
use futures::executor::LocalPool;
use futures::future::{self, lazy, poll_fn, Future};
use futures::task::{Context, LocalSpawn, Poll, Spawn, Waker};
use futures::task::{Context, LocalSpawn, LocalSpawnExt, Poll, Spawn, SpawnExt, Waker};
use std::cell::{Cell, RefCell};
use std::pin::Pin;
use std::rc::Rc;
Expand Down Expand Up @@ -435,3 +435,65 @@ fn park_unpark_independence() {

futures::executor::block_on(future)
}

struct SelfWaking {
wakeups_remaining: Rc<RefCell<usize>>,
}

impl Future for SelfWaking {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if *self.wakeups_remaining.borrow() != 0 {
*self.wakeups_remaining.borrow_mut() -= 1;
cx.waker().wake_by_ref();
}

Poll::Pending
}
}

/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593
///
/// The issue was that self-waking futures could cause `run_until_stalled`
/// to exit early, even when progress could still be made.
#[test]
fn self_waking_run_until_stalled() {
let wakeups_remaining = Rc::new(RefCell::new(10));

let mut pool = LocalPool::new();
let spawner = pool.spawner();
for _ in 0..3 {
let wakeups_remaining = Rc::clone(&wakeups_remaining);
spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap();
}

// This should keep polling until there are no more wakeups.
pool.run_until_stalled();

assert_eq!(*wakeups_remaining.borrow(), 0);
}

/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593
///
/// The issue was that self-waking futures could cause `try_run_one`
/// to exit early, even when progress could still be made.
#[test]
fn self_waking_try_run_one() {
let wakeups_remaining = Rc::new(RefCell::new(10));

let mut pool = LocalPool::new();
let spawner = pool.spawner();
for _ in 0..3 {
let wakeups_remaining = Rc::clone(&wakeups_remaining);
spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap();
}

spawner.spawn(future::ready(())).unwrap();

// The `ready` future should complete.
assert!(pool.try_run_one());

// The self-waking futures are each polled once.
assert_eq!(*wakeups_remaining.borrow(), 7);
}
2 changes: 1 addition & 1 deletion futures-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ memchr = { version = "2.2", optional = true }
futures_01 = { version = "0.1.25", optional = true, package = "futures" }
tokio-io = { version = "0.1.9", optional = true }
pin-utils = "0.1.0"
pin-project-lite = "0.2.4"
pin-project-lite = "0.2.6"

[dev-dependencies]
futures = { path = "../futures", features = ["async-await", "thread-pool"] }
Expand Down
19 changes: 10 additions & 9 deletions futures-util/src/future/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,17 @@ where

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice");
match a.poll_unpin(cx) {
Poll::Ready(x) => Poll::Ready(Either::Left((x, b))),
Poll::Pending => match b.poll_unpin(cx) {
Poll::Ready(x) => Poll::Ready(Either::Right((x, a))),
Poll::Pending => {
self.inner = Some((a, b));
Poll::Pending
}
},

if let Poll::Ready(val) = a.poll_unpin(cx) {
return Poll::Ready(Either::Left((val, b)));
}

if let Poll::Ready(val) = b.poll_unpin(cx) {
return Poll::Ready(Either::Right((val, a)));
}

self.inner = Some((a, b));
Poll::Pending
}
}

Expand Down