Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Guard run_executor of local_pool.rs against use of park / unpark in user-code. #2010

Merged
merged 2 commits into from Dec 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
34 changes: 31 additions & 3 deletions futures-executor/src/local_pool.rs
Expand Up @@ -10,7 +10,7 @@ use futures_util::stream::StreamExt;
use std::cell::RefCell;
use std::ops::{Deref, DerefMut};
use std::rc::{Rc, Weak};
use std::sync::Arc;
use std::sync::{Arc, atomic::{AtomicBool, Ordering}};
use std::thread::{self, Thread};

/// A single-threaded task pool for polling futures to completion.
Expand Down Expand Up @@ -40,18 +40,35 @@ pub struct LocalSpawner {
type Incoming = RefCell<Vec<LocalFutureObj<'static, ()>>>;

pub(crate) struct ThreadNotify {
/// The (single) executor thread.
thread: Thread,
/// A flag to ensure a wakeup (i.e. `unpark()`) is not "forgotten"
/// before the next `park()`, which may otherwise happen if the code
/// being executed as part of the future(s) being polled makes use of
/// park / unpark calls of its own, i.e. we cannot assume that no other
/// code uses park / unpark on the executing `thread`.
unparked: AtomicBool,
}

thread_local! {
static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
thread: thread::current(),
unparked: AtomicBool::new(false),
});
}

impl ArcWake for ThreadNotify {
fn wake_by_ref(arc_self: &Arc<Self>) {
arc_self.thread.unpark();
// Make sure the wakeup is remembered until the next `park()`.
let unparked = arc_self.unparked.swap(true, Ordering::Relaxed);
if !unparked {
// If the thread has not been unparked yet, it must be done
// now. If it was actually parked, it will run again,
// otherwise the token made available by `unpark`
// may be consumed before reaching `park()`, but `unparked`
// ensures it is not forgotten.
arc_self.thread.unpark();
}
}
}

Expand All @@ -70,7 +87,18 @@ fn run_executor<T, F: FnMut(&mut Context<'_>) -> Poll<T>>(mut f: F) -> T {
if let Poll::Ready(t) = f(&mut cx) {
return t;
}
thread::park();
// Consume the wakeup that occurred while executing `f`, if any.
let unparked = thread_notify.unparked.swap(false, Ordering::Acquire);
if !unparked {
// No wakeup occurred. It may occur now, right before parking,
// but in that case the token made available by `unpark()`
// is guaranteed to still be available and `park()` is a no-op.
thread::park();
// When the thread is unparked, `unparked` will have been set
// and needs to be unset before the next call to `f` to avoid
// a redundant loop iteration.
thread_notify.unparked.store(false, Ordering::Release);
}
}
})
}
Expand Down
37 changes: 36 additions & 1 deletion futures-executor/tests/local_pool.rs
@@ -1,10 +1,14 @@
use futures::channel::oneshot;
use futures::executor::LocalPool;
use futures::future::{Future, lazy, poll_fn};
use futures::future::{self, Future, lazy, poll_fn};
use futures::task::{Context, Poll, Spawn, LocalSpawn, Waker};
use std::cell::{Cell, RefCell};
use std::pin::Pin;
use std::rc::Rc;
use std::thread;
use std::time::Duration;
use std::sync::Arc;
use std::sync::atomic::{Ordering, AtomicBool};

struct Pending(Rc<()>);

Expand Down Expand Up @@ -357,3 +361,34 @@ fn tasks_are_scheduled_fairly() {

pool.run();
}

// Tests that the use of park/unpark in user-code has no
// effect on the expected behaviour of the executor.
#[test]
fn park_unpark_independence() {
let mut done = false;

let future = future::poll_fn(move |cx| {
if done {
return Poll::Ready(())
}
done = true;
cx.waker().clone().wake(); // (*)
// some user-code that temporarily parks the thread
let test = thread::current();
let latch = Arc::new(AtomicBool::new(false));
let signal = latch.clone();
thread::spawn(move || {
thread::sleep(Duration::from_millis(10));
signal.store(true, Ordering::SeqCst);
test.unpark()
});
while !latch.load(Ordering::Relaxed) {
thread::park();
}
Poll::Pending // Expect to be called again due to (*).
});

futures::executor::block_on(future)
}