Skip to content

Commit

Permalink
Merge #726
Browse files Browse the repository at this point in the history
726: Fix deque steal race condition r=taiki-e a=kmaork



Co-authored-by: Maor Kleinberger <kmaork@gmail.com>
  • Loading branch information
bors[bot] and kmaork committed Jul 30, 2021
2 parents 2653a6c + 38c07fc commit 3e72cde
Showing 1 changed file with 56 additions and 36 deletions.
92 changes: 56 additions & 36 deletions crossbeam-deque/src/deque.rs
Expand Up @@ -652,11 +652,13 @@ impl<T> Stealer<T> {
let task = unsafe { buffer.deref().read(f) };

// Try incrementing the front index to steal the task.
if self
.inner
.front
.compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
.is_err()
// If the buffer has been swapped or the increment fails, we retry.
if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
|| self
.inner
.front
.compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
.is_err()
{
// We didn't steal this task, forget it.
mem::forget(task);
Expand Down Expand Up @@ -758,16 +760,18 @@ impl<T> Stealer<T> {
}

// Try incrementing the front index to steal the batch.
if self
.inner
.front
.compare_exchange(
f,
f.wrapping_add(batch_size),
Ordering::SeqCst,
Ordering::Relaxed,
)
.is_err()
// If the buffer has been swapped or the increment fails, we retry.
if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
|| self
.inner
.front
.compare_exchange(
f,
f.wrapping_add(batch_size),
Ordering::SeqCst,
Ordering::Relaxed,
)
.is_err()
{
return Steal::Retry;
}
Expand Down Expand Up @@ -803,11 +807,18 @@ impl<T> Stealer<T> {
let task = unsafe { buffer.deref().read(f) };

// Try incrementing the front index to steal the task.
if self
.inner
.front
.compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
.is_err()
// If the buffer has been swapped or the increment fails, we retry.
if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
|| self
.inner
.front
.compare_exchange(
f,
f.wrapping_add(1),
Ordering::SeqCst,
Ordering::Relaxed,
)
.is_err()
{
// We didn't steal this task, forget it and break from the loop.
mem::forget(task);
Expand Down Expand Up @@ -949,17 +960,19 @@ impl<T> Stealer<T> {
}
}

// Try incrementing the front index to steal the batch.
if self
.inner
.front
.compare_exchange(
f,
f.wrapping_add(batch_size + 1),
Ordering::SeqCst,
Ordering::Relaxed,
)
.is_err()
// Try incrementing the front index to steal the task.
// If the buffer has been swapped or the increment fails, we retry.
if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
|| self
.inner
.front
.compare_exchange(
f,
f.wrapping_add(batch_size + 1),
Ordering::SeqCst,
Ordering::Relaxed,
)
.is_err()
{
// We didn't steal this task, forget it.
mem::forget(task);
Expand Down Expand Up @@ -1010,11 +1023,18 @@ impl<T> Stealer<T> {
let tmp = unsafe { buffer.deref().read(f) };

// Try incrementing the front index to steal the task.
if self
.inner
.front
.compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
.is_err()
// If the buffer has been swapped or the increment fails, we retry.
if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
|| self
.inner
.front
.compare_exchange(
f,
f.wrapping_add(1),
Ordering::SeqCst,
Ordering::Relaxed,
)
.is_err()
{
// We didn't steal this task, forget it and break from the loop.
mem::forget(tmp);
Expand Down

0 comments on commit 3e72cde

Please sign in to comment.