diff --git a/crossbeam-deque/src/lib.rs b/crossbeam-deque/src/lib.rs index 59004174d..a7e2efd37 100644 --- a/crossbeam-deque/src/lib.rs +++ b/crossbeam-deque/src/lib.rs @@ -710,11 +710,13 @@ impl Stealer { let task = unsafe { buffer.deref().read(f) }; // Try incrementing the front index to steal the task. - if self - .inner - .front - .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed) - .is_err() + // If the buffer has been swapped or the increment fails, we retry. + if self.inner.buffer.load(Ordering::Acquire, guard) != buffer + || self + .inner + .front + .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed) + .is_err() { // We didn't steal this task, forget it. mem::forget(task); @@ -816,16 +818,18 @@ impl Stealer { } // Try incrementing the front index to steal the batch. - if self - .inner - .front - .compare_exchange( - f, - f.wrapping_add(batch_size), - Ordering::SeqCst, - Ordering::Relaxed, - ) - .is_err() + // If the buffer has been swapped or the increment fails, we retry. + if self.inner.buffer.load(Ordering::Acquire, guard) != buffer + || self + .inner + .front + .compare_exchange( + f, + f.wrapping_add(batch_size), + Ordering::SeqCst, + Ordering::Relaxed, + ) + .is_err() { return Steal::Retry; } @@ -856,11 +860,18 @@ impl Stealer { let task = unsafe { buffer.deref().read(f) }; // Try incrementing the front index to steal the task. - if self - .inner - .front - .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed) - .is_err() + // If the buffer has been swapped or the increment fails, we retry. + if self.inner.buffer.load(Ordering::Acquire, guard) != buffer + || self + .inner + .front + .compare_exchange( + f, + f.wrapping_add(1), + Ordering::SeqCst, + Ordering::Relaxed, + ) + .is_err() { // We didn't steal this task, forget it and break from the loop. mem::forget(task); @@ -1002,17 +1013,19 @@ impl Stealer { } } - // Try incrementing the front index to steal the batch. - if self - .inner - .front - .compare_exchange( - f, - f.wrapping_add(batch_size + 1), - Ordering::SeqCst, - Ordering::Relaxed, - ) - .is_err() + // Try incrementing the front index to steal the task. + // If the buffer has been swapped or the increment fails, we retry. + if self.inner.buffer.load(Ordering::Acquire, guard) != buffer + || self + .inner + .front + .compare_exchange( + f, + f.wrapping_add(batch_size + 1), + Ordering::SeqCst, + Ordering::Relaxed, + ) + .is_err() { // We didn't steal this task, forget it. mem::forget(task); @@ -1058,11 +1071,18 @@ impl Stealer { let tmp = unsafe { buffer.deref().read(f) }; // Try incrementing the front index to steal the task. - if self - .inner - .front - .compare_exchange(f, f.wrapping_add(1), Ordering::SeqCst, Ordering::Relaxed) - .is_err() + // If the buffer has been swapped or the increment fails, we retry. + if self.inner.buffer.load(Ordering::Acquire, guard) != buffer + || self + .inner + .front + .compare_exchange( + f, + f.wrapping_add(1), + Ordering::SeqCst, + Ordering::Relaxed, + ) + .is_err() { // We didn't steal this task, forget it and break from the loop. mem::forget(tmp); @@ -1436,9 +1456,9 @@ impl Injector { // Destroy the block if we've reached the end, or if another thread wanted to destroy // but couldn't because we were busy reading from the slot. - if offset + 1 == BLOCK_CAP { - Block::destroy(block, offset); - } else if slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0 { + if (offset + 1 == BLOCK_CAP) + || (slot.state.fetch_or(READ, Ordering::AcqRel) & DESTROY != 0) + { Block::destroy(block, offset); }