Skip to content

Commit

Permalink
fix deque steal race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
kmaork committed Jun 11, 2021
1 parent 25f569c commit 38c07fc
Showing 1 changed file with 56 additions and 36 deletions.
92 changes: 56 additions & 36 deletions crossbeam-deque/src/deque.rs
Expand Up @@ -635,11 +635,13 @@ impl<T> Stealer<T> {
let task = unsafe { buffer.deref().read(f) };

// Try incrementing the front index to steal the task.
if self
.inner
.front
.compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
.is_err()
// If the buffer has been swapped or the increment fails, we retry.
if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
|| self
.inner
.front
.compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
.is_err()
{
// We didn't steal this task, forget it.
mem::forget(task);
Expand Down Expand Up @@ -741,16 +743,18 @@ impl<T> Stealer<T> {
}

// Try incrementing the front index to steal the batch.
if self
.inner
.front
.compare_exchange(
f,
f.wrapping_add(batch_size),
Ordering::SeqCst,
Ordering::Relaxed,
)
.is_err()
// If the buffer has been swapped or the increment fails, we retry.
if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
|| self
.inner
.front
.compare_exchange(
f,
f.wrapping_add(batch_size),
Ordering::SeqCst,
Ordering::Relaxed,
)
.is_err()
{
return Steal::Retry;
}
Expand Down Expand Up @@ -781,11 +785,18 @@ impl<T> Stealer<T> {
let task = unsafe { buffer.deref().read(f) };

// Try incrementing the front index to steal the task.
if self
.inner
.front
.compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
.is_err()
// If the buffer has been swapped or the increment fails, we retry.
if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
|| self
.inner
.front
.compare_exchange(
f,
f.wrapping_add(1),
Ordering::SeqCst,
Ordering::Relaxed,
)
.is_err()
{
// We didn't steal this task, forget it and break from the loop.
mem::forget(task);
Expand Down Expand Up @@ -927,17 +938,19 @@ impl<T> Stealer<T> {
}
}

// Try incrementing the front index to steal the batch.
if self
.inner
.front
.compare_exchange(
f,
f.wrapping_add(batch_size + 1),
Ordering::SeqCst,
Ordering::Relaxed,
)
.is_err()
// Try incrementing the front index to steal the task.
// If the buffer has been swapped or the increment fails, we retry.
if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
|| self
.inner
.front
.compare_exchange(
f,
f.wrapping_add(batch_size + 1),
Ordering::SeqCst,
Ordering::Relaxed,
)
.is_err()
{
// We didn't steal this task, forget it.
mem::forget(task);
Expand Down Expand Up @@ -983,11 +996,18 @@ impl<T> Stealer<T> {
let tmp = unsafe { buffer.deref().read(f) };

// Try incrementing the front index to steal the task.
if self
.inner
.front
.compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed)
.is_err()
// If the buffer has been swapped or the increment fails, we retry.
if self.inner.buffer.load(Ordering::Acquire, guard) != buffer
|| self
.inner
.front
.compare_exchange(
f,
f.wrapping_add(1),
Ordering::SeqCst,
Ordering::Relaxed,
)
.is_err()
{
// We didn't steal this task, forget it and break from the loop.
mem::forget(tmp);
Expand Down

0 comments on commit 38c07fc

Please sign in to comment.