diff --git a/crossbeam-deque/src/deque.rs b/crossbeam-deque/src/deque.rs index fcd6f9fc6..ea95e202a 100644 --- a/crossbeam-deque/src/deque.rs +++ b/crossbeam-deque/src/deque.rs @@ -635,11 +635,13 @@ impl Stealer { let task = unsafe { buffer.deref().read(f) }; // Try incrementing the front index to steal the task. - if self - .inner - .front - .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed) - .is_err() + // If the buffer has been swapped or the increment fails, we retry. + if self.inner.buffer.load(Ordering::Acquire, guard) != buffer + || self + .inner + .front + .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed) + .is_err() { // We didn't steal this task, forget it. mem::forget(task); @@ -741,16 +743,18 @@ impl Stealer { } // Try incrementing the front index to steal the batch. - if self - .inner - .front - .compare_exchange( - f, - f.wrapping_add(batch_size), - Ordering::SeqCst, - Ordering::Relaxed, - ) - .is_err() + // If the buffer has been swapped or the increment fails, we retry. + if self.inner.buffer.load(Ordering::Acquire, guard) != buffer + || self + .inner + .front + .compare_exchange( + f, + f.wrapping_add(batch_size), + Ordering::SeqCst, + Ordering::Relaxed, + ) + .is_err() { return Steal::Retry; } @@ -781,11 +785,18 @@ impl Stealer { let task = unsafe { buffer.deref().read(f) }; // Try incrementing the front index to steal the task. - if self - .inner - .front - .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed) - .is_err() + // If the buffer has been swapped or the increment fails, we retry. + if self.inner.buffer.load(Ordering::Acquire, guard) != buffer + || self + .inner + .front + .compare_exchange( + f, + f.wrapping_add(1), + Ordering::SeqCst, + Ordering::Relaxed, + ) + .is_err() { // We didn't steal this task, forget it and break from the loop. mem::forget(task); @@ -927,17 +938,19 @@ impl Stealer { } } - // Try incrementing the front index to steal the batch. - if self - .inner - .front - .compare_exchange( - f, - f.wrapping_add(batch_size + 1), - Ordering::SeqCst, - Ordering::Relaxed, - ) - .is_err() + // Try incrementing the front index to steal the task. + // If the buffer has been swapped or the increment fails, we retry. + if self.inner.buffer.load(Ordering::Acquire, guard) != buffer + || self + .inner + .front + .compare_exchange( + f, + f.wrapping_add(batch_size + 1), + Ordering::SeqCst, + Ordering::Relaxed, + ) + .is_err() { // We didn't steal this task, forget it. mem::forget(task); @@ -983,11 +996,18 @@ impl Stealer { let tmp = unsafe { buffer.deref().read(f) }; // Try incrementing the front index to steal the task. - if self - .inner - .front - .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed) - .is_err() + // If the buffer has been swapped or the increment fails, we retry. + if self.inner.buffer.load(Ordering::Acquire, guard) != buffer + || self + .inner + .front + .compare_exchange( + f, + f.wrapping_add(1), + Ordering::SeqCst, + Ordering::Relaxed, + ) + .is_err() { // We didn't steal this task, forget it and break from the loop. mem::forget(tmp);