diff --git a/futures-executor/src/local_pool.rs b/futures-executor/src/local_pool.rs index 7c3a1477e1..b089a8094f 100644 --- a/futures-executor/src/local_pool.rs +++ b/futures-executor/src/local_pool.rs @@ -10,7 +10,7 @@ use futures_util::stream::StreamExt; use std::cell::RefCell; use std::ops::{Deref, DerefMut}; use std::rc::{Rc, Weak}; -use std::sync::Arc; +use std::sync::{Arc, atomic::{AtomicBool, Ordering}}; use std::thread::{self, Thread}; /// A single-threaded task pool for polling futures to completion. @@ -40,18 +40,35 @@ pub struct LocalSpawner { type Incoming = RefCell>>; pub(crate) struct ThreadNotify { + /// The (single) executor thread. thread: Thread, + /// A flag to ensure a wakeup (i.e. `unpark()`) is not "forgotten" + /// before the next `park()`, which may otherwise happen if the code + /// being executed as part of the future(s) being polled makes use of + /// park / unpark calls of its own, i.e. we cannot assume that no other + /// code uses park / unpark on the executing `thread`. + unparked: AtomicBool, } thread_local! { static CURRENT_THREAD_NOTIFY: Arc = Arc::new(ThreadNotify { thread: thread::current(), + unparked: AtomicBool::new(false), }); } impl ArcWake for ThreadNotify { fn wake_by_ref(arc_self: &Arc) { - arc_self.thread.unpark(); + // Make sure the wakeup is remembered until the next `park()`. + let unparked = arc_self.unparked.swap(true, Ordering::Relaxed); + if !unparked { + // If the thread has not been unparked yet, it must be done + // now. If it was actually parked, it will run again, + // otherwise the token made available by `unpark` + // may be consumed before reaching `park()`, but `unparked` + // ensures it is not forgotten. + arc_self.thread.unpark(); + } } } @@ -70,7 +87,18 @@ fn run_executor) -> Poll>(mut f: F) -> T { if let Poll::Ready(t) = f(&mut cx) { return t; } - thread::park(); + // Consume the wakeup that occurred while executing `f`, if any. + let unparked = thread_notify.unparked.swap(false, Ordering::Acquire); + if !unparked { + // No wakeup occurred. It may occur now, right before parking, + // but in that case the token made available by `unpark()` + // is guaranteed to still be available and `park()` is a no-op. + thread::park(); + // When the thread is unparked, `unparked` will have been set + // and needs to be unset before the next call to `f` to avoid + // a redundant loop iteration. + thread_notify.unparked.store(false, Ordering::Release); + } } }) } diff --git a/futures-executor/tests/local_pool.rs b/futures-executor/tests/local_pool.rs index b6a5678388..f3b4e2692a 100644 --- a/futures-executor/tests/local_pool.rs +++ b/futures-executor/tests/local_pool.rs @@ -1,10 +1,14 @@ use futures::channel::oneshot; use futures::executor::LocalPool; -use futures::future::{Future, lazy, poll_fn}; +use futures::future::{self, Future, lazy, poll_fn}; use futures::task::{Context, Poll, Spawn, LocalSpawn, Waker}; use std::cell::{Cell, RefCell}; use std::pin::Pin; use std::rc::Rc; +use std::thread; +use std::time::Duration; +use std::sync::Arc; +use std::sync::atomic::{Ordering, AtomicBool}; struct Pending(Rc<()>); @@ -357,3 +361,34 @@ fn tasks_are_scheduled_fairly() { pool.run(); } + +// Tests that the use of park/unpark in user-code has no +// effect on the expected behaviour of the executor. +#[test] +fn park_unpark_independence() { + let mut done = false; + + let future = future::poll_fn(move |cx| { + if done { + return Poll::Ready(()) + } + done = true; + cx.waker().clone().wake(); // (*) + // some user-code that temporarily parks the thread + let test = thread::current(); + let latch = Arc::new(AtomicBool::new(false)); + let signal = latch.clone(); + thread::spawn(move || { + thread::sleep(Duration::from_millis(10)); + signal.store(true, Ordering::SeqCst); + test.unpark() + }); + while !latch.load(Ordering::Relaxed) { + thread::park(); + } + Poll::Pending // Expect to be called again due to (*). + }); + + futures::executor::block_on(future) +} +