diff --git a/jctools-core/src/main/java/org/jctools/queues/MpUnboundedXaddArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/MpUnboundedXaddArrayQueue.java index 8452414a..e19edb80 100644 --- a/jctools-core/src/main/java/org/jctools/queues/MpUnboundedXaddArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/MpUnboundedXaddArrayQueue.java @@ -359,6 +359,9 @@ final void moveToNextConsumerChunk(R cChunk, R next) assert pooled; } this.soConsumerChunk(next); + // MC case: + // from now on the code is not single-threaded anymore and + // other consumers can move forward consumerIndex } @Override diff --git a/jctools-core/src/main/java/org/jctools/queues/MpmcUnboundedXaddArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/MpmcUnboundedXaddArrayQueue.java index aa61d943..5eb10a48 100644 --- a/jctools-core/src/main/java/org/jctools/queues/MpmcUnboundedXaddArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/MpmcUnboundedXaddArrayQueue.java @@ -90,7 +90,7 @@ public E poll() long cIndex; MpmcUnboundedXaddChunk cChunk; int ciChunkOffset; - boolean isFirstElementOfNextChunk; + boolean isFirstElementOfNewChunk; boolean pooled = false; E e = null; MpmcUnboundedXaddChunk next = null; @@ -98,6 +98,7 @@ public E poll() long ciChunkIndex; while (true) { + isFirstElementOfNewChunk = false; cIndex = this.lvConsumerIndex(); // chunk is in sync with the index, and is safe to mutate after CAS of index (because we pre-verify it // matched the indicate ciChunkIndex) @@ -107,12 +108,12 @@ public E poll() ciChunkIndex = cIndex >> chunkShift; final long ccChunkIndex = cChunk.lvIndex(); - isFirstElementOfNextChunk = ciChunkOffset == 0 && cIndex != 0; - if (isFirstElementOfNextChunk) { + if (ciChunkOffset == 0 && cIndex != 0) { if (ciChunkIndex - ccChunkIndex != 1) { continue; } + isFirstElementOfNewChunk = true; next = cChunk.lvNext(); // next could have been modified by another racing consumer, but: // - if null: it still needs to check q empty + casConsumerIndex @@ -140,7 +141,7 @@ public E poll() continue; } // mid chunk elements - assert !isFirstElementOfNextChunk && ccChunkIndex <= ciChunkIndex; + assert !isFirstElementOfNewChunk && ccChunkIndex <= ciChunkIndex; pooled = cChunk.isPooled(); if (ccChunkIndex == ciChunkIndex) { @@ -189,7 +190,7 @@ public E poll() } // if we are the isFirstElementOfNextChunk we need to get the consumer chunk - if (isFirstElementOfNextChunk) + if (isFirstElementOfNewChunk) { e = linkNextConsumerChunkAndPoll(cChunk, next, ciChunkIndex); }