From 991effa61b7ae5f08d3a924fee6145ef8c3a6b1f Mon Sep 17 00:00:00 2001 From: Nathan West Date: Tue, 8 Sep 2020 00:19:49 -0400 Subject: [PATCH 1/5] Updates to parker - No longer possibility for spurious wake - Now reports unpark reason (timeout or unpark())Updates to parker --- crossbeam-utils/src/sync/mod.rs | 2 +- crossbeam-utils/src/sync/parker.rs | 123 +++++++++++++++++------------ crossbeam-utils/tests/parker.rs | 17 +++- 3 files changed, 87 insertions(+), 55 deletions(-) diff --git a/crossbeam-utils/src/sync/mod.rs b/crossbeam-utils/src/sync/mod.rs index fd400d70e..2483e3ee4 100644 --- a/crossbeam-utils/src/sync/mod.rs +++ b/crossbeam-utils/src/sync/mod.rs @@ -8,6 +8,6 @@ mod parker; mod sharded_lock; mod wait_group; -pub use self::parker::{Parker, Unparker}; +pub use self::parker::{Parker, UnparkReason, Unparker}; pub use self::sharded_lock::{ShardedLock, ShardedLockReadGuard, ShardedLockWriteGuard}; pub use self::wait_group::WaitGroup; diff --git a/crossbeam-utils/src/sync/parker.rs b/crossbeam-utils/src/sync/parker.rs index fc13d2e96..e7deb1c99 100644 --- a/crossbeam-utils/src/sync/parker.rs +++ b/crossbeam-utils/src/sync/parker.rs @@ -1,20 +1,19 @@ -use std::fmt; use std::marker::PhantomData; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::SeqCst; use std::sync::{Arc, Condvar, Mutex}; use std::time::Duration; +use std::{fmt, time::Instant}; /// A thread parking primitive. /// /// Conceptually, each `Parker` has an associated token which is initially not present: /// /// * The [`park`] method blocks the current thread unless or until the token is available, at -/// which point it automatically consumes the token. It may also return *spuriously*, without -/// consuming the token. +/// which point it automatically consumes the token. /// -/// * The [`park_timeout`] method works the same as [`park`], but blocks for a specified maximum -/// time. +/// * The [`park_timeout`] and [`park_deadline`] methods work the same as [`park`], but block for +/// a specified maximum time. /// /// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the /// token is initially absent, [`unpark`] followed by [`park`] will result in the second call @@ -43,13 +42,13 @@ use std::time::Duration; /// u.unpark(); /// }); /// -/// // Wakes up when `u.unpark()` provides the token, but may also wake up -/// // spuriously before that without consuming the token. +/// // Wakes up when `u.unpark()` provides the token. /// p.park(); /// ``` /// /// [`park`]: Parker::park /// [`park_timeout`]: Parker::park_timeout +/// [`park_deadline`]: Parker::park_deadline /// [`unpark`]: Unparker::unpark pub struct Parker { unparker: Unparker, @@ -90,9 +89,6 @@ impl Parker { /// Blocks the current thread until the token is made available. /// - /// A call to `park` may wake up spuriously without consuming the token, and callers should be - /// prepared for this possibility. - /// /// # Examples /// /// ``` @@ -113,9 +109,6 @@ impl Parker { /// Blocks the current thread until the token is made available, but only for a limited time. /// - /// A call to `park_timeout` may wake up spuriously without consuming the token, and callers - /// should be prepared for this possibility. - /// /// # Examples /// /// ``` @@ -127,8 +120,26 @@ impl Parker { /// // Waits for the token to become available, but will not wait longer than 500 ms. /// p.park_timeout(Duration::from_millis(500)); /// ``` - pub fn park_timeout(&self, timeout: Duration) { - self.unparker.inner.park(Some(timeout)); + pub fn park_timeout(&self, timeout: Duration) -> UnparkReason { + self.park_deadline(Instant::now() + timeout) + } + + /// Blocks the current thread until the token is made available, or until a certain deadline. + /// + /// # Examples + /// + /// ``` + /// use std::time::{Duration, Instant}; + /// use crossbeam_utils::sync::Parker; + /// + /// let p = Parker::new(); + /// let deadline = Instant::now() + Duration::from_millis(500); + /// + /// // Waits for the token to become available, but will not wait longer than 500 ms. + /// p.park_deadline(deadline); + /// ``` + pub fn park_deadline(&self, deadline: Instant) -> UnparkReason { + self.unparker.inner.park(Some(deadline)) } /// Returns a reference to an associated [`Unparker`]. @@ -227,8 +238,7 @@ impl Unparker { /// u.unpark(); /// }); /// - /// // Wakes up when `u.unpark()` provides the token, but may also wake up - /// // spuriously before that without consuming the token. + /// // Wakes up when `u.unpark()` provides the token. /// p.park(); /// ``` /// @@ -291,6 +301,18 @@ impl Clone for Unparker { } } +/// An enum that reports whether a `Parker::park_timeout` or +/// `Parker::park_deadline` returned because another thread called `unpark` or +/// because of a timeout. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum UnparkReason { + /// The park method returned due to a call to `unpark`. + Unparked, + + /// The park method returned due to a timeout. + Timeout, +} + const EMPTY: usize = 0; const PARKED: usize = 1; const NOTIFIED: usize = 2; @@ -302,20 +324,20 @@ struct Inner { } impl Inner { - fn park(&self, timeout: Option) { + fn park(&self, deadline: Option) -> UnparkReason { // If we were previously notified then we consume this notification and return quickly. if self .state .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) .is_ok() { - return; + return UnparkReason::Unparked; } // If the timeout is zero, then there is no need to actually block. - if let Some(ref dur) = timeout { - if *dur == Duration::from_millis(0) { - return; + if let Some(deadline) = deadline { + if deadline <= Instant::now() { + return UnparkReason::Timeout; } } @@ -333,41 +355,42 @@ impl Inner { // do that we must read from the write it made to `state`. let old = self.state.swap(EMPTY, SeqCst); assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); - return; + return UnparkReason::Unparked; } Err(n) => panic!("inconsistent park_timeout state: {}", n), } - match timeout { - None => { - loop { - // Block the current thread on the conditional variable. - m = self.cvar.wait(m).unwrap(); - - if self - .state - .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) - .is_ok() - { - // got a notification - return; + loop { + // Block the current thread on the conditional variable. + m = match deadline { + None => self.cvar.wait(m).unwrap(), + Some(deadline) => match deadline.checked_duration_since(Instant::now()) { + // We could check for a timeout here, but in the case that a timeout and an + // unpark arrive simultaneously, we prefer to report the former. + Some(duration) if duration > Duration::from_secs(0) => { + self.cvar.wait_timeout(m, duration).unwrap().0 } - // spurious wakeup, go back to sleep - } - } - Some(timeout) => { - // Wait with a timeout, and if we spuriously wake up or otherwise wake up from a - // notification we just want to unconditionally set `state` back to `EMPTY`, either - // consuming a notification or un-flagging ourselves as parked. - let (_m, _result) = self.cvar.wait_timeout(m, timeout).unwrap(); - - match self.state.swap(EMPTY, SeqCst) { - NOTIFIED => {} // got a notification - PARKED => {} // no notification - n => panic!("inconsistent park_timeout state: {}", n), - } + // We've timed out; swap out the state back to empty on our way out + _ => match self.state.swap(EMPTY, SeqCst) { + NOTIFIED => return UnparkReason::Unparked, // got a notification + PARKED => return UnparkReason::Timeout, // no notification + n => panic!("inconsistent park_timeout state: {}", n), + }, + }, + }; + + if self + .state + .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) + .is_ok() + { + // got a notification + return UnparkReason::Unparked; } + + // Spurious wakeup, go back to sleep. Alternatively, if we timed out, it will be caught + // in the branch above, when we discover the deadline is in the past } } diff --git a/crossbeam-utils/tests/parker.rs b/crossbeam-utils/tests/parker.rs index f657eb1cf..6a6d243cd 100644 --- a/crossbeam-utils/tests/parker.rs +++ b/crossbeam-utils/tests/parker.rs @@ -2,7 +2,7 @@ use std::thread::sleep; use std::time::Duration; use std::u32; -use crossbeam_utils::sync::Parker; +use crossbeam_utils::sync::{Parker, UnparkReason}; use crossbeam_utils::thread; #[test] @@ -10,7 +10,10 @@ fn park_timeout_unpark_before() { let p = Parker::new(); for _ in 0..10 { p.unparker().unpark(); - p.park_timeout(Duration::from_millis(u32::MAX as u64)); + assert_eq!( + p.park_timeout(Duration::from_millis(u32::MAX as u64)), + UnparkReason::Unparked, + ); } } @@ -18,7 +21,10 @@ fn park_timeout_unpark_before() { fn park_timeout_unpark_not_called() { let p = Parker::new(); for _ in 0..10 { - p.park_timeout(Duration::from_millis(10)); + assert_eq!( + p.park_timeout(Duration::from_millis(10)), + UnparkReason::Timeout, + ); } } @@ -34,7 +40,10 @@ fn park_timeout_unpark_called_other_thread() { u.unpark(); }); - p.park_timeout(Duration::from_millis(u32::MAX as u64)); + assert_eq!( + p.park_timeout(Duration::from_millis(u32::MAX as u64)), + UnparkReason::Unparked, + ); }) .unwrap(); } From 898247636980c9b6f848f606435cd8ea7b6819d1 Mon Sep 17 00:00:00 2001 From: Nathan West Date: Tue, 8 Sep 2020 14:59:03 -0400 Subject: [PATCH 2/5] Fixed weird import order --- crossbeam-utils/src/sync/parker.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crossbeam-utils/src/sync/parker.rs b/crossbeam-utils/src/sync/parker.rs index e7deb1c99..587be253d 100644 --- a/crossbeam-utils/src/sync/parker.rs +++ b/crossbeam-utils/src/sync/parker.rs @@ -1,9 +1,9 @@ +use std::fmt; use std::marker::PhantomData; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::SeqCst; use std::sync::{Arc, Condvar, Mutex}; -use std::time::Duration; -use std::{fmt, time::Instant}; +use std::time::{Duration, Instant}; /// A thread parking primitive. /// From 78f10b24b975af9ecb72075c8eac822b391ac591 Mon Sep 17 00:00:00 2001 From: Nathan West Date: Tue, 8 Sep 2020 16:04:55 -0400 Subject: [PATCH 3/5] Update comment --- crossbeam-utils/src/sync/parker.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crossbeam-utils/src/sync/parker.rs b/crossbeam-utils/src/sync/parker.rs index 587be253d..045a0a706 100644 --- a/crossbeam-utils/src/sync/parker.rs +++ b/crossbeam-utils/src/sync/parker.rs @@ -365,8 +365,9 @@ impl Inner { m = match deadline { None => self.cvar.wait(m).unwrap(), Some(deadline) => match deadline.checked_duration_since(Instant::now()) { - // We could check for a timeout here, but in the case that a timeout and an - // unpark arrive simultaneously, we prefer to report the former. + // We could check for a timeout here, in the return value of wait_timeout, + // but in the case that a timeout and an unpark arrive simultaneously, we + // prefer to report the former. Some(duration) if duration > Duration::from_secs(0) => { self.cvar.wait_timeout(m, duration).unwrap().0 } From 8d7df2137235a2cdb8550f28f06cfbf25c5733d7 Mon Sep 17 00:00:00 2001 From: Nathan West Date: Tue, 8 Sep 2020 17:36:52 -0400 Subject: [PATCH 4/5] Support earlier minimum rust version Remove checked_duration_since in favor of arithmetic and manual checks --- crossbeam-utils/src/sync/parker.rs | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/crossbeam-utils/src/sync/parker.rs b/crossbeam-utils/src/sync/parker.rs index 045a0a706..3c78c0a43 100644 --- a/crossbeam-utils/src/sync/parker.rs +++ b/crossbeam-utils/src/sync/parker.rs @@ -364,21 +364,22 @@ impl Inner { // Block the current thread on the conditional variable. m = match deadline { None => self.cvar.wait(m).unwrap(), - Some(deadline) => match deadline.checked_duration_since(Instant::now()) { - // We could check for a timeout here, in the return value of wait_timeout, - // but in the case that a timeout and an unpark arrive simultaneously, we - // prefer to report the former. - Some(duration) if duration > Duration::from_secs(0) => { - self.cvar.wait_timeout(m, duration).unwrap().0 + Some(deadline) => { + let now = Instant::now(); + if now < deadline { + // We could check for a timeout here, in the return value of wait_timeout, + // but in the case that a timeout and an unpark arrive simultaneously, we + // prefer to report the former. + self.cvar.wait_timeout(m, deadline - now).unwrap().0 + } else { + // We've timed out; swap out the state back to empty on our way out + return match self.state.swap(EMPTY, SeqCst) { + NOTIFIED => UnparkReason::Unparked, // got a notification + PARKED => UnparkReason::Timeout, // no notification + n => panic!("inconsistent park_timeout state: {}", n), + }; } - - // We've timed out; swap out the state back to empty on our way out - _ => match self.state.swap(EMPTY, SeqCst) { - NOTIFIED => return UnparkReason::Unparked, // got a notification - PARKED => return UnparkReason::Timeout, // no notification - n => panic!("inconsistent park_timeout state: {}", n), - }, - }, + } }; if self From 4c0a1061d5485beba621517b59883530de30ae4b Mon Sep 17 00:00:00 2001 From: Nathan West Date: Sat, 28 Nov 2020 18:18:18 -0500 Subject: [PATCH 5/5] Remove `UnparkReason` to preserve compatibility. `UnparkReason` implementation preserved in Lucretiel/crossbeam/unpark-reason --- crossbeam-utils/src/sync/mod.rs | 2 +- crossbeam-utils/src/sync/parker.rs | 31 +++++++++--------------------- crossbeam-utils/tests/parker.rs | 17 ++++------------ 3 files changed, 14 insertions(+), 36 deletions(-) diff --git a/crossbeam-utils/src/sync/mod.rs b/crossbeam-utils/src/sync/mod.rs index 2483e3ee4..fd400d70e 100644 --- a/crossbeam-utils/src/sync/mod.rs +++ b/crossbeam-utils/src/sync/mod.rs @@ -8,6 +8,6 @@ mod parker; mod sharded_lock; mod wait_group; -pub use self::parker::{Parker, UnparkReason, Unparker}; +pub use self::parker::{Parker, Unparker}; pub use self::sharded_lock::{ShardedLock, ShardedLockReadGuard, ShardedLockWriteGuard}; pub use self::wait_group::WaitGroup; diff --git a/crossbeam-utils/src/sync/parker.rs b/crossbeam-utils/src/sync/parker.rs index 3c78c0a43..bf9d6f347 100644 --- a/crossbeam-utils/src/sync/parker.rs +++ b/crossbeam-utils/src/sync/parker.rs @@ -120,7 +120,7 @@ impl Parker { /// // Waits for the token to become available, but will not wait longer than 500 ms. /// p.park_timeout(Duration::from_millis(500)); /// ``` - pub fn park_timeout(&self, timeout: Duration) -> UnparkReason { + pub fn park_timeout(&self, timeout: Duration) { self.park_deadline(Instant::now() + timeout) } @@ -138,7 +138,7 @@ impl Parker { /// // Waits for the token to become available, but will not wait longer than 500 ms. /// p.park_deadline(deadline); /// ``` - pub fn park_deadline(&self, deadline: Instant) -> UnparkReason { + pub fn park_deadline(&self, deadline: Instant) { self.unparker.inner.park(Some(deadline)) } @@ -301,18 +301,6 @@ impl Clone for Unparker { } } -/// An enum that reports whether a `Parker::park_timeout` or -/// `Parker::park_deadline` returned because another thread called `unpark` or -/// because of a timeout. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum UnparkReason { - /// The park method returned due to a call to `unpark`. - Unparked, - - /// The park method returned due to a timeout. - Timeout, -} - const EMPTY: usize = 0; const PARKED: usize = 1; const NOTIFIED: usize = 2; @@ -324,20 +312,20 @@ struct Inner { } impl Inner { - fn park(&self, deadline: Option) -> UnparkReason { + fn park(&self, deadline: Option) { // If we were previously notified then we consume this notification and return quickly. if self .state .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) .is_ok() { - return UnparkReason::Unparked; + return; } // If the timeout is zero, then there is no need to actually block. if let Some(deadline) = deadline { if deadline <= Instant::now() { - return UnparkReason::Timeout; + return; } } @@ -355,7 +343,7 @@ impl Inner { // do that we must read from the write it made to `state`. let old = self.state.swap(EMPTY, SeqCst); assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); - return UnparkReason::Unparked; + return; } Err(n) => panic!("inconsistent park_timeout state: {}", n), } @@ -373,9 +361,8 @@ impl Inner { self.cvar.wait_timeout(m, deadline - now).unwrap().0 } else { // We've timed out; swap out the state back to empty on our way out - return match self.state.swap(EMPTY, SeqCst) { - NOTIFIED => UnparkReason::Unparked, // got a notification - PARKED => UnparkReason::Timeout, // no notification + match self.state.swap(EMPTY, SeqCst) { + NOTIFIED | PARKED => return, n => panic!("inconsistent park_timeout state: {}", n), }; } @@ -388,7 +375,7 @@ impl Inner { .is_ok() { // got a notification - return UnparkReason::Unparked; + return; } // Spurious wakeup, go back to sleep. Alternatively, if we timed out, it will be caught diff --git a/crossbeam-utils/tests/parker.rs b/crossbeam-utils/tests/parker.rs index 6a6d243cd..2bf9c37d4 100644 --- a/crossbeam-utils/tests/parker.rs +++ b/crossbeam-utils/tests/parker.rs @@ -2,7 +2,7 @@ use std::thread::sleep; use std::time::Duration; use std::u32; -use crossbeam_utils::sync::{Parker, UnparkReason}; +use crossbeam_utils::sync::Parker; use crossbeam_utils::thread; #[test] @@ -10,10 +10,7 @@ fn park_timeout_unpark_before() { let p = Parker::new(); for _ in 0..10 { p.unparker().unpark(); - assert_eq!( - p.park_timeout(Duration::from_millis(u32::MAX as u64)), - UnparkReason::Unparked, - ); + p.park_timeout(Duration::from_millis(u32::MAX as u64)); } } @@ -21,10 +18,7 @@ fn park_timeout_unpark_before() { fn park_timeout_unpark_not_called() { let p = Parker::new(); for _ in 0..10 { - assert_eq!( - p.park_timeout(Duration::from_millis(10)), - UnparkReason::Timeout, - ); + p.park_timeout(Duration::from_millis(10)) } } @@ -40,10 +34,7 @@ fn park_timeout_unpark_called_other_thread() { u.unpark(); }); - assert_eq!( - p.park_timeout(Duration::from_millis(u32::MAX as u64)), - UnparkReason::Unparked, - ); + p.park_timeout(Duration::from_millis(u32::MAX as u64)) }) .unwrap(); }