From cf35ee35b2c66f2f72db8bf604fc650141f9b6df Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Thu, 2 Jan 2020 14:14:40 +0100 Subject: [PATCH] mpmc xadd q poll wasn't handling correctly chunkSize = 1 pooled case (#281) QueueSanityTest::testSize with chunkSize = 1 and 1 recycled chunks was failing with a blocked poll because it wasn't handling ccChunkIndex == ciChunkIndex as an isFirstElementOfNextChunk case --- .../queues/MpmcUnboundedXaddArrayQueue.java | 123 ++++++++---------- .../queues/MpmcUnboundedXaddChunk.java | 16 +++ 2 files changed, 72 insertions(+), 67 deletions(-) diff --git a/jctools-core/src/main/java/org/jctools/queues/MpmcUnboundedXaddArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/MpmcUnboundedXaddArrayQueue.java index 4c4e3a59..aa61d943 100644 --- a/jctools-core/src/main/java/org/jctools/queues/MpmcUnboundedXaddArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/MpmcUnboundedXaddArrayQueue.java @@ -107,92 +107,84 @@ public E poll() ciChunkIndex = cIndex >> chunkShift; final long ccChunkIndex = cChunk.lvIndex(); - if (ciChunkIndex != ccChunkIndex) - { - // we are looking at the first element of the next chunk - if (ciChunkOffset == 0 && ciChunkIndex - ccChunkIndex == 1) + isFirstElementOfNextChunk = ciChunkOffset == 0 && cIndex != 0; + if (isFirstElementOfNextChunk) { + if (ciChunkIndex - ccChunkIndex != 1) + { + continue; + } + next = cChunk.lvNext(); + // next could have been modified by another racing consumer, but: + // - if null: it still needs to check q empty + casConsumerIndex + // - if !null: it will fail on casConsumerIndex + if (next == null) { - isFirstElementOfNextChunk = true; - next = cChunk.lvNext(); - // next could have been modified by another racing consumer, but: - // - if null: it still needs to check q empty + casConsumerIndex - // - if !null: it will fail on casConsumerIndex - if (next == null) - { - if (cIndex >= pIndex && // test against cached pIndex - cIndex == (pIndex = lvProducerIndex())) // update pIndex if we must - { - // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] - return null; - } - // we will go ahead with the CAS and have the winning consumer spin for the next buffer - } - // not empty: can attempt the cas (and transition to next chunk if successful) - if (casConsumerIndex(cIndex, cIndex + 1)) - { - break; - } - } else if (ccChunkIndex < ciChunkIndex) { - // it allows to fail fast if the q is empty after the first element on the new chunk: - // it can happen if consumerIndex is being already moved forward, but - // the consumerChunk isn't yet rotated if (cIndex >= pIndex && // test against cached pIndex cIndex == (pIndex = lvProducerIndex())) // update pIndex if we must { // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] return null; } + // we will go ahead with the CAS and have the winning consumer spin for the next buffer } - // The chunk doesn't match the consumer index view of the required chunk index. This is where consumers - // waiting for the `next` chunk to appear will be waiting while the consumer that took the first element - // in that chunk sets it up for them. + // not empty: can attempt the cas (and transition to next chunk if successful) + if (casConsumerIndex(cIndex, cIndex + 1)) + { + break; + } + continue; + } + if (ccChunkIndex > ciChunkIndex) + { + //stale view of the world continue; } - isFirstElementOfNextChunk = false; // mid chunk elements + assert !isFirstElementOfNextChunk && ccChunkIndex <= ciChunkIndex; pooled = cChunk.isPooled(); - if (pooled) + if (ccChunkIndex == ciChunkIndex) { - // Pooled chunks need a stronger guarantee than just element null checking in case of a stale view - // on a reused entry where a racing consumer has grabbed the slot but not yet null-ed it out and a - // producer has not yet set it to the new value. - final long sequence = cChunk.lvSequence(ciChunkOffset); - if (sequence != ciChunkIndex) + if (pooled) { - // it covers both cases: - // - right chunk, awaiting element to be set - // - old chunk, awaiting rotation - // it allows to fail fast if the q is empty after the first element on the new chunk - if (sequence < ciChunkIndex && - cIndex >= pIndex && // test against cached pIndex - cIndex == (pIndex = lvProducerIndex())) // update pIndex if we must + // Pooled chunks need a stronger guarantee than just element null checking in case of a stale view + // on a reused entry where a racing consumer has grabbed the slot but not yet null-ed it out and a + // producer has not yet set it to the new value. + final long sequence = cChunk.lvSequence(ciChunkOffset); + if (sequence == ciChunkIndex) { - // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] - return null; + if (casConsumerIndex(cIndex, cIndex + 1)) + { + break; + } + continue; } - //stale view of the world - continue; + if (sequence > ciChunkIndex) + { + //stale view of the world + continue; + } + // sequence < ciChunkIndex: element yet to be set? } - } - else - { - e = cChunk.lvElement(ciChunkOffset); - if (e == null) // consumers do not skip unset slots + else { - if (cIndex >= pIndex && // test against cached pIndex - cIndex == (pIndex = lvProducerIndex())) // update pIndex if we must + e = cChunk.lvElement(ciChunkOffset); + if (e != null) { - // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] - return null; + if (casConsumerIndex(cIndex, cIndex + 1)) + { + break; + } + continue; } - // This is where we spin for the element to appear **before attempting the CAS**. - continue; + // e == null: element yet to be set? } } - - if (casConsumerIndex(cIndex, cIndex + 1)) + // ccChunkIndex < ciChunkIndex || e == null || sequence < ciChunkIndex: + if (cIndex >= pIndex && // test against cached pIndex + cIndex == (pIndex = lvProducerIndex())) // update pIndex if we must { - break; + // strict empty check, this ensures [Queue.poll() == null iff isEmpty()] + return null; } } @@ -230,10 +222,7 @@ private E linkNextConsumerChunkAndPoll( final boolean pooled = next.isPooled(); if (pooled) { - while (next.lvSequence(0) != expectedChunkIndex) - { - - } + next.spinForSequence(0, expectedChunkIndex); } next.soElement(0, null); diff --git a/jctools-core/src/main/java/org/jctools/queues/MpmcUnboundedXaddChunk.java b/jctools-core/src/main/java/org/jctools/queues/MpmcUnboundedXaddChunk.java index 4c4d61ce..7f2dfa64 100644 --- a/jctools-core/src/main/java/org/jctools/queues/MpmcUnboundedXaddChunk.java +++ b/jctools-core/src/main/java/org/jctools/queues/MpmcUnboundedXaddChunk.java @@ -40,11 +40,27 @@ final class MpmcUnboundedXaddChunk extends MpUnboundedXaddChunk