Skip to content

Commit

Permalink
mpmc xadd q poll wasn't handling correctly chunkSize = 1 pooled case (#…
Browse files Browse the repository at this point in the history
…281)

QueueSanityTest::testSize with chunkSize = 1 and 1 recycled chunks was
failing with a blocked poll because it wasn't handling ccChunkIndex ==
ciChunkIndex as an isFirstElementOfNextChunk case
  • Loading branch information
franz1981 authored and nitsanw committed Jan 2, 2020
1 parent 264e669 commit cf35ee3
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 67 deletions.
Expand Up @@ -107,92 +107,84 @@ public E poll()
ciChunkIndex = cIndex >> chunkShift;

final long ccChunkIndex = cChunk.lvIndex();
if (ciChunkIndex != ccChunkIndex)
{
// we are looking at the first element of the next chunk
if (ciChunkOffset == 0 && ciChunkIndex - ccChunkIndex == 1)
isFirstElementOfNextChunk = ciChunkOffset == 0 && cIndex != 0;
if (isFirstElementOfNextChunk) {
if (ciChunkIndex - ccChunkIndex != 1)
{
continue;
}
next = cChunk.lvNext();
// next could have been modified by another racing consumer, but:
// - if null: it still needs to check q empty + casConsumerIndex
// - if !null: it will fail on casConsumerIndex
if (next == null)
{
isFirstElementOfNextChunk = true;
next = cChunk.lvNext();
// next could have been modified by another racing consumer, but:
// - if null: it still needs to check q empty + casConsumerIndex
// - if !null: it will fail on casConsumerIndex
if (next == null)
{
if (cIndex >= pIndex && // test against cached pIndex
cIndex == (pIndex = lvProducerIndex())) // update pIndex if we must
{
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
return null;
}
// we will go ahead with the CAS and have the winning consumer spin for the next buffer
}
// not empty: can attempt the cas (and transition to next chunk if successful)
if (casConsumerIndex(cIndex, cIndex + 1))
{
break;
}
} else if (ccChunkIndex < ciChunkIndex) {
// it allows to fail fast if the q is empty after the first element on the new chunk:
// it can happen if consumerIndex is being already moved forward, but
// the consumerChunk isn't yet rotated
if (cIndex >= pIndex && // test against cached pIndex
cIndex == (pIndex = lvProducerIndex())) // update pIndex if we must
{
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
return null;
}
// we will go ahead with the CAS and have the winning consumer spin for the next buffer
}
// The chunk doesn't match the consumer index view of the required chunk index. This is where consumers
// waiting for the `next` chunk to appear will be waiting while the consumer that took the first element
// in that chunk sets it up for them.
// not empty: can attempt the cas (and transition to next chunk if successful)
if (casConsumerIndex(cIndex, cIndex + 1))
{
break;
}
continue;
}
if (ccChunkIndex > ciChunkIndex)
{
//stale view of the world
continue;
}
isFirstElementOfNextChunk = false;
// mid chunk elements
assert !isFirstElementOfNextChunk && ccChunkIndex <= ciChunkIndex;
pooled = cChunk.isPooled();
if (pooled)
if (ccChunkIndex == ciChunkIndex)
{
// Pooled chunks need a stronger guarantee than just element null checking in case of a stale view
// on a reused entry where a racing consumer has grabbed the slot but not yet null-ed it out and a
// producer has not yet set it to the new value.
final long sequence = cChunk.lvSequence(ciChunkOffset);
if (sequence != ciChunkIndex)
if (pooled)
{
// it covers both cases:
// - right chunk, awaiting element to be set
// - old chunk, awaiting rotation
// it allows to fail fast if the q is empty after the first element on the new chunk
if (sequence < ciChunkIndex &&
cIndex >= pIndex && // test against cached pIndex
cIndex == (pIndex = lvProducerIndex())) // update pIndex if we must
// Pooled chunks need a stronger guarantee than just element null checking in case of a stale view
// on a reused entry where a racing consumer has grabbed the slot but not yet null-ed it out and a
// producer has not yet set it to the new value.
final long sequence = cChunk.lvSequence(ciChunkOffset);
if (sequence == ciChunkIndex)
{
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
return null;
if (casConsumerIndex(cIndex, cIndex + 1))
{
break;
}
continue;
}
//stale view of the world
continue;
if (sequence > ciChunkIndex)
{
//stale view of the world
continue;
}
// sequence < ciChunkIndex: element yet to be set?
}
}
else
{
e = cChunk.lvElement(ciChunkOffset);
if (e == null) // consumers do not skip unset slots
else
{
if (cIndex >= pIndex && // test against cached pIndex
cIndex == (pIndex = lvProducerIndex())) // update pIndex if we must
e = cChunk.lvElement(ciChunkOffset);
if (e != null)
{
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
return null;
if (casConsumerIndex(cIndex, cIndex + 1))
{
break;
}
continue;
}
// This is where we spin for the element to appear **before attempting the CAS**.
continue;
// e == null: element yet to be set?
}
}

if (casConsumerIndex(cIndex, cIndex + 1))
// ccChunkIndex < ciChunkIndex || e == null || sequence < ciChunkIndex:
if (cIndex >= pIndex && // test against cached pIndex
cIndex == (pIndex = lvProducerIndex())) // update pIndex if we must
{
break;
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
return null;
}
}

Expand Down Expand Up @@ -230,10 +222,7 @@ private E linkNextConsumerChunkAndPoll(
final boolean pooled = next.isPooled();
if (pooled)
{
while (next.lvSequence(0) != expectedChunkIndex)
{

}
next.spinForSequence(0, expectedChunkIndex);
}

next.soElement(0, null);
Expand Down
Expand Up @@ -40,11 +40,27 @@ final class MpmcUnboundedXaddChunk<E> extends MpUnboundedXaddChunk<MpmcUnbounded

void soSequence(int index, long e)
{
assert isPooled();
soLongElement(sequence, calcLongElementOffset(index), e);
}

long lvSequence(int index)
{
assert isPooled();
return lvLongElement(sequence, calcLongElementOffset(index));
}

void spinForSequence(int index, long e)
{
assert isPooled();
final long[] sequence = this.sequence;
final long offset = calcLongElementOffset(index);
while (true)
{
if (lvLongElement(sequence, offset) == e)
{
break;
}
}
}
}

0 comments on commit cf35ee3

Please sign in to comment.