From 50c5c7d77427ea48084e50c1cf61962b564dd46d Mon Sep 17 00:00:00 2001 From: Koxiaet Date: Sat, 2 Jan 2021 08:17:51 +0000 Subject: [PATCH 1/9] Use a list instead of a hash map --- src/cached.rs | 14 ++- src/lib.rs | 238 ++++++++++++++++++++--------------------------- src/thread_id.rs | 11 ++- 3 files changed, 117 insertions(+), 146 deletions(-) diff --git a/src/cached.rs b/src/cached.rs index ab43c86..7955d3e 100644 --- a/src/cached.rs +++ b/src/cached.rs @@ -3,6 +3,7 @@ use std::cell::UnsafeCell; use std::fmt; use std::panic::UnwindSafe; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::usize; use thread_id; use unreachable::{UncheckedOptionExt, UncheckedResultExt}; @@ -30,7 +31,7 @@ impl CachedThreadLocal { /// Creates a new empty `CachedThreadLocal`. pub fn new() -> CachedThreadLocal { CachedThreadLocal { - owner: AtomicUsize::new(0), + owner: AtomicUsize::new(usize::MAX), local: UnsafeCell::new(None), global: ThreadLocal::new(), } @@ -43,7 +44,7 @@ impl CachedThreadLocal { if owner == id { return unsafe { Some((*self.local.get()).as_ref().unchecked_unwrap()) }; } - if owner == 0 { + if owner == usize::MAX { return None; } self.global.get_fast(id) @@ -83,7 +84,12 @@ impl CachedThreadLocal { where F: FnOnce() -> Result, { - if owner == 0 && self.owner.compare_and_swap(0, id, Ordering::Relaxed) == 0 { + if owner == usize::MAX + && self + .owner + .compare_and_swap(usize::MAX, id, Ordering::Relaxed) + == usize::MAX + { unsafe { (*self.local.get()) = Some(Box::new(create()?)); return Ok((*self.local.get()).as_ref().unchecked_unwrap()); @@ -91,7 +97,7 @@ impl CachedThreadLocal { } match self.global.get_fast(id) { Some(x) => Ok(x), - None => Ok(self.global.insert(id, Box::new(create()?), true)), + None => Ok(self.global.insert(id, create()?, true)), } } diff --git a/src/lib.rs b/src/lib.rs index 4362e8f..4301cd6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -84,7 +84,7 @@ use std::cell::UnsafeCell; use std::fmt; use std::marker::PhantomData; use std::panic::UnwindSafe; -use std::sync::atomic::{AtomicPtr, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicPtr, Ordering}; use std::sync::Mutex; use unreachable::{UncheckedOptionExt, UncheckedResultExt}; @@ -92,33 +92,34 @@ use unreachable::{UncheckedOptionExt, UncheckedResultExt}; /// /// See the [module-level documentation](index.html) for more. pub struct ThreadLocal { - // Pointer to the current top-level hash table - table: AtomicPtr>, + // Pointer to the current top-level list + list: AtomicPtr>, // Lock used to guard against concurrent modifications. This is only taken - // while writing to the table, not when reading from it. This also guards - // the counter for the total number of values in the hash table. + // while writing to the list, not when reading from it. This also guards + // the counter for the total number of values in the thread local. lock: Mutex, } -struct Table { - // Hash entries for the table - entries: Box<[TableEntry]>, - - // Number of bits used for the hash function - hash_bits: usize, - - // Previous table, half the size of the current one - prev: Option>>, +/// A list of thread-local values. +struct List { + // The thread local values in this list. If any values is `None`, it is + // either in an earlier list or it is uninitialized. + values: Box<[UnsafeCell>]>, + + // Previous list, half the size of the current one + // + // This cannot be a Box as that would result in the Box's pointer + // potentially being aliased when creating a new list, which is UB. 
+ prev: Option<*mut List>, } -struct TableEntry { - // Current owner of this entry, or 0 if this is an empty entry - owner: AtomicUsize, - - // The object associated with this entry. This is only ever accessed by the - // owner of the entry. - data: UnsafeCell>>, +impl Drop for List { + fn drop(&mut self) { + if let Some(prev) = self.prev.take() { + drop(unsafe { Box::from_raw(prev) }); + } + } } // ThreadLocal is always Sync, even if T isn't @@ -133,47 +134,29 @@ impl Default for ThreadLocal { impl Drop for ThreadLocal { fn drop(&mut self) { unsafe { - Box::from_raw(self.table.load(Ordering::Relaxed)); - } - } -} - -// Implementation of Clone for TableEntry, needed to make vec![] work -impl Clone for TableEntry { - fn clone(&self) -> TableEntry { - TableEntry { - owner: AtomicUsize::new(0), - data: UnsafeCell::new(None), + Box::from_raw(*self.list.get_mut()); } } } -// Hash function for the thread id -#[cfg(target_pointer_width = "32")] -#[inline] -fn hash(id: usize, bits: usize) -> usize { - id.wrapping_mul(0x9E3779B9) >> (32 - bits) -} -#[cfg(target_pointer_width = "64")] -#[inline] -fn hash(id: usize, bits: usize) -> usize { - id.wrapping_mul(0x9E37_79B9_7F4A_7C15) >> (64 - bits) -} - impl ThreadLocal { /// Creates a new empty `ThreadLocal`. pub fn new() -> ThreadLocal { - let entry = TableEntry { - owner: AtomicUsize::new(0), - data: UnsafeCell::new(None), - }; - let table = Table { - entries: vec![entry; 2].into_boxed_slice(), - hash_bits: 1, + ThreadLocal::with_capacity(2) + } + + /// Creates a new `ThreadLocal` with an initial capacity. If less than the capacity threads + /// access the thread local it will never reallocate. + pub fn with_capacity(capacity: usize) -> ThreadLocal { + let list = List { + values: (0..capacity) + .map(|_| UnsafeCell::new(None)) + .collect::>() + .into_boxed_slice(), prev: None, }; ThreadLocal { - table: AtomicPtr::new(Box::into_raw(Box::new(table))), + list: AtomicPtr::new(Box::into_raw(Box::new(list))), lock: Mutex::new(0), } } @@ -206,114 +189,92 @@ impl ThreadLocal { let id = thread_id::get(); match self.get_fast(id) { Some(x) => Ok(x), - None => Ok(self.insert(id, Box::new(create()?), true)), + None => Ok(self.insert(id, create()?, true)), } } - // Simple hash table lookup function - fn lookup(id: usize, table: &Table) -> Option<&UnsafeCell>>> { - // Because we use a Mutex to prevent concurrent modifications (but not - // reads) of the hash table, we can avoid any memory barriers here. No - // elements between our hash bucket and our value can have been modified - // since we inserted our thread-local value into the table. - for entry in table.entries.iter().cycle().skip(hash(id, table.hash_bits)) { - let owner = entry.owner.load(Ordering::Relaxed); - if owner == id { - return Some(&entry.data); - } - if owner == 0 { - return None; - } - } - unreachable!(); - } - - // Fast path: try to find our thread in the top-level hash table + // Fast path: try to find our thread in the top-level list fn get_fast(&self, id: usize) -> Option<&T> { - let table = unsafe { &*self.table.load(Ordering::Acquire) }; - match Self::lookup(id, table) { - Some(x) => unsafe { Some((*x.get()).as_ref().unchecked_unwrap()) }, - None => self.get_slow(id, table), - } + let list = unsafe { &*self.list.load(Ordering::Acquire) }; + list.values + .get(id) + .and_then(|cell| unsafe { &*cell.get() }.as_ref()) + .or_else(|| self.get_slow(id, list)) } - // Slow path: try to find our thread in the other hash tables, and then - // move it to the top-level hash table. 
+ // Slow path: try to find our thread in the other lists, and then move it to + // the top-level list. #[cold] - fn get_slow(&self, id: usize, table_top: &Table) -> Option<&T> { - let mut current = &table_top.prev; - while let Some(ref table) = *current { - if let Some(x) = Self::lookup(id, table) { - let data = unsafe { (*x.get()).take().unchecked_unwrap() }; - return Some(self.insert(id, data, false)); + fn get_slow(&self, id: usize, list_top: &List) -> Option<&T> { + let mut current = list_top.prev; + while let Some(list) = current { + let list = unsafe { &*list }; + + match list.values.get(id) { + Some(value) => { + let value_option = unsafe { &mut *value.get() }; + if value_option.is_some() { + let value = unsafe { value_option.take().unchecked_unwrap() }; + return Some(self.insert(id, value, false)); + } + } + None => break, } - current = &table.prev; + current = list.prev; } None } #[cold] - fn insert(&self, id: usize, data: Box, new: bool) -> &T { - // Lock the Mutex to ensure only a single thread is modify the hash - // table at once. + fn insert(&self, id: usize, data: T, new: bool) -> &T { + let list_raw = self.list.load(Ordering::Relaxed); + let list = unsafe { &*list_raw }; + + // Lock the Mutex to ensure only a single thread is adding new lists at + // once let mut count = self.lock.lock().unwrap(); if new { *count += 1; } - let table_raw = self.table.load(Ordering::Relaxed); - let table = unsafe { &*table_raw }; - - // If the current top-level hash table is more than 75% full, add a new - // level with 2x the capacity. Elements will be moved up to the new top - // level table as they are accessed. - let table = if *count > table.entries.len() * 3 / 4 { - let entry = TableEntry { - owner: AtomicUsize::new(0), - data: UnsafeCell::new(None), - }; - let new_table = Box::into_raw(Box::new(Table { - entries: vec![entry; table.entries.len() * 2].into_boxed_slice(), - hash_bits: table.hash_bits + 1, - prev: unsafe { Some(Box::from_raw(table_raw)) }, + + // If there isn't space for this thread's local, add a new list. + let list = if id >= list.values.len() { + let new_list = Box::into_raw(Box::new(List { + values: (0..std::cmp::max(list.values.len() * 2, id + 1)) + // Values will be lazily moved into the top-level list, so + // it starts out empty + .map(|_| UnsafeCell::new(None)) + .collect::>() + .into_boxed_slice(), + prev: Some(list_raw), })); - self.table.store(new_table, Ordering::Release); - unsafe { &*new_table } + self.list.store(new_list, Ordering::Release); + unsafe { &*new_list } } else { - table + list }; - // Insert the new element into the top-level hash table - for entry in table.entries.iter().cycle().skip(hash(id, table.hash_bits)) { - let owner = entry.owner.load(Ordering::Relaxed); - if owner == 0 { - unsafe { - entry.owner.store(id, Ordering::Relaxed); - *entry.data.get() = Some(data); - return (*entry.data.get()).as_ref().unchecked_unwrap(); - } - } - if owner == id { - // This can happen if create() inserted a value into this - // ThreadLocal between our calls to get_fast() and insert(). We - // just return the existing value and drop the newly-allocated - // Box. 
- unsafe { - return (*entry.data.get()).as_ref().unchecked_unwrap(); - } - } + // We are no longer adding new lists, so we don't need the guard + drop(count); + + // Insert the new element into the top-level list + unsafe { + let value_ptr = list.values.get_unchecked(id).get(); + *value_ptr = Some(data); + (&*value_ptr).as_ref().unchecked_unwrap() } - unreachable!(); } fn raw_iter(&mut self) -> RawIter { RawIter { remaining: *self.lock.get_mut().unwrap(), index: 0, - table: self.table.load(Ordering::Relaxed), + list: *self.list.get_mut(), } } - /// Returns a mutable iterator over the local values of all threads. + /// Returns a mutable iterator over the local values of all threads in + /// unspecified order. /// /// Since this call borrows the `ThreadLocal` mutably, this operation can /// be done safely---the mutable borrow statically guarantees no other @@ -376,21 +337,22 @@ impl UnwindSafe for ThreadLocal {} struct RawIter { remaining: usize, index: usize, - table: *const Table, + list: *const List, } impl Iterator for RawIter { - type Item = *mut Option>; + type Item = *mut Option; - fn next(&mut self) -> Option<*mut Option>> { + fn next(&mut self) -> Option { if self.remaining == 0 { return None; } loop { - let entries = unsafe { &(*self.table).entries[..] }; - while self.index < entries.len() { - let val = entries[self.index].data.get(); + let values = &*unsafe { &*self.list }.values; + + while self.index < values.len() { + let val = values[self.index].get(); self.index += 1; if unsafe { (*val).is_some() } { self.remaining -= 1; @@ -398,7 +360,7 @@ impl Iterator for RawIter { } } self.index = 0; - self.table = unsafe { &**(*self.table).prev.as_ref().unchecked_unwrap() }; + self.list = unsafe { &**(*self.list).prev.as_ref().unchecked_unwrap() }; } } @@ -419,7 +381,7 @@ impl<'a, T: Send + 'a> Iterator for IterMut<'a, T> { fn next(&mut self) -> Option<&'a mut T> { self.raw .next() - .map(|x| unsafe { &mut **(*x).as_mut().unchecked_unwrap() }) + .map(|x| unsafe { &mut *(*x).as_mut().unchecked_unwrap() }) } fn size_hint(&self) -> (usize, Option) { @@ -441,7 +403,7 @@ impl Iterator for IntoIter { fn next(&mut self) -> Option { self.raw .next() - .map(|x| unsafe { *(*x).take().unchecked_unwrap() }) + .map(|x| unsafe { (*x).take().unchecked_unwrap() }) } fn size_hint(&self) -> (usize, Option) { diff --git a/src/thread_id.rs b/src/thread_id.rs index 449cbb4..3944fd9 100644 --- a/src/thread_id.rs +++ b/src/thread_id.rs @@ -13,13 +13,13 @@ use std::usize; /// reuse thread IDs where possible to avoid cases where a ThreadLocal grows /// indefinitely when it is used by many short-lived threads. 
struct ThreadIdManager { - limit: usize, + free_from: usize, free_list: BinaryHeap, } impl ThreadIdManager { fn new() -> ThreadIdManager { ThreadIdManager { - limit: usize::MAX, + free_from: 0, free_list: BinaryHeap::new(), } } @@ -27,8 +27,11 @@ impl ThreadIdManager { if let Some(id) = self.free_list.pop() { id } else { - let id = self.limit; - self.limit = self.limit.checked_sub(1).expect("Ran out of thread IDs"); + let id = self.free_from; + self.free_from = self + .free_from + .checked_add(1) + .expect("Ran out of thread IDs"); id } } From 14c5889b5b5d95e8d270dbe6e85950ca635be559 Mon Sep 17 00:00:00 2001 From: Koxiaet Date: Sat, 2 Jan 2021 10:57:05 +0000 Subject: [PATCH 2/9] Save space with NonNull instead of a raw pointer --- src/lib.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 4301cd6..2507241 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -84,6 +84,7 @@ use std::cell::UnsafeCell; use std::fmt; use std::marker::PhantomData; use std::panic::UnwindSafe; +use std::ptr::NonNull; use std::sync::atomic::{AtomicPtr, Ordering}; use std::sync::Mutex; use unreachable::{UncheckedOptionExt, UncheckedResultExt}; @@ -111,13 +112,13 @@ struct List { // // This cannot be a Box as that would result in the Box's pointer // potentially being aliased when creating a new list, which is UB. - prev: Option<*mut List>, + prev: Option>>, } impl Drop for List { fn drop(&mut self) { if let Some(prev) = self.prev.take() { - drop(unsafe { Box::from_raw(prev) }); + drop(unsafe { Box::from_raw(prev.as_ptr()) }); } } } @@ -208,7 +209,7 @@ impl ThreadLocal { fn get_slow(&self, id: usize, list_top: &List) -> Option<&T> { let mut current = list_top.prev; while let Some(list) = current { - let list = unsafe { &*list }; + let list = unsafe { list.as_ref() }; match list.values.get(id) { Some(value) => { @@ -246,7 +247,7 @@ impl ThreadLocal { .map(|_| UnsafeCell::new(None)) .collect::>() .into_boxed_slice(), - prev: Some(list_raw), + prev: Some(unsafe { NonNull::new_unchecked(list_raw) }), })); self.list.store(new_list, Ordering::Release); unsafe { &*new_list } @@ -360,7 +361,7 @@ impl Iterator for RawIter { } } self.index = 0; - self.list = unsafe { &**(*self.list).prev.as_ref().unchecked_unwrap() }; + self.list = unsafe { (*self.list).prev.as_ref().unchecked_unwrap().as_ref() }; } } From 6e46d4f23c49f7672efe18ed7b351f0cbaafa76f Mon Sep 17 00:00:00 2001 From: Koxiaet Date: Sun, 3 Jan 2021 17:30:24 +0000 Subject: [PATCH 3/9] Reverse the thread id binary heap --- src/thread_id.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/thread_id.rs b/src/thread_id.rs index 3944fd9..238b895 100644 --- a/src/thread_id.rs +++ b/src/thread_id.rs @@ -5,6 +5,7 @@ // http://opensource.org/licenses/MIT>, at your option. This file may not be // copied, modified, or distributed except according to those terms. +use std::cmp::Reverse; use std::collections::BinaryHeap; use std::sync::Mutex; use std::usize; @@ -14,7 +15,7 @@ use std::usize; /// indefinitely when it is used by many short-lived threads. 
struct ThreadIdManager { free_from: usize, - free_list: BinaryHeap, + free_list: BinaryHeap>, } impl ThreadIdManager { fn new() -> ThreadIdManager { @@ -25,7 +26,7 @@ impl ThreadIdManager { } fn alloc(&mut self) -> usize { if let Some(id) = self.free_list.pop() { - id + id.0 } else { let id = self.free_from; self.free_from = self @@ -36,7 +37,7 @@ impl ThreadIdManager { } } fn free(&mut self, id: usize) { - self.free_list.push(id); + self.free_list.push(Reverse(id)); } } lazy_static! { From d7b2b1d6a6572ea3d9a02d865bfe77ac46006e48 Mon Sep 17 00:00:00 2001 From: Koxiaet Date: Sun, 3 Jan 2021 18:14:35 +0000 Subject: [PATCH 4/9] Use criterion for benchmarks and add more of them --- Cargo.toml | 7 +++++ benches/thread_local.rs | 69 ++++++++++++++++++++++++++++++++--------- 2 files changed, 62 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 83921af..9d36d7a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,3 +14,10 @@ travis-ci = { repository = "Amanieu/thread_local-rs" } [dependencies] lazy_static = "1.0" + +[dev-dependencies] +criterion = "0.3.3" + +[[bench]] +name = "thread_local" +harness = false diff --git a/benches/thread_local.rs b/benches/thread_local.rs index 4ead369..fd03649 100644 --- a/benches/thread_local.rs +++ b/benches/thread_local.rs @@ -1,22 +1,63 @@ -#![feature(test)] - -extern crate test; +extern crate criterion; extern crate thread_local; +use criterion::{black_box, BatchSize}; + use thread_local::{CachedThreadLocal, ThreadLocal}; -#[bench] -fn thread_local(b: &mut test::Bencher) { - let local = ThreadLocal::new(); - b.iter(|| { - let _: &i32 = local.get_or(|| Box::new(0)); +fn main() { + let mut c = criterion::Criterion::default().configure_from_args(); + + c.bench_function("get", |b| { + let local = ThreadLocal::new(); + local.get_or(|| Box::new(0)); + b.iter(|| { + black_box(local.get()); + }); + }); + + c.bench_function("get_cached", |b| { + let local = CachedThreadLocal::new(); + local.get_or(|| Box::new(0)); + b.iter(|| { + black_box(local.get()); + }); + }); + + c.bench_function("get_cached_second_thread", |b| { + let local = CachedThreadLocal::new(); + local.get(); + let local = std::thread::spawn(move || { + local.get_or(|| Box::new(0)); + local + }) + .join() + .unwrap(); + + local.get_or(|| Box::new(0)); + + b.iter(|| { + black_box(local.get()); + }); + }); + + c.bench_function("insert", |b| { + b.iter_batched_ref( + ThreadLocal::new, + |local| { + black_box(local.get_or(|| 0)); + }, + BatchSize::SmallInput, + ) }); -} -#[bench] -fn cached_thread_local(b: &mut test::Bencher) { - let local = CachedThreadLocal::new(); - b.iter(|| { - let _: &i32 = local.get_or(|| Box::new(0)); + c.bench_function("insert_cached", |b| { + b.iter_batched_ref( + CachedThreadLocal::new, + |local| { + black_box(local.get_or(|| 0)); + }, + BatchSize::SmallInput, + ) }); } From 6c744de251d93c0feb3d728d6377d0e0a825f2cf Mon Sep 17 00:00:00 2001 From: Koxiaet Date: Sun, 3 Jan 2021 19:04:20 +0000 Subject: [PATCH 5/9] Skip benchmarking on 1.28.0 --- .travis.yml | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index e835e9f..38584fc 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,7 +5,7 @@ rust: - nightly - beta - stable -- 1.27.2 +- 1.28.0 before_script: - | @@ -15,12 +15,11 @@ before_script: script: - travis-cargo build - travis-cargo test -- travis-cargo bench +- travis-cargo --skip 1.28.0 bench - travis-cargo doc -- --no-deps after_success: - travis-cargo --only nightly doc-upload - env: global: - 
TRAVIS_CARGO_NIGHTLY_FEATURE="" From 0b0830efedabce37a934a1feb3f0b3b590fa223b Mon Sep 17 00:00:00 2001 From: Koxiaet Date: Sun, 3 Jan 2021 20:04:51 +0000 Subject: [PATCH 6/9] Fix testing on 1.28.0 --- .travis.yml | 3 ++- Cargo.toml | 5 ++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 38584fc..a516890 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,7 +15,8 @@ before_script: script: - travis-cargo build - travis-cargo test -- travis-cargo --skip 1.28.0 bench +# Criterion doesn't build on 1.28.0 +- travis-cargo --skip 1.28.0 bench --features criterion - travis-cargo doc -- --no-deps after_success: diff --git a/Cargo.toml b/Cargo.toml index 9d36d7a..e640fad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,9 +15,12 @@ travis-ci = { repository = "Amanieu/thread_local-rs" } [dependencies] lazy_static = "1.0" +# This is actually a dev-dependency, see https://github.com/rust-lang/cargo/issues/1596 +criterion = { version = "0.3.3", optional = true } + [dev-dependencies] -criterion = "0.3.3" [[bench]] name = "thread_local" +required-features = ["criterion"] harness = false From fe072dcfcbb14b4ab68efa205eb64369b1282f5b Mon Sep 17 00:00:00 2001 From: Koxiaet Date: Mon, 4 Jan 2021 15:08:34 +0000 Subject: [PATCH 7/9] Use a concurrent vector --- src/cached.rs | 22 ++--- src/lib.rs | 229 ++++++++++++++++++++++++----------------------- src/thread_id.rs | 84 ++++++++++++++--- 3 files changed, 202 insertions(+), 133 deletions(-) diff --git a/src/cached.rs b/src/cached.rs index 7955d3e..ebae662 100644 --- a/src/cached.rs +++ b/src/cached.rs @@ -4,7 +4,7 @@ use std::fmt; use std::panic::UnwindSafe; use std::sync::atomic::{AtomicUsize, Ordering}; use std::usize; -use thread_id; +use thread_id::{self, Thread}; use unreachable::{UncheckedOptionExt, UncheckedResultExt}; /// Wrapper around `ThreadLocal` which adds a fast path for a single thread. @@ -39,15 +39,15 @@ impl CachedThreadLocal { /// Returns the element for the current thread, if it exists. 
pub fn get(&self) -> Option<&T> { - let id = thread_id::get(); + let thread = thread_id::get(); let owner = self.owner.load(Ordering::Relaxed); - if owner == id { + if owner == thread.id { return unsafe { Some((*self.local.get()).as_ref().unchecked_unwrap()) }; } if owner == usize::MAX { return None; } - self.global.get_fast(id) + self.global.get_inner(thread) } /// Returns the element for the current thread, or creates it if it doesn't @@ -70,24 +70,24 @@ impl CachedThreadLocal { where F: FnOnce() -> Result, { - let id = thread_id::get(); + let thread = thread_id::get(); let owner = self.owner.load(Ordering::Relaxed); - if owner == id { + if owner == thread.id { return Ok(unsafe { (*self.local.get()).as_ref().unchecked_unwrap() }); } - self.get_or_try_slow(id, owner, create) + self.get_or_try_slow(thread, owner, create) } #[cold] #[inline(never)] - fn get_or_try_slow(&self, id: usize, owner: usize, create: F) -> Result<&T, E> + fn get_or_try_slow(&self, thread: Thread, owner: usize, create: F) -> Result<&T, E> where F: FnOnce() -> Result, { if owner == usize::MAX && self .owner - .compare_and_swap(usize::MAX, id, Ordering::Relaxed) + .compare_and_swap(usize::MAX, thread.id, Ordering::Relaxed) == usize::MAX { unsafe { @@ -95,9 +95,9 @@ impl CachedThreadLocal { return Ok((*self.local.get()).as_ref().unchecked_unwrap()); } } - match self.global.get_fast(id) { + match self.global.get_inner(thread) { Some(x) => Ok(x), - None => Ok(self.global.insert(id, create()?, true)), + None => Ok(self.global.insert(thread, create()?)), } } diff --git a/src/lib.rs b/src/lib.rs index 2507241..3b9d99d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -83,46 +83,39 @@ pub use cached::{CachedIntoIter, CachedIterMut, CachedThreadLocal}; use std::cell::UnsafeCell; use std::fmt; use std::marker::PhantomData; +use std::mem; use std::panic::UnwindSafe; -use std::ptr::NonNull; +use std::ptr; use std::sync::atomic::{AtomicPtr, Ordering}; use std::sync::Mutex; +use thread_id::Thread; use unreachable::{UncheckedOptionExt, UncheckedResultExt}; +#[cfg(target_pointer_width = "16")] +const POINTER_WIDTH: u8 = 16; +#[cfg(target_pointer_width = "32")] +const POINTER_WIDTH: u8 = 32; +#[cfg(target_pointer_width = "64")] +const POINTER_WIDTH: u8 = 64; + +/// The total number of buckets stored in each thread local. +const BUCKETS: usize = (POINTER_WIDTH + 1) as usize; + /// Thread-local variable wrapper /// /// See the [module-level documentation](index.html) for more. pub struct ThreadLocal { - // Pointer to the current top-level list - list: AtomicPtr>, - - // Lock used to guard against concurrent modifications. This is only taken - // while writing to the list, not when reading from it. This also guards - // the counter for the total number of values in the thread local. + /// The buckets in the thread local. The nth bucket contains `2^(n-1)` + /// elements. Each bucket is lazily allocated. + buckets: [AtomicPtr>>; BUCKETS], + + /// Lock used to guard against concurrent modifications. This is taken when + /// there is a possibility of allocating a new bucket, which only occurs + /// when inserting values. This also guards the counter for the total number + /// of values in the thread local. lock: Mutex, } -/// A list of thread-local values. -struct List { - // The thread local values in this list. If any values is `None`, it is - // either in an earlier list or it is uninitialized. 
- values: Box<[UnsafeCell>]>, - - // Previous list, half the size of the current one - // - // This cannot be a Box as that would result in the Box's pointer - // potentially being aliased when creating a new list, which is UB. - prev: Option>>, -} - -impl Drop for List { - fn drop(&mut self) { - if let Some(prev) = self.prev.take() { - drop(unsafe { Box::from_raw(prev.as_ptr()) }); - } - } -} - // ThreadLocal is always Sync, even if T isn't unsafe impl Sync for ThreadLocal {} @@ -134,8 +127,22 @@ impl Default for ThreadLocal { impl Drop for ThreadLocal { fn drop(&mut self) { - unsafe { - Box::from_raw(*self.list.get_mut()); + let mut bucket_size = 1; + + // Free each non-null bucket + for (i, bucket) in self.buckets.iter_mut().enumerate() { + let bucket_ptr = *bucket.get_mut(); + + let this_bucket_size = bucket_size; + if i != 0 { + bucket_size <<= 1; + } + + if bucket_ptr.is_null() { + continue; + } + + unsafe { Box::from_raw(std::slice::from_raw_parts_mut(bucket_ptr, this_bucket_size)) }; } } } @@ -143,29 +150,40 @@ impl Drop for ThreadLocal { impl ThreadLocal { /// Creates a new empty `ThreadLocal`. pub fn new() -> ThreadLocal { - ThreadLocal::with_capacity(2) + Self::with_capacity(2) } /// Creates a new `ThreadLocal` with an initial capacity. If less than the capacity threads - /// access the thread local it will never reallocate. + /// access the thread local it will never reallocate. The capacity may be rounded up to the + /// nearest power of two. pub fn with_capacity(capacity: usize) -> ThreadLocal { - let list = List { - values: (0..capacity) - .map(|_| UnsafeCell::new(None)) - .collect::>() - .into_boxed_slice(), - prev: None, - }; + let allocated_buckets = capacity + .checked_sub(1) + .map(|c| usize::from(POINTER_WIDTH) - (c.leading_zeros() as usize) + 1) + .unwrap_or(0); + + let mut buckets = [ptr::null_mut(); BUCKETS]; + let mut bucket_size = 1; + for (i, bucket) in buckets[..allocated_buckets].iter_mut().enumerate() { + *bucket = allocate_bucket::(bucket_size); + + if i != 0 { + bucket_size <<= 1; + } + } + ThreadLocal { - list: AtomicPtr::new(Box::into_raw(Box::new(list))), + // Safety: AtomicPtr has the same representation as a pointer and arrays have the same + // representation as a sequence of their inner type. + buckets: unsafe { mem::transmute(buckets) }, lock: Mutex::new(0), } } /// Returns the element for the current thread, if it exists. pub fn get(&self) -> Option<&T> { - let id = thread_id::get(); - self.get_fast(id) + let thread = thread_id::get(); + self.get_inner(thread) } /// Returns the element for the current thread, or creates it if it doesn't @@ -187,80 +205,45 @@ impl ThreadLocal { where F: FnOnce() -> Result, { - let id = thread_id::get(); - match self.get_fast(id) { + let thread = thread_id::get(); + match self.get_inner(thread) { Some(x) => Ok(x), - None => Ok(self.insert(id, create()?, true)), + None => Ok(self.insert(thread, create()?)), } } - // Fast path: try to find our thread in the top-level list - fn get_fast(&self, id: usize) -> Option<&T> { - let list = unsafe { &*self.list.load(Ordering::Acquire) }; - list.values - .get(id) - .and_then(|cell| unsafe { &*cell.get() }.as_ref()) - .or_else(|| self.get_slow(id, list)) - } - - // Slow path: try to find our thread in the other lists, and then move it to - // the top-level list. 
- #[cold] - fn get_slow(&self, id: usize, list_top: &List) -> Option<&T> { - let mut current = list_top.prev; - while let Some(list) = current { - let list = unsafe { list.as_ref() }; - - match list.values.get(id) { - Some(value) => { - let value_option = unsafe { &mut *value.get() }; - if value_option.is_some() { - let value = unsafe { value_option.take().unchecked_unwrap() }; - return Some(self.insert(id, value, false)); - } - } - None => break, - } - current = list.prev; + fn get_inner(&self, thread: Thread) -> Option<&T> { + let bucket_ptr = + unsafe { self.buckets.get_unchecked(thread.bucket) }.load(Ordering::Acquire); + if bucket_ptr.is_null() { + return None; } - None + unsafe { (&*(&*bucket_ptr.add(thread.index)).get()).as_ref() } } #[cold] - fn insert(&self, id: usize, data: T, new: bool) -> &T { - let list_raw = self.list.load(Ordering::Relaxed); - let list = unsafe { &*list_raw }; - - // Lock the Mutex to ensure only a single thread is adding new lists at - // once + fn insert(&self, thread: Thread, data: T) -> &T { + // Lock the Mutex to ensure only a single thread is allocating buckets at once let mut count = self.lock.lock().unwrap(); - if new { - *count += 1; - } + *count += 1; - // If there isn't space for this thread's local, add a new list. - let list = if id >= list.values.len() { - let new_list = Box::into_raw(Box::new(List { - values: (0..std::cmp::max(list.values.len() * 2, id + 1)) - // Values will be lazily moved into the top-level list, so - // it starts out empty - .map(|_| UnsafeCell::new(None)) - .collect::>() - .into_boxed_slice(), - prev: Some(unsafe { NonNull::new_unchecked(list_raw) }), - })); - self.list.store(new_list, Ordering::Release); - unsafe { &*new_list } + let bucket_atomic_ptr = unsafe { self.buckets.get_unchecked(thread.bucket) }; + + let bucket_ptr: *const _ = bucket_atomic_ptr.load(Ordering::Acquire); + let bucket_ptr = if bucket_ptr.is_null() { + // Allocate a new bucket + let bucket_ptr = allocate_bucket(thread.bucket_size); + bucket_atomic_ptr.store(bucket_ptr, Ordering::Release); + bucket_ptr } else { - list + bucket_ptr }; - // We are no longer adding new lists, so we don't need the guard drop(count); - // Insert the new element into the top-level list + // Insert the new element into the bucket unsafe { - let value_ptr = list.values.get_unchecked(id).get(); + let value_ptr = (&*bucket_ptr.add(thread.index)).get(); *value_ptr = Some(data); (&*value_ptr).as_ref().unchecked_unwrap() } @@ -269,8 +252,12 @@ impl ThreadLocal { fn raw_iter(&mut self) -> RawIter { RawIter { remaining: *self.lock.get_mut().unwrap(), + buckets: unsafe { + *(&self.buckets as *const _ as *const [*const UnsafeCell>; BUCKETS]) + }, + bucket: 0, + bucket_size: 1, index: 0, - list: *self.list.get_mut(), } } @@ -337,8 +324,10 @@ impl UnwindSafe for ThreadLocal {} struct RawIter { remaining: usize, + buckets: [*const UnsafeCell>; BUCKETS], + bucket: usize, + bucket_size: usize, index: usize, - list: *const List, } impl Iterator for RawIter { @@ -350,18 +339,27 @@ impl Iterator for RawIter { } loop { - let values = &*unsafe { &*self.list }.values; - - while self.index < values.len() { - let val = values[self.index].get(); - self.index += 1; - if unsafe { (*val).is_some() } { - self.remaining -= 1; - return Some(val); + let bucket = unsafe { *self.buckets.get_unchecked(self.bucket) }; + + if !bucket.is_null() { + while self.index < self.bucket_size { + let item = unsafe { (&*bucket.add(self.index)).get() }; + + self.index += 1; + + if unsafe { &*item }.is_some() { + 
self.remaining -= 1; + return Some(item); + } } } + + if self.bucket != 0 { + self.bucket_size <<= 1; + } + self.bucket += 1; + self.index = 0; - self.list = unsafe { (*self.list).prev.as_ref().unchecked_unwrap().as_ref() }; } } @@ -414,6 +412,15 @@ impl Iterator for IntoIter { impl ExactSizeIterator for IntoIter {} +fn allocate_bucket(size: usize) -> *mut UnsafeCell> { + Box::into_raw( + (0..size) + .map(|_| UnsafeCell::new(None::)) + .collect::>() + .into_boxed_slice(), + ) as *mut _ +} + #[cfg(test)] mod tests { use super::{CachedThreadLocal, ThreadLocal}; diff --git a/src/thread_id.rs b/src/thread_id.rs index 238b895..397f772 100644 --- a/src/thread_id.rs +++ b/src/thread_id.rs @@ -9,6 +9,7 @@ use std::cmp::Reverse; use std::collections::BinaryHeap; use std::sync::Mutex; use std::usize; +use POINTER_WIDTH; /// Thread ID manager which allocates thread IDs. It attempts to aggressively /// reuse thread IDs where possible to avoid cases where a ThreadLocal grows @@ -44,22 +45,83 @@ lazy_static! { static ref THREAD_ID_MANAGER: Mutex = Mutex::new(ThreadIdManager::new()); } -/// Non-zero integer which is unique to the current thread while it is running. +/// Data which is unique to the current thread while it is running. /// A thread ID may be reused after a thread exits. -struct ThreadId(usize); -impl ThreadId { - fn new() -> ThreadId { - ThreadId(THREAD_ID_MANAGER.lock().unwrap().alloc()) +#[derive(Clone, Copy)] +pub(crate) struct Thread { + /// The thread ID obtained from the thread ID manager. + pub(crate) id: usize, + /// The bucket this thread's local storage will be in. + pub(crate) bucket: usize, + /// The size of the bucket this thread's local storage will be in. + pub(crate) bucket_size: usize, + /// The index into the bucket this thread's local storage is in. + pub(crate) index: usize, +} +impl Thread { + fn new(id: usize) -> Thread { + let bucket = usize::from(POINTER_WIDTH) - id.leading_zeros() as usize; + let bucket_size = 1 << bucket.saturating_sub(1); + let index = if id != 0 { id ^ bucket_size } else { 0 }; + + Thread { + id, + bucket, + bucket_size, + index, + } } } -impl Drop for ThreadId { + +/// Wrapper around `Thread` that allocates and deallocates the ID. +struct ThreadHolder(Thread); +impl ThreadHolder { + fn new() -> ThreadHolder { + ThreadHolder(Thread::new(THREAD_ID_MANAGER.lock().unwrap().alloc())) + } +} +impl Drop for ThreadHolder { fn drop(&mut self) { - THREAD_ID_MANAGER.lock().unwrap().free(self.0); + THREAD_ID_MANAGER.lock().unwrap().free(self.0.id); } } -thread_local!(static THREAD_ID: ThreadId = ThreadId::new()); -/// Returns a non-zero ID for the current thread -pub(crate) fn get() -> usize { - THREAD_ID.with(|x| x.0) +thread_local!(static THREAD_HOLDER: ThreadHolder = ThreadHolder::new()); + +/// Get the current thread. 
+pub(crate) fn get() -> Thread { + THREAD_HOLDER.with(|holder| holder.0) +} + +#[test] +fn test_thread() { + let thread = Thread::new(0); + assert_eq!(thread.id, 0); + assert_eq!(thread.bucket, 0); + assert_eq!(thread.bucket_size, 1); + assert_eq!(thread.index, 0); + + let thread = Thread::new(1); + assert_eq!(thread.id, 1); + assert_eq!(thread.bucket, 1); + assert_eq!(thread.bucket_size, 1); + assert_eq!(thread.index, 0); + + let thread = Thread::new(2); + assert_eq!(thread.id, 2); + assert_eq!(thread.bucket, 2); + assert_eq!(thread.bucket_size, 2); + assert_eq!(thread.index, 0); + + let thread = Thread::new(3); + assert_eq!(thread.id, 3); + assert_eq!(thread.bucket, 2); + assert_eq!(thread.bucket_size, 2); + assert_eq!(thread.index, 1); + + let thread = Thread::new(19); + assert_eq!(thread.id, 19); + assert_eq!(thread.bucket, 5); + assert_eq!(thread.bucket_size, 16); + assert_eq!(thread.index, 3); } From 34edea055dc5ee5321ca61dec4f33db73b88cdc3 Mon Sep 17 00:00:00 2001 From: Koxiaet Date: Mon, 4 Jan 2021 15:12:44 +0000 Subject: [PATCH 8/9] Fix running benchmarks on CI --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index a516890..1698520 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,7 +16,7 @@ script: - travis-cargo build - travis-cargo test # Criterion doesn't build on 1.28.0 -- travis-cargo --skip 1.28.0 bench --features criterion +- travis-cargo --skip 1.28.0 bench -- --features criterion - travis-cargo doc -- --no-deps after_success: From ef33f172f251e69bb4080bbebc5662da7e7e3ab1 Mon Sep 17 00:00:00 2001 From: Koxiaet Date: Thu, 7 Jan 2021 14:13:30 +0000 Subject: [PATCH 9/9] Deprecate CachedThreadLocal --- README.md | 8 +-- benches/thread_local.rs | 37 +------------- src/cached.rs | 107 ++++++++++++---------------------------- src/lib.rs | 78 ++--------------------------- 4 files changed, 40 insertions(+), 190 deletions(-) diff --git a/README.md b/README.md index 0af6821..891e168 100644 --- a/README.md +++ b/README.md @@ -3,10 +3,10 @@ thread_local [![Build Status](https://travis-ci.org/Amanieu/thread_local-rs.svg?branch=master)](https://travis-ci.org/Amanieu/thread_local-rs) [![Crates.io](https://img.shields.io/crates/v/thread_local.svg)](https://crates.io/crates/thread_local) -This library provides the `ThreadLocal` and `CachedThreadLocal` types which -allow a separate copy of an object to be used for each thread. This allows for -per-object thread-local storage, unlike the standard library's `thread_local!` -macro which only allows static thread-local storage. +This library provides the `ThreadLocal` type which allow a separate copy of an +object to be used for each thread. This allows for per-object thread-local +storage, unlike the standard library's `thread_local!` macro which only allows +static thread-local storage. 
[Documentation](https://amanieu.github.io/thread_local-rs/thread_local/index.html) diff --git a/benches/thread_local.rs b/benches/thread_local.rs index fd03649..ccad665 100644 --- a/benches/thread_local.rs +++ b/benches/thread_local.rs @@ -3,7 +3,7 @@ extern crate thread_local; use criterion::{black_box, BatchSize}; -use thread_local::{CachedThreadLocal, ThreadLocal}; +use thread_local::ThreadLocal; fn main() { let mut c = criterion::Criterion::default().configure_from_args(); @@ -16,31 +16,6 @@ fn main() { }); }); - c.bench_function("get_cached", |b| { - let local = CachedThreadLocal::new(); - local.get_or(|| Box::new(0)); - b.iter(|| { - black_box(local.get()); - }); - }); - - c.bench_function("get_cached_second_thread", |b| { - let local = CachedThreadLocal::new(); - local.get(); - let local = std::thread::spawn(move || { - local.get_or(|| Box::new(0)); - local - }) - .join() - .unwrap(); - - local.get_or(|| Box::new(0)); - - b.iter(|| { - black_box(local.get()); - }); - }); - c.bench_function("insert", |b| { b.iter_batched_ref( ThreadLocal::new, @@ -50,14 +25,4 @@ fn main() { BatchSize::SmallInput, ) }); - - c.bench_function("insert_cached", |b| { - b.iter_batched_ref( - CachedThreadLocal::new, - |local| { - black_box(local.get_or(|| 0)); - }, - BatchSize::SmallInput, - ) - }); } diff --git a/src/cached.rs b/src/cached.rs index ebae662..16f6516 100644 --- a/src/cached.rs +++ b/src/cached.rs @@ -1,26 +1,19 @@ +#![allow(deprecated)] + use super::{IntoIter, IterMut, ThreadLocal}; -use std::cell::UnsafeCell; use std::fmt; use std::panic::UnwindSafe; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::usize; -use thread_id::{self, Thread}; -use unreachable::{UncheckedOptionExt, UncheckedResultExt}; -/// Wrapper around `ThreadLocal` which adds a fast path for a single thread. +/// Wrapper around [`ThreadLocal`]. /// -/// This has the same API as `ThreadLocal`, but will register the first thread -/// that sets a value as its owner. All accesses by the owner will go through -/// a special fast path which is much faster than the normal `ThreadLocal` path. +/// This used to add a fast path for a single thread, however that has been +/// obsoleted by performance improvements to [`ThreadLocal`] itself. +#[deprecated(since = "1.1.0", note = "Use `ThreadLocal` instead")] pub struct CachedThreadLocal { - owner: AtomicUsize, - local: UnsafeCell>>, - global: ThreadLocal, + inner: ThreadLocal, } -// CachedThreadLocal is always Sync, even if T isn't -unsafe impl Sync for CachedThreadLocal {} - impl Default for CachedThreadLocal { fn default() -> CachedThreadLocal { CachedThreadLocal::new() @@ -29,76 +22,38 @@ impl Default for CachedThreadLocal { impl CachedThreadLocal { /// Creates a new empty `CachedThreadLocal`. + #[inline] pub fn new() -> CachedThreadLocal { CachedThreadLocal { - owner: AtomicUsize::new(usize::MAX), - local: UnsafeCell::new(None), - global: ThreadLocal::new(), + inner: ThreadLocal::new(), } } /// Returns the element for the current thread, if it exists. + #[inline] pub fn get(&self) -> Option<&T> { - let thread = thread_id::get(); - let owner = self.owner.load(Ordering::Relaxed); - if owner == thread.id { - return unsafe { Some((*self.local.get()).as_ref().unchecked_unwrap()) }; - } - if owner == usize::MAX { - return None; - } - self.global.get_inner(thread) + self.inner.get() } /// Returns the element for the current thread, or creates it if it doesn't /// exist. 
- #[inline(always)] + #[inline] pub fn get_or(&self, create: F) -> &T where F: FnOnce() -> T, { - unsafe { - self.get_or_try(|| Ok::(create())) - .unchecked_unwrap_ok() - } + self.inner.get_or(create) } /// Returns the element for the current thread, or creates it if it doesn't /// exist. If `create` fails, that error is returned and no element is /// added. + #[inline] pub fn get_or_try(&self, create: F) -> Result<&T, E> where F: FnOnce() -> Result, { - let thread = thread_id::get(); - let owner = self.owner.load(Ordering::Relaxed); - if owner == thread.id { - return Ok(unsafe { (*self.local.get()).as_ref().unchecked_unwrap() }); - } - self.get_or_try_slow(thread, owner, create) - } - - #[cold] - #[inline(never)] - fn get_or_try_slow(&self, thread: Thread, owner: usize, create: F) -> Result<&T, E> - where - F: FnOnce() -> Result, - { - if owner == usize::MAX - && self - .owner - .compare_and_swap(usize::MAX, thread.id, Ordering::Relaxed) - == usize::MAX - { - unsafe { - (*self.local.get()) = Some(Box::new(create()?)); - return Ok((*self.local.get()).as_ref().unchecked_unwrap()); - } - } - match self.global.get_inner(thread) { - Some(x) => Ok(x), - None => Ok(self.global.insert(thread, create()?)), - } + self.inner.get_or_try(create) } /// Returns a mutable iterator over the local values of all threads. @@ -106,10 +61,10 @@ impl CachedThreadLocal { /// Since this call borrows the `ThreadLocal` mutably, this operation can /// be done safely---the mutable borrow statically guarantees no other /// threads are currently accessing their associated values. + #[inline] pub fn iter_mut(&mut self) -> CachedIterMut { CachedIterMut { - local: unsafe { (*self.local.get()).as_mut().map(|x| &mut **x) }, - global: self.global.iter_mut(), + inner: self.inner.iter_mut(), } } @@ -119,8 +74,9 @@ impl CachedThreadLocal { /// Since this call borrows the `ThreadLocal` mutably, this operation can /// be done safely---the mutable borrow statically guarantees no other /// threads are currently accessing their associated values. + #[inline] pub fn clear(&mut self) { - *self = CachedThreadLocal::new(); + self.inner.clear(); } } @@ -130,8 +86,7 @@ impl IntoIterator for CachedThreadLocal { fn into_iter(self) -> CachedIntoIter { CachedIntoIter { - local: unsafe { (*self.local.get()).take().map(|x| *x) }, - global: self.global.into_iter(), + inner: self.inner.into_iter(), } } } @@ -162,42 +117,44 @@ impl fmt::Debug for CachedThreadLocal { impl UnwindSafe for CachedThreadLocal {} /// Mutable iterator over the contents of a `CachedThreadLocal`. +#[deprecated(since = "1.1.0", note = "Use `IterMut` instead")] pub struct CachedIterMut<'a, T: Send + 'a> { - local: Option<&'a mut T>, - global: IterMut<'a, T>, + inner: IterMut<'a, T>, } impl<'a, T: Send + 'a> Iterator for CachedIterMut<'a, T> { type Item = &'a mut T; + #[inline] fn next(&mut self) -> Option<&'a mut T> { - self.local.take().or_else(|| self.global.next()) + self.inner.next() } + #[inline] fn size_hint(&self) -> (usize, Option) { - let len = self.global.size_hint().0 + self.local.is_some() as usize; - (len, Some(len)) + self.inner.size_hint() } } impl<'a, T: Send + 'a> ExactSizeIterator for CachedIterMut<'a, T> {} /// An iterator that moves out of a `CachedThreadLocal`. 
+#[deprecated(since = "1.1.0", note = "Use `IntoIter` instead")] pub struct CachedIntoIter { - local: Option, - global: IntoIter, + inner: IntoIter, } impl Iterator for CachedIntoIter { type Item = T; + #[inline] fn next(&mut self) -> Option { - self.local.take().or_else(|| self.global.next()) + self.inner.next() } + #[inline] fn size_hint(&self) -> (usize, Option) { - let len = self.global.size_hint().0 + self.local.is_some() as usize; - (len, Some(len)) + self.inner.size_hint() } } diff --git a/src/lib.rs b/src/lib.rs index 3b9d99d..78bdcc3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,11 +20,6 @@ //! only be done if you have mutable access to the `ThreadLocal` object, which //! guarantees that you are the only thread currently accessing it. //! -//! A `CachedThreadLocal` type is also provided which wraps a `ThreadLocal` but -//! also uses a special fast path for the first thread that writes into it. The -//! fast path has very low overhead (<1ns per access) while keeping the same -//! performance as `ThreadLocal` for other threads. -//! //! Note that since thread IDs are recycled when a thread exits, it is possible //! for one thread to retrieve the object of another thread. Since this can only //! occur after a thread has exited this does not lead to any race conditions. @@ -78,6 +73,7 @@ mod cached; mod thread_id; mod unreachable; +#[allow(deprecated)] pub use cached::{CachedIntoIter, CachedIterMut, CachedThreadLocal}; use std::cell::UnsafeCell; @@ -91,6 +87,7 @@ use std::sync::Mutex; use thread_id::Thread; use unreachable::{UncheckedOptionExt, UncheckedResultExt}; +// Use usize::BITS once it has stabilized and the MSRV has been bumped. #[cfg(target_pointer_width = "16")] const POINTER_WIDTH: u8 = 16; #[cfg(target_pointer_width = "32")] @@ -423,7 +420,7 @@ fn allocate_bucket(size: usize) -> *mut UnsafeCell> { #[cfg(test)] mod tests { - use super::{CachedThreadLocal, ThreadLocal}; + use super::ThreadLocal; use std::cell::RefCell; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::Relaxed; @@ -452,23 +449,6 @@ mod tests { assert_eq!(None, tls.get()); } - #[test] - fn same_thread_cached() { - let create = make_create(); - let mut tls = CachedThreadLocal::new(); - assert_eq!(None, tls.get()); - assert_eq!("ThreadLocal { local_data: None }", format!("{:?}", &tls)); - assert_eq!(0, *tls.get_or(|| create())); - assert_eq!(Some(&0), tls.get()); - assert_eq!(0, *tls.get_or(|| create())); - assert_eq!(Some(&0), tls.get()); - assert_eq!(0, *tls.get_or(|| create())); - assert_eq!(Some(&0), tls.get()); - assert_eq!("ThreadLocal { local_data: Some(0) }", format!("{:?}", &tls)); - tls.clear(); - assert_eq!(None, tls.get()); - } - #[test] fn different_thread() { let create = make_create(); @@ -491,28 +471,6 @@ mod tests { assert_eq!(0, *tls.get_or(|| create())); } - #[test] - fn different_thread_cached() { - let create = make_create(); - let tls = Arc::new(CachedThreadLocal::new()); - assert_eq!(None, tls.get()); - assert_eq!(0, *tls.get_or(|| create())); - assert_eq!(Some(&0), tls.get()); - - let tls2 = tls.clone(); - let create2 = create.clone(); - thread::spawn(move || { - assert_eq!(None, tls2.get()); - assert_eq!(1, *tls2.get_or(|| create2())); - assert_eq!(Some(&1), tls2.get()); - }) - .join() - .unwrap(); - - assert_eq!(Some(&0), tls.get()); - assert_eq!(0, *tls.get_or(|| create())); - } - #[test] fn iter() { let tls = Arc::new(ThreadLocal::new()); @@ -541,40 +499,10 @@ mod tests { assert_eq!(vec![1, 2, 3], v); } - #[test] - fn iter_cached() { - let tls = 
Arc::new(CachedThreadLocal::new());
-        tls.get_or(|| Box::new(1));
-
-        let tls2 = tls.clone();
-        thread::spawn(move || {
-            tls2.get_or(|| Box::new(2));
-            let tls3 = tls2.clone();
-            thread::spawn(move || {
-                tls3.get_or(|| Box::new(3));
-            })
-            .join()
-            .unwrap();
-            drop(tls2);
-        })
-        .join()
-        .unwrap();
-
-        let mut tls = Arc::try_unwrap(tls).unwrap();
-        let mut v = tls.iter_mut().map(|x| **x).collect::<Vec<i32>>();
-        v.sort_unstable();
-        assert_eq!(vec![1, 2, 3], v);
-        let mut v = tls.into_iter().map(|x| *x).collect::<Vec<i32>>();
-        v.sort_unstable();
-        assert_eq!(vec![1, 2, 3], v);
-    }
-
     #[test]
     fn is_sync() {
         fn foo<T: Sync>() {}
         foo::<ThreadLocal<String>>();
         foo::<ThreadLocal<RefCell<String>>>();
-        foo::<CachedThreadLocal<String>>();
-        foo::<CachedThreadLocal<RefCell<String>>>();
     }
 }
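
Illustrative sketch (not part of the patch series): the arithmetic that patch 7 introduces in `Thread::new` can be checked in isolation. The free function `bucket_of` and the use of `usize::BITS` below are assumptions made for this example only; the patch itself defines a `POINTER_WIDTH` constant and computes the same values inside `Thread::new`. Thread ID 0 goes in bucket 0, and every other ID goes in the bucket selected by its highest set bit, so bucket n (for n >= 1) holds 2^(n-1) slots. Because buckets are only ever added and never reallocated, a slot never moves once assigned, which is why `get` can find it with plain atomic loads and no lock.

// Standalone sketch of the bucket/index mapping from patch 7's Thread::new.
fn bucket_of(id: usize) -> (usize, usize, usize) {
    // Bucket number: position of the ID's highest set bit (0 for ID 0).
    let bucket = (usize::BITS - id.leading_zeros()) as usize;
    // Buckets 0 and 1 hold one slot each; bucket n holds 2^(n-1) slots.
    let bucket_size = 1usize << bucket.saturating_sub(1);
    // Index inside the bucket: clear the highest set bit.
    let index = if id != 0 { id ^ bucket_size } else { 0 };
    (bucket, bucket_size, index)
}

fn main() {
    // These match the expectations in patch 7's `test_thread`.
    assert_eq!(bucket_of(0), (0, 1, 0));
    assert_eq!(bucket_of(1), (1, 1, 0));
    assert_eq!(bucket_of(2), (2, 2, 0));
    assert_eq!(bucket_of(3), (2, 2, 1));
    assert_eq!(bucket_of(19), (5, 16, 3));
}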