Skip to content

Commit

Permalink
Guard run_executor against use of park/unpark in user-code.
Browse files Browse the repository at this point in the history
If user code that is run as a result of polling the futures
in the `run_executor` loop of `local_pool.rs` makes use of
the park / unpark APIs, execution may stall due to wakeups
getting "lost". This is prevented (and thereby unnecessary
calls to park / unpark avoided) through an additional
`AtomicBool` that is in full control of the code in
`local_pool.rs`.
  • Loading branch information
romanb authored and cramertj committed Dec 26, 2019
1 parent 3c09c69 commit 34bca9d
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 4 deletions.
34 changes: 31 additions & 3 deletions futures-executor/src/local_pool.rs
Expand Up @@ -10,7 +10,7 @@ use futures_util::stream::StreamExt;
use std::cell::RefCell;
use std::ops::{Deref, DerefMut};
use std::rc::{Rc, Weak};
use std::sync::Arc;
use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
use std::thread::{self, Thread};

/// A single-threaded task pool for polling futures to completion.
Expand Down Expand Up @@ -40,18 +40,35 @@ pub struct LocalSpawner {
type Incoming = RefCell<Vec<LocalFutureObj<'static, ()>>>;

pub(crate) struct ThreadNotify {
/// The (single) executor thread.
thread: Thread,
/// A flag to ensure a wakeup (i.e. `unpark()`) is not "forgotten"
/// before the next `park()`, which may otherwise happen if the code
/// being executed as part of the future(s) being polled makes use of
/// park / unpark calls of its own, i.e. we cannot assume that no other
/// code uses park / unpark on the executing `thread`.
unparked: AtomicBool,
}

thread_local! {
static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
thread: thread::current(),
unparked: AtomicBool::new(false),
});
}

impl ArcWake for ThreadNotify {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.thread.unpark();
// Make sure the wakeup is remembered until the next `park()`.
let unparked = arc_self.unparked.swap(true, Ordering::Relaxed);
if !unparked {
// If the thread has not been unparked yet, it must be done
// now. If it was actually parked, it will run again,
// otherwise the token made available by `unpark`
// may be consumed before reaching `park()`, but `unparked`
// ensures it is not forgotten.
arc_self.thread.unpark();
}
}
}

Expand All @@ -70,7 +87,18 @@ fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
if let Poll::Ready(t) = f(&mut cx) {
return t;
}
thread::park();
// Consume the wakeup that occurred while executing `f`, if any.
let unparked = thread_notify.unparked.swap(false, Ordering::Acquire);
if !unparked {
// No wakeup occurred. It may occur now, right before parking,
// but in that case the token made available by `unpark()`
// is guaranteed to still be available and `park()` is a no-op.
thread::park();
// When the thread is unparked, `unparked` will have been set
// and needs to be unset before the next call to `f` to avoid
// a redundant loop iteration.
thread_notify.unparked.store(false, Ordering::Release);
}
}
})
}
Expand Down
37 changes: 36 additions & 1 deletion futures-executor/tests/local_pool.rs
@@ -1,10 +1,14 @@
use futures::channel::oneshot;
use futures::executor::LocalPool;
use futures::future::{Future, lazy, poll_fn};
use futures::future::{self, Future, lazy, poll_fn};
use futures::task::{Context, Poll, Spawn, LocalSpawn, Waker};
use std::cell::{Cell, RefCell};
use std::pin::Pin;
use std::rc::Rc;
use std::thread;
use std::time::Duration;
use std::sync::Arc;
use std::sync::atomic::{Ordering, AtomicBool};

struct Pending(Rc<()>);

Expand Down Expand Up @@ -357,3 +361,34 @@ fn tasks_are_scheduled_fairly() {

pool.run();
}

// Tests that the use of park/unpark in user-code has no
// effect on the expected behaviour of the executor.
#[test]
fn park_unpark_independence() {
let mut done = false;

let future = future::poll_fn(move |cx| {
if done {
return Poll::Ready(())
}
done = true;
cx.waker().clone().wake(); // (*)
// some user-code that temporarily parks the thread
let test = thread::current();
let latch = Arc::new(AtomicBool::new(false));
let signal = latch.clone();
thread::spawn(move || {
thread::sleep(Duration::from_millis(10));
signal.store(true, Ordering::SeqCst);
test.unpark()
});
while !latch.load(Ordering::Relaxed) {
thread::park();
}
Poll::Pending // Expect to be called again due to (*).
});

futures::executor::block_on(future)
}

0 comments on commit 34bca9d

Please sign in to comment.