Merge #934

934: Fix a use-after-free race in SpinLatch::set, and release 1.9.3 r=cuviper a=cuviper

`SpinLatch<'r>` borrows `&'r Arc<Registry>` from the `WorkerThread` where it is created. When we `set`, we're careful to make sure that the `Registry` remains alive while we do the inner `set` and then `notify_worker_latch_is_set`. We knew from past bugs that the `SpinLatch` could be invalidated between set and notify, but the `&Arc` could also be invalidated if the target thread sees the set *and exits* (dropping its `WorkerThread`) before the notification. That's a fairly long race, but preemption could make that happen.

The inner `Registry` will still be alive, since the current thread is part of that pool, so we can hold that reference directly.
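
To make the ownership picture concrete: the `Arc<Registry>` handle that `SpinLatch<'r>` borrows lives inside the target thread's `WorkerThread`, so dropping that `WorkerThread` frees the handle's storage even though the `Registry` allocation survives through the other pool members' handles. A minimal standalone sketch of that distinction (`WorkerThreadLike` is a hypothetical stand-in, not rayon's type):

```rust
use std::sync::Arc;

struct Registry;

// Stand-in for rayon's `WorkerThread`, which owns one handle to the registry.
struct WorkerThreadLike {
    registry: Arc<Registry>,
}

fn main() {
    let pool_handle = Arc::new(Registry); // another pool member's handle

    let worker = WorkerThreadLike {
        registry: Arc::clone(&pool_handle),
    };

    // What `SpinLatch<'r>` held: a reference to the *handle* stored in the
    // worker, valid only while `worker` itself is alive.
    let borrowed_handle: &Arc<Registry> = &worker.registry;

    // What the fix holds instead: a reference into the shared allocation,
    // which outlives any single handle.
    let _inner: &Registry = &**borrowed_handle;

    drop(worker); // frees the handle's storage
    let _still_valid: &Registry = &*pool_handle; // the allocation lives on
}
```

In this safe sketch the borrow checker forces `drop(worker)` to come after the last use of `borrowed_handle`; rayon's worker threads manage that lifetime manually across threads, which is how the race slipped in.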

Fixes #913
Fixes #929

Co-authored-by: Josh Stone <cuviper@gmail.com>
bors[bot] and cuviper committed May 13, 2022
2 parents 9801de7 + c2a0c51 commit 19bf115
Showing 8 changed files with 206 additions and 178 deletions.
4 changes: 4 additions & 0 deletions RELEASES.md
@@ -1,3 +1,7 @@
+# Release rayon-core 1.9.3 (2022-05-13)
+
+- Fixed a use-after-free race in job notification.
+
 # Release rayon 1.5.2 / rayon-core 1.9.2 (2022-04-13)
 
 - The new `ParallelSlice::par_rchunks()` and `par_rchunks_exact()` iterate

328 changes: 175 additions & 153 deletions ci/compat-Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion rayon-core/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "rayon-core"
-version = "1.9.2" # reminder to update html_root_url attribute
+version = "1.9.3" # reminder to update html_root_url attribute
 authors = ["Niko Matsakis <niko@alum.mit.edu>",
            "Josh Stone <cuviper@gmail.com>"]
 description = "Core APIs for Rayon"

10 changes: 6 additions & 4 deletions rayon-core/src/latch.rs
@@ -189,19 +189,21 @@ impl<'r> Latch for SpinLatch<'r> {
     fn set(&self) {
         let cross_registry;
 
-        let registry = if self.cross {
+        let registry: &Registry = if self.cross {
             // Ensure the registry stays alive while we notify it.
             // Otherwise, it would be possible that we set the spin
             // latch and the other thread sees it and exits, causing
             // the registry to be deallocated, all before we get a
             // chance to invoke `registry.notify_worker_latch_is_set`.
             cross_registry = Arc::clone(self.registry);
-            &cross_registry
+            &*cross_registry
         } else {
             // If this is not a "cross-registry" spin-latch, then the
             // thread which is performing `set` is itself ensuring
-            // that the registry stays alive.
-            self.registry
+            // that the registry stays alive. However, that doesn't
+            // include this *particular* `Arc` handle if the waiting
+            // thread then exits, so we must completely dereference it.
+            &**self.registry
         };
         let target_worker_index = self.target_worker_index;
 
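
One detail worth noting in this hunk: `cross_registry` is declared without an initializer before the `if`, so the `Arc` cloned inside the `cross` arm lives until the end of `set` and keeps the registry alive through the notification. A small standalone sketch of that delayed-initialization idiom (names are illustrative, not rayon's):

```rust
fn describe(flag: bool, shared: &str) {
    let owned; // declared here so it outlives the `if` below
    let chosen: &str = if flag {
        owned = shared.to_uppercase(); // initialized in only one branch
        &owned                         // so this borrow can escape the `if`
    } else {
        shared
    };
    println!("{chosen}");
}

fn main() {
    describe(true, "registry");
    describe(false, "registry");
}
```
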
18 changes: 9 additions & 9 deletions rayon-core/src/registry.rs
@@ -251,7 +251,7 @@ impl Registry {
             let thread = ThreadBuilder {
                 name: builder.get_thread_name(index),
                 stack_size: builder.get_stack_size(),
-                registry: registry.clone(),
+                registry: Arc::clone(&registry),
                 worker,
                 index,
             };

@@ -263,17 +263,18 @@ impl Registry {
         // Returning normally now, without termination.
         mem::forget(t1000);
 
-        Ok(registry.clone())
+        Ok(registry)
     }
 
     pub(super) fn current() -> Arc<Registry> {
         unsafe {
             let worker_thread = WorkerThread::current();
-            if worker_thread.is_null() {
-                global_registry().clone()
+            let registry = if worker_thread.is_null() {
+                global_registry()
             } else {
-                (*worker_thread).registry.clone()
-            }
+                &(*worker_thread).registry
+            };
+            Arc::clone(registry)
         }
     }
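
The `current()` rewrite riding along here is a small cleanup: both branches now evaluate to a borrowed handle, and the single `Arc::clone` happens once at the end rather than being duplicated in each arm. A standalone sketch of the shape (types are illustrative, not rayon's):

```rust
use std::sync::Arc;

// Choose a borrowed handle first, then clone exactly once.
fn current_like<'a>(local: Option<&'a Arc<String>>, global: &'a Arc<String>) -> Arc<String> {
    let registry = match local {
        Some(arc) => arc,
        None => global,
    };
    Arc::clone(registry)
}

fn main() {
    let global = Arc::new("global registry".to_string());
    let local = Arc::new("thread-local registry".to_string());
    assert_eq!(*current_like(Some(&local), &global), "thread-local registry");
    assert_eq!(*current_like(None, &global), "global registry");
}
```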

@@ -804,9 +805,10 @@ unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usize) {
         fifo: JobFifo::new(),
         index,
         rng: XorShift64Star::new(),
-        registry: registry.clone(),
+        registry,
     };
     WorkerThread::set_current(worker_thread);
+    let registry = &*worker_thread.registry;
 
     // let registry know we are ready to do work
     registry.thread_infos[index].primed.set();
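
The shape of this hunk: the `Arc<Registry>` argument is moved into the `WorkerThread` via field-init shorthand (`registry,`), and the rest of `main_loop` re-borrows it as a plain `&Registry` through the struct, so each worker holds exactly one strong handle. A minimal sketch of that move-then-reborrow pattern (types are illustrative):

```rust
use std::sync::Arc;

struct Registry {
    num_threads: usize,
}

struct WorkerThreadLike {
    registry: Arc<Registry>,
}

fn main_loop_like(registry: Arc<Registry>) {
    let worker_thread = WorkerThreadLike { registry }; // move the handle in
    let registry = &*worker_thread.registry; // re-borrow as &Registry
    assert_eq!(registry.num_threads, 4);
}

fn main() {
    main_loop_like(Arc::new(Registry { num_threads: 4 }));
}
```
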
@@ -818,7 +820,6 @@ unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usize) {
 
     // Inform a user callback that we started a thread.
     if let Some(ref handler) = registry.start_handler {
-        let registry = registry.clone();
         match unwind::halt_unwinding(|| handler(index)) {
             Ok(()) => {}
             Err(err) => {

@@ -847,7 +848,6 @@ unsafe fn main_loop(worker: Worker<JobRef>, registry: Arc<Registry>, index: usize) {
 
     // Inform a user callback that we exited a thread.
     if let Some(ref handler) = registry.exit_handler {
-        let registry = registry.clone();
         match unwind::halt_unwinding(|| handler(index)) {
             Ok(()) => {}
             Err(err) => {
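
Most of the remaining churn in this commit, here and in the files below, swaps method-call `registry.clone()` for the fully qualified `Arc::clone(&registry)`. The two are identical at runtime; the qualified form just makes it obvious at the call site that only the reference count is bumped, never the underlying value (the style clippy's `clone_on_ref_ptr` lint asks for). A quick illustration:

```rust
use std::sync::Arc;

fn main() {
    let a = Arc::new(vec![1, 2, 3]);
    let b = a.clone();      // works, but reads like it might deep-copy the Vec
    let c = Arc::clone(&a); // the same refcount bump, with explicit intent
    assert_eq!(Arc::strong_count(&a), 3);
    assert_eq!(b, c);
}
```
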
2 changes: 1 addition & 1 deletion rayon-core/src/spawn/mod.rs
@@ -92,7 +92,7 @@ where
     registry.increment_terminate_count();
 
     Box::new(HeapJob::new({
-        let registry = registry.clone();
+        let registry = Arc::clone(registry);
         move || {
             match unwind::halt_unwinding(func) {
                 Ok(()) => {}

14 changes: 7 additions & 7 deletions rayon-core/src/test.rs
@@ -22,8 +22,8 @@ fn start_callback_called() {
     // Wait for all the threads in the pool plus the one running tests.
     let barrier = Arc::new(Barrier::new(n_threads + 1));
 
-    let b = barrier.clone();
-    let nc = n_called.clone();
+    let b = Arc::clone(&barrier);
+    let nc = Arc::clone(&n_called);
     let start_handler = move |_| {
         nc.fetch_add(1, Ordering::SeqCst);
         b.wait();

@@ -48,8 +48,8 @@ fn exit_callback_called() {
     // Wait for all the threads in the pool plus the one running tests.
     let barrier = Arc::new(Barrier::new(n_threads + 1));
 
-    let b = barrier.clone();
-    let nc = n_called.clone();
+    let b = Arc::clone(&barrier);
+    let nc = Arc::clone(&n_called);
     let exit_handler = move |_| {
         nc.fetch_add(1, Ordering::SeqCst);
         b.wait();

@@ -85,9 +85,9 @@ fn handler_panics_handled_correctly() {
         panic!("ensure panic handler is called when exiting");
     };
 
-    let sb = start_barrier.clone();
-    let eb = exit_barrier.clone();
-    let nc = n_called.clone();
+    let sb = Arc::clone(&start_barrier);
+    let eb = Arc::clone(&exit_barrier);
+    let nc = Arc::clone(&n_called);
     let panic_handler = move |_| {
         let val = nc.fetch_add(1, Ordering::SeqCst);
         if val < n_threads {

6 changes: 3 additions & 3 deletions rayon-core/src/thread_pool/test.rs
@@ -28,7 +28,7 @@ fn workers_stop() {
         // do some work on these threads
         join_a_lot(22);
 
-        thread_pool.registry.clone()
+        Arc::clone(&thread_pool.registry)
     });
     assert_eq!(registry.num_threads(), 22);
 }

@@ -53,7 +53,7 @@ fn sleeper_stop() {
     {
         // once we exit this block, thread-pool will be dropped
         let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
-        registry = thread_pool.registry.clone();
+        registry = Arc::clone(&thread_pool.registry);
 
         // Give time for at least some of the thread pool to fall asleep.
         thread::sleep(time::Duration::from_secs(1));

@@ -67,7 +67,7 @@
 /// Creates a start/exit handler that increments an atomic counter.
 fn count_handler() -> (Arc<AtomicUsize>, impl Fn(usize)) {
     let count = Arc::new(AtomicUsize::new(0));
-    (count.clone(), move |_| {
+    (Arc::clone(&count), move |_| {
         count.fetch_add(1, Ordering::SeqCst);
     })
 }
