Skip to content

Commit

Permalink
process subsequent futures when partially progressing a pool
Browse files Browse the repository at this point in the history
fixes #1737
  • Loading branch information
SrTobi authored and cramertj committed Nov 5, 2019
1 parent ac81046 commit 08e4733
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 16 deletions.
32 changes: 16 additions & 16 deletions futures-executor/src/local_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,20 @@ impl LocalPool {
/// Though only one task will be completed, progress may be made on multiple tasks.
pub fn try_run_one(&mut self) -> bool {
poll_executor(|ctx| {
let ret = self.poll_pool_once(ctx);
loop {
let ret = self.poll_pool_once(ctx);

// return if we really have executed a future
match ret {
Poll::Ready(Some(_)) => true,
_ => false
// return if we have executed a future
if let Poll::Ready(Some(_)) = ret {
return true;
}

// if there are no new incoming futures
// then there is no feature that can make progress
// and we can return without having completed a single future
if self.incoming.borrow().is_empty() {
return false;
}
}
})
}
Expand Down Expand Up @@ -220,17 +228,9 @@ impl LocalPool {
/// of the pool's run or poll methods. While the function is running, all tasks
/// in the pool will try to make progress.
pub fn run_until_stalled(&mut self) {
poll_executor(|ctx| {
loop {
let result = self.poll_pool_once(ctx);

// if there are no more ready futures exit
match result {
Poll::Pending | Poll::Ready(None) => return,
_ => continue
}
}
})
poll_executor(|ctx| {
let _ = self.poll_pool(ctx);
});
}

// Make maximal progress on the entire pool of spawned task, returning `Ready`
Expand Down
46 changes: 46 additions & 0 deletions futures-executor/tests/local_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,29 @@ fn try_run_one_returns_on_no_progress() {
assert_eq!(cnt.get(), ITER);
}

#[test]
fn try_run_one_runs_sub_futures() {
let mut pool = LocalPool::new();
let mut spawn = pool.spawner();
let cnt = Rc::new(Cell::new(0));

let mut inner_spawner = spawn.clone();
let cnt1 = cnt.clone();
spawn.spawn_local_obj(Box::pin(poll_fn(move |_| {
cnt1.set(cnt1.get() + 1);

let cnt2 = cnt1.clone();
inner_spawner.spawn_local_obj(Box::pin(lazy(move |_|{
cnt2.set(cnt2.get() + 1)
})).into()).unwrap();

Poll::Pending
})).into()).unwrap();

pool.try_run_one();
assert_eq!(cnt.get(), 2);
}

#[test]
fn run_until_stalled_returns_if_empty() {
let mut pool = LocalPool::new();
Expand All @@ -203,6 +226,29 @@ fn run_until_stalled_returns_multiple_times() {
assert_eq!(cnt.get(), 2);
}

#[test]
fn run_until_stalled_runs_spawned_sub_futures() {
let mut pool = LocalPool::new();
let mut spawn = pool.spawner();
let cnt = Rc::new(Cell::new(0));

let mut inner_spawner = spawn.clone();
let cnt1 = cnt.clone();
spawn.spawn_local_obj(Box::pin(poll_fn(move |_| {
cnt1.set(cnt1.get() + 1);

let cnt2 = cnt1.clone();
inner_spawner.spawn_local_obj(Box::pin(lazy(move |_|{
cnt2.set(cnt2.get() + 1)
})).into()).unwrap();

Poll::Pending
})).into()).unwrap();

pool.run_until_stalled();
assert_eq!(cnt.get(), 2);
}

#[test]
fn run_until_stalled_executes_all_ready() {
const ITER: usize = 200;
Expand Down

0 comments on commit 08e4733

Please sign in to comment.