From eeb315a880d22082ea633fe45c0525a9f7ba9c9c Mon Sep 17 00:00:00 2001 From: Jeehoon Kang Date: Tue, 23 Apr 2019 15:51:25 +0900 Subject: [PATCH 1/2] Implement wide sequence lock --- crossbeam-utils/src/atomic/atomic_cell.rs | 94 ++--------------- crossbeam-utils/src/atomic/mod.rs | 11 ++ crossbeam-utils/src/atomic/seq_lock.rs | 90 +++++++++++++++++ crossbeam-utils/src/atomic/seq_lock_wide.rs | 106 ++++++++++++++++++++ 4 files changed, 213 insertions(+), 88 deletions(-) create mode 100644 crossbeam-utils/src/atomic/seq_lock.rs create mode 100644 crossbeam-utils/src/atomic/seq_lock_wide.rs diff --git a/crossbeam-utils/src/atomic/atomic_cell.rs b/crossbeam-utils/src/atomic/atomic_cell.rs index f3118ecac..545cf9202 100644 --- a/crossbeam-utils/src/atomic/atomic_cell.rs +++ b/crossbeam-utils/src/atomic/atomic_cell.rs @@ -2,12 +2,12 @@ use core::cell::UnsafeCell; use core::fmt; use core::mem; use core::ptr; -use core::sync::atomic::{self, AtomicBool, AtomicUsize, Ordering}; +use core::sync::atomic::{self, AtomicBool, Ordering}; #[cfg(feature = "std")] use std::panic::{RefUnwindSafe, UnwindSafe}; -use Backoff; +use super::seq_lock::SeqLock; /// A thread-safe mutable memory location. /// @@ -631,87 +631,6 @@ fn can_transmute() -> bool { mem::size_of::() == mem::size_of::() && mem::align_of::() >= mem::align_of::() } -/// A simple stamped lock. -struct Lock { - /// The current state of the lock. - /// - /// All bits except the least significant one hold the current stamp. When locked, the state - /// equals 1 and doesn't contain a valid stamp. - state: AtomicUsize, -} - -impl Lock { - /// If not locked, returns the current stamp. - /// - /// This method should be called before optimistic reads. - #[inline] - fn optimistic_read(&self) -> Option { - let state = self.state.load(Ordering::Acquire); - if state == 1 { - None - } else { - Some(state) - } - } - - /// Returns `true` if the current stamp is equal to `stamp`. - /// - /// This method should be called after optimistic reads to check whether they are valid. The - /// argument `stamp` should correspond to the one returned by method `optimistic_read`. - #[inline] - fn validate_read(&self, stamp: usize) -> bool { - atomic::fence(Ordering::Acquire); - self.state.load(Ordering::Relaxed) == stamp - } - - /// Grabs the lock for writing. - #[inline] - fn write(&'static self) -> WriteGuard { - let backoff = Backoff::new(); - loop { - let previous = self.state.swap(1, Ordering::Acquire); - - if previous != 1 { - atomic::fence(Ordering::Release); - - return WriteGuard { - lock: self, - state: previous, - }; - } - - backoff.snooze(); - } - } -} - -/// A RAII guard that releases the lock and increments the stamp when dropped. -struct WriteGuard { - /// The parent lock. - lock: &'static Lock, - - /// The stamp before locking. - state: usize, -} - -impl WriteGuard { - /// Releases the lock without incrementing the stamp. - #[inline] - fn abort(self) { - self.lock.state.store(self.state, Ordering::Release); - } -} - -impl Drop for WriteGuard { - #[inline] - fn drop(&mut self) { - // Release the lock and increment the stamp. - self.lock - .state - .store(self.state.wrapping_add(2), Ordering::Release); - } -} - /// Returns a reference to the global lock associated with the `AtomicCell` at address `addr`. /// /// This function is used to protect atomic data which doesn't fit into any of the primitive atomic @@ -722,7 +641,7 @@ impl Drop for WriteGuard { /// scalability. 
#[inline] #[must_use] -fn lock(addr: usize) -> &'static Lock { +fn lock(addr: usize) -> &'static SeqLock { // The number of locks is a prime number because we want to make sure `addr % LEN` gets // dispersed across all locks. // @@ -747,10 +666,9 @@ fn lock(addr: usize) -> &'static Lock { // In order to protect from such cases, we simply choose a large prime number for `LEN`. const LEN: usize = 97; - const L: Lock = Lock { - state: AtomicUsize::new(0), - }; - static LOCKS: [Lock; LEN] = [ + const L: SeqLock = SeqLock::new(); + + static LOCKS: [SeqLock; LEN] = [ L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, diff --git a/crossbeam-utils/src/atomic/mod.rs b/crossbeam-utils/src/atomic/mod.rs index 420599395..5f09d0832 100644 --- a/crossbeam-utils/src/atomic/mod.rs +++ b/crossbeam-utils/src/atomic/mod.rs @@ -1,5 +1,16 @@ //! Atomic types. +cfg_if! { + // TODO(jeehoonkang): want to express `target_pointer_width >= "64"`, but it's not expressible + // in Rust for the time being. + if #[cfg(target_pointer_width = "64")] { + mod seq_lock; + } else { + #[path = "seq_lock_wide.rs"] + mod seq_lock; + } +} + mod atomic_cell; mod consume; diff --git a/crossbeam-utils/src/atomic/seq_lock.rs b/crossbeam-utils/src/atomic/seq_lock.rs new file mode 100644 index 000000000..674cc0358 --- /dev/null +++ b/crossbeam-utils/src/atomic/seq_lock.rs @@ -0,0 +1,90 @@ +use core::sync::atomic::{self, AtomicUsize, Ordering}; + +use Backoff; + +/// A simple stamped lock. +pub struct SeqLock { + /// The current state of the lock. + /// + /// All bits except the least significant one hold the current stamp. When locked, the state + /// equals 1 and doesn't contain a valid stamp. + state: AtomicUsize, +} + +impl SeqLock { + pub const fn new() -> Self { + Self { + state: AtomicUsize::new(0), + } + } + + /// If not locked, returns the current stamp. + /// + /// This method should be called before optimistic reads. + #[inline] + pub fn optimistic_read(&self) -> Option { + let state = self.state.load(Ordering::Acquire); + if state == 1 { + None + } else { + Some(state) + } + } + + /// Returns `true` if the current stamp is equal to `stamp`. + /// + /// This method should be called after optimistic reads to check whether they are valid. The + /// argument `stamp` should correspond to the one returned by method `optimistic_read`. + #[inline] + pub fn validate_read(&self, stamp: usize) -> bool { + atomic::fence(Ordering::Acquire); + self.state.load(Ordering::Relaxed) == stamp + } + + /// Grabs the lock for writing. + #[inline] + pub fn write(&'static self) -> SeqLockWriteGuard { + let backoff = Backoff::new(); + loop { + let previous = self.state.swap(1, Ordering::Acquire); + + if previous != 1 { + atomic::fence(Ordering::Release); + + return SeqLockWriteGuard { + lock: self, + state: previous, + }; + } + + backoff.snooze(); + } + } +} + +/// A RAII guard that releases the lock and increments the stamp when dropped. +pub struct SeqLockWriteGuard { + /// The parent lock. + lock: &'static SeqLock, + + /// The stamp before locking. + state: usize, +} + +impl SeqLockWriteGuard { + /// Releases the lock without incrementing the stamp. 
+    #[inline]
+    pub fn abort(self) {
+        self.lock.state.store(self.state, Ordering::Release);
+    }
+}
+
+impl Drop for SeqLockWriteGuard {
+    #[inline]
+    fn drop(&mut self) {
+        // Release the lock and increment the stamp.
+        self.lock
+            .state
+            .store(self.state.wrapping_add(2), Ordering::Release);
+    }
+}
diff --git a/crossbeam-utils/src/atomic/seq_lock_wide.rs b/crossbeam-utils/src/atomic/seq_lock_wide.rs
new file mode 100644
index 000000000..85ecdba2d
--- /dev/null
+++ b/crossbeam-utils/src/atomic/seq_lock_wide.rs
@@ -0,0 +1,106 @@
+use core::sync::atomic::{self, AtomicUsize, Ordering};
+
+use Backoff;
+
+/// A simple stamped lock.
+///
+/// The state is represented by two `AtomicUsize`s: `state_hi` for the high bits and `state_lo`
+/// for the low bits.
+pub struct SeqLock {
+    /// The high bits of the current state of the lock.
+    state_hi: AtomicUsize,
+
+    /// The low bits of the current state of the lock.
+    ///
+    /// All bits except the least significant one hold the current stamp. When locked, `state_lo`
+    /// equals 1 and doesn't contain a valid stamp.
+    state_lo: AtomicUsize,
+}
+
+impl SeqLock {
+    pub const fn new() -> Self {
+        Self {
+            state_hi: AtomicUsize::new(0),
+            state_lo: AtomicUsize::new(0),
+        }
+    }
+
+    /// If not locked, returns the current stamp.
+    ///
+    /// This method should be called before optimistic reads.
+    #[inline]
+    pub fn optimistic_read(&self) -> Option<(usize, usize)> {
+        let state_hi = self.state_hi.load(Ordering::Acquire);
+        let state_lo = self.state_lo.load(Ordering::Acquire);
+        if state_lo == 1 {
+            None
+        } else {
+            Some((state_hi, state_lo))
+        }
+    }
+
+    /// Returns `true` if the current stamp is equal to `stamp`.
+    ///
+    /// This method should be called after optimistic reads to check whether they are valid. The
+    /// argument `stamp` should correspond to the one returned by method `optimistic_read`.
+    #[inline]
+    pub fn validate_read(&self, stamp: (usize, usize)) -> bool {
+        atomic::fence(Ordering::Acquire);
+        let state_lo = self.state_lo.load(Ordering::Acquire);
+        let state_hi = self.state_hi.load(Ordering::Relaxed);
+        (state_hi, state_lo) == stamp
+    }
+
+    /// Grabs the lock for writing.
+    #[inline]
+    pub fn write(&'static self) -> SeqLockWriteGuard {
+        let backoff = Backoff::new();
+        loop {
+            let previous = self.state_lo.swap(1, Ordering::Acquire);
+
+            if previous != 1 {
+                atomic::fence(Ordering::Release);
+
+                return SeqLockWriteGuard {
+                    lock: self,
+                    state_lo: previous,
+                };
+            }
+
+            backoff.snooze();
+        }
+    }
+}
+
+/// A RAII guard that releases the lock and increments the stamp when dropped.
+pub struct SeqLockWriteGuard {
+    /// The parent lock.
+    lock: &'static SeqLock,
+
+    /// The stamp before locking.
+    state_lo: usize,
+}
+
+impl SeqLockWriteGuard {
+    /// Releases the lock without incrementing the stamp.
+    #[inline]
+    pub fn abort(self) {
+        self.lock.state_lo.store(self.state_lo, Ordering::Release);
+    }
+}
+
+impl Drop for SeqLockWriteGuard {
+    #[inline]
+    fn drop(&mut self) {
+        let state_lo = self.state_lo.wrapping_add(2);
+
+        // Increase the high bits if the low bits wrap around.
+        if state_lo == 0 {
+            let state_hi = self.lock.state_hi.load(Ordering::Relaxed);
+            self.lock.state_hi.store(state_hi.wrapping_add(1), Ordering::Release);
+        }
+
+        // Release the lock and increment the stamp.
+        self.lock.state_lo.store(state_lo, Ordering::Release);
+    }
+}

From e870d0cd79374f11ba3223fc300847a774408a12 Mon Sep 17 00:00:00 2001
From: Jeehoon Kang
Date: Wed, 24 Apr 2019 11:21:22 +0900
Subject: [PATCH 2/2] Address comments

---
 crossbeam-utils/src/atomic/atomic_cell.rs   |  2 +-
 crossbeam-utils/src/atomic/mod.rs           | 13 +++++--
 crossbeam-utils/src/atomic/seq_lock.rs      | 10 +++---
 crossbeam-utils/src/atomic/seq_lock_wide.rs | 40 +++++++++++++++++----
 4 files changed, 48 insertions(+), 17 deletions(-)

diff --git a/crossbeam-utils/src/atomic/atomic_cell.rs b/crossbeam-utils/src/atomic/atomic_cell.rs
index 545cf9202..5166a74f2 100644
--- a/crossbeam-utils/src/atomic/atomic_cell.rs
+++ b/crossbeam-utils/src/atomic/atomic_cell.rs
@@ -666,7 +666,7 @@ fn lock(addr: usize) -> &'static SeqLock {
     // In order to protect from such cases, we simply choose a large prime number for `LEN`.
     const LEN: usize = 97;
 
-    const L: SeqLock = SeqLock::new();
+    const L: SeqLock = SeqLock::INIT;
 
     static LOCKS: [SeqLock; LEN] = [
         L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L, L,
diff --git a/crossbeam-utils/src/atomic/mod.rs b/crossbeam-utils/src/atomic/mod.rs
index 5f09d0832..074b0ca53 100644
--- a/crossbeam-utils/src/atomic/mod.rs
+++ b/crossbeam-utils/src/atomic/mod.rs
@@ -1,9 +1,16 @@
 //! Atomic types.
 
 cfg_if! {
-    // TODO(jeehoonkang): want to express `target_pointer_width >= "64"`, but it's not expressible
-    // in Rust for the time being.
-    if #[cfg(target_pointer_width = "64")] {
+    // Use the "wide" sequence lock if the pointer width is <= 32, to prevent its counter from
+    // wrapping around.
+    //
+    // We ignore excessively wide architectures (pointer width >= 256), since such a system
+    // is unlikely to appear in the foreseeable future.
+    //
+    // On narrow architectures (pointer width <= 16), the counter is still <= 32 bits and may be
+    // vulnerable to wrap-around. That is mostly okay, though: on such primitive hardware, the
+    // counter will not be incremented that fast.
+    if #[cfg(any(target_pointer_width = "64", target_pointer_width = "128"))] {
         mod seq_lock;
     } else {
         #[path = "seq_lock_wide.rs"]
diff --git a/crossbeam-utils/src/atomic/seq_lock.rs b/crossbeam-utils/src/atomic/seq_lock.rs
index 674cc0358..533a036b5 100644
--- a/crossbeam-utils/src/atomic/seq_lock.rs
+++ b/crossbeam-utils/src/atomic/seq_lock.rs
@@ -12,11 +12,9 @@ pub struct SeqLock {
 }
 
 impl SeqLock {
-    pub const fn new() -> Self {
-        Self {
-            state: AtomicUsize::new(0),
-        }
-    }
+    pub const INIT: Self = Self {
+        state: AtomicUsize::new(0),
+    };
 
     /// If not locked, returns the current stamp.
     ///
@@ -62,7 +60,7 @@ impl SeqLock {
     }
 }
 
-/// A RAII guard that releases the lock and increments the stamp when dropped.
+/// An RAII guard that releases the lock and increments the stamp when dropped.
 pub struct SeqLockWriteGuard {
     /// The parent lock.
     lock: &'static SeqLock,
diff --git a/crossbeam-utils/src/atomic/seq_lock_wide.rs b/crossbeam-utils/src/atomic/seq_lock_wide.rs
index 85ecdba2d..090a849bd 100644
--- a/crossbeam-utils/src/atomic/seq_lock_wide.rs
+++ b/crossbeam-utils/src/atomic/seq_lock_wide.rs
@@ -18,18 +18,22 @@ pub struct SeqLock {
 }
 
 impl SeqLock {
-    pub const fn new() -> Self {
-        Self {
-            state_hi: AtomicUsize::new(0),
-            state_lo: AtomicUsize::new(0),
-        }
-    }
+    pub const INIT: Self = Self {
+        state_hi: AtomicUsize::new(0),
+        state_lo: AtomicUsize::new(0),
+    };
 
     /// If not locked, returns the current stamp.
     ///
     /// This method should be called before optimistic reads.
     #[inline]
     pub fn optimistic_read(&self) -> Option<(usize, usize)> {
+        // The acquire loads from `state_hi` and `state_lo` synchronize with the release stores in
+        // `SeqLockWriteGuard::drop`.
+        //
+        // As a consequence, we can make sure that (1) all writes within the era of `state_hi - 1`
+        // happen before now; and therefore, (2) if `state_lo` is even, all writes within the
+        // critical section of `(state_hi, state_lo)` happen before now.
         let state_hi = self.state_hi.load(Ordering::Acquire);
         let state_lo = self.state_lo.load(Ordering::Acquire);
         if state_lo == 1 {
@@ -45,9 +49,25 @@ impl SeqLock {
     /// argument `stamp` should correspond to the one returned by method `optimistic_read`.
     #[inline]
     pub fn validate_read(&self, stamp: (usize, usize)) -> bool {
+        // Thanks to the fence, if we're noticing any modification to the data by some critical
+        // section, then that critical section's write of 1 to `state_lo` should also be
+        // visible.
         atomic::fence(Ordering::Acquire);
+
+        // So if `state_lo` coincides with `stamp.1`, then either (1) we're noticing no modification
+        // to the data after the critical section of `(stamp.0, stamp.1)`, or (2) `state_lo` wrapped
+        // around.
+        //
+        // If (2) is the case, the acquire ordering ensures we see the new value of `state_hi`.
        let state_lo = self.state_lo.load(Ordering::Acquire);
+
+        // If (2) is the case and `state_hi` coincides with `stamp.0`, then `state_hi` wrapped
+        // around as well; in that case we give up on validating the read correctly.
        let state_hi = self.state_hi.load(Ordering::Relaxed);
+
+        // Unless both `state_hi` and `state_lo` wrapped around, the following condition implies
+        // that we're noticing no modification to the data after the critical section of
+        // `(stamp.0, stamp.1)`.
         (state_hi, state_lo) == stamp
     }
 
@@ -59,6 +79,8 @@ impl SeqLock {
             let previous = self.state_lo.swap(1, Ordering::Acquire);
 
             if previous != 1 {
+                // This fence synchronizes with the acquire fence in `validate_read` via any
+                // modification to the data within the critical section of `(state_hi, previous)`.
                 atomic::fence(Ordering::Release);
 
                 return SeqLockWriteGuard {
@@ -72,7 +94,7 @@ impl SeqLock {
     }
 }
 
-/// A RAII guard that releases the lock and increments the stamp when dropped.
+/// An RAII guard that releases the lock and increments the stamp when dropped.
 pub struct SeqLockWriteGuard {
     /// The parent lock.
     lock: &'static SeqLock,
@@ -95,12 +117,16 @@ impl Drop for SeqLockWriteGuard {
         let state_lo = self.state_lo.wrapping_add(2);
 
         // Increase the high bits if the low bits wrap around.
+        //
+        // The release ordering synchronizes with the acquire loads in `optimistic_read`.
         if state_lo == 0 {
             let state_hi = self.lock.state_hi.load(Ordering::Relaxed);
             self.lock.state_hi.store(state_hi.wrapping_add(1), Ordering::Release);
         }
 
         // Release the lock and increment the stamp.
+        //
+        // The release ordering synchronizes with the acquire loads in `optimistic_read`.
         self.lock.state_lo.store(state_lo, Ordering::Release);
     }
 }
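
For readers who want to see the whole protocol in one place: the patch wires `SeqLock` into `AtomicCell`'s fallback path, but the call sequence (`optimistic_read`, a racy read of the protected data, `validate_read`; or `write` plus the RAII guard) is easiest to follow in a single file. Below is a minimal, self-contained sketch of the same single-word stamped-lock idea using only `std`. The `SeqLockCell` type is a hypothetical stand-in invented for illustration; it is not part of this patch or of crossbeam's API. Its unguarded data read is a deliberate simplification: a read that races with a writer is undefined behavior under the Rust memory model, and the real implementation takes more care at that point.

use std::cell::UnsafeCell;
use std::sync::atomic::{fence, AtomicUsize, Ordering};

/// Hypothetical stand-in for the patch's `SeqLock`, protecting one `u64`.
struct SeqLockCell {
    /// Even values are valid stamps; 1 means "locked", as in the patch.
    state: AtomicUsize,
    value: UnsafeCell<u64>,
}

// SAFETY (sketch only): all access to `value` is mediated by `state`.
unsafe impl Sync for SeqLockCell {}

impl SeqLockCell {
    const fn new(value: u64) -> Self {
        SeqLockCell {
            state: AtomicUsize::new(0),
            value: UnsafeCell::new(value),
        }
    }

    /// Optimistic read: snapshot the stamp, read the data, re-check the stamp.
    fn load(&self) -> u64 {
        loop {
            // `optimistic_read`: retry if a writer currently holds the lock.
            let stamp = self.state.load(Ordering::Acquire);
            if stamp == 1 {
                std::hint::spin_loop();
                continue;
            }

            // Racy read of the protected data. CAVEAT: if this races with a
            // writer it is a data race (UB); the validation below discards the
            // value, but a production implementation must also avoid the UB
            // itself.
            let value = unsafe { *self.value.get() };

            // `validate_read`: the fence orders the data read before the
            // second stamp load; an unchanged stamp means no writer ran.
            fence(Ordering::Acquire);
            if self.state.load(Ordering::Relaxed) == stamp {
                return value;
            }
        }
    }

    /// Exclusive write: lock by swapping in 1, mutate, then unlock by storing
    /// `stamp + 2`, so stamps stay even and strictly advance.
    fn store(&self, value: u64) {
        // `write`: spin until we swap out a valid (unlocked) stamp.
        let stamp = loop {
            let previous = self.state.swap(1, Ordering::Acquire);
            if previous != 1 {
                break previous;
            }
            std::hint::spin_loop();
        };
        fence(Ordering::Release);

        unsafe { *self.value.get() = value };

        // `SeqLockWriteGuard::drop`: release the lock and bump the stamp.
        self.state.store(stamp.wrapping_add(2), Ordering::Release);
    }
}

fn main() {
    static CELL: SeqLockCell = SeqLockCell::new(7);
    std::thread::scope(|s| {
        s.spawn(|| CELL.store(42));
        s.spawn(|| {
            // Observes either 7 or 42, never a torn value.
            let _ = CELL.load();
        });
    });
    assert_eq!(CELL.load(), 42);
}

The stamp doubles as the lock word: a writer parks 1 in it while it holds the lock, and every unlock advances the stamp by 2, so a reader that sees the same even stamp before and after its read knows no writer intervened. The wide variant in this patch keeps exactly this protocol but spreads the stamp across `state_hi` and `state_lo`, so a 32-bit target still gets roughly a 63-bit stamp and the wrap-around case handled in `SeqLockWriteGuard::drop` stays vanishingly rare.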