Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mpmc xadd q poll wasn't handling correctly chunkSize = 1 pooled case #281

Merged
merged 1 commit into from Jan 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -107,92 +107,84 @@ public E poll()
ciChunkIndex = cIndex >> chunkShift;

final long ccChunkIndex = cChunk.lvIndex();
if (ciChunkIndex != ccChunkIndex)
{
// we are looking at the first element of the next chunk
if (ciChunkOffset == 0 && ciChunkIndex - ccChunkIndex == 1)
isFirstElementOfNextChunk = ciChunkOffset == 0 && cIndex != 0;
if (isFirstElementOfNextChunk) {
if (ciChunkIndex - ccChunkIndex != 1)
{
continue;
}
next = cChunk.lvNext();
// next could have been modified by another racing consumer, but:
// - if null: it still needs to check q empty + casConsumerIndex
// - if !null: it will fail on casConsumerIndex
if (next == null)
{
isFirstElementOfNextChunk = true;
next = cChunk.lvNext();
// next could have been modified by another racing consumer, but:
// - if null: it still needs to check q empty + casConsumerIndex
// - if !null: it will fail on casConsumerIndex
if (next == null)
{
if (cIndex >= pIndex && // test against cached pIndex
cIndex == (pIndex = lvProducerIndex())) // update pIndex if we must
{
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
return null;
}
// we will go ahead with the CAS and have the winning consumer spin for the next buffer
}
// not empty: can attempt the cas (and transition to next chunk if successful)
if (casConsumerIndex(cIndex, cIndex + 1))
{
break;
}
} else if (ccChunkIndex < ciChunkIndex) {
// it allows to fail fast if the q is empty after the first element on the new chunk:
// it can happen if consumerIndex is being already moved forward, but
// the consumerChunk isn't yet rotated
if (cIndex >= pIndex && // test against cached pIndex
cIndex == (pIndex = lvProducerIndex())) // update pIndex if we must
{
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
return null;
}
// we will go ahead with the CAS and have the winning consumer spin for the next buffer
}
// The chunk doesn't match the consumer index view of the required chunk index. This is where consumers
// waiting for the `next` chunk to appear will be waiting while the consumer that took the first element
// in that chunk sets it up for them.
// not empty: can attempt the cas (and transition to next chunk if successful)
if (casConsumerIndex(cIndex, cIndex + 1))
{
break;
}
continue;
}
if (ccChunkIndex > ciChunkIndex)
{
//stale view of the world
continue;
}
isFirstElementOfNextChunk = false;
// mid chunk elements
assert !isFirstElementOfNextChunk && ccChunkIndex <= ciChunkIndex;
pooled = cChunk.isPooled();
if (pooled)
if (ccChunkIndex == ciChunkIndex)
{
// Pooled chunks need a stronger guarantee than just element null checking in case of a stale view
// on a reused entry where a racing consumer has grabbed the slot but not yet null-ed it out and a
// producer has not yet set it to the new value.
final long sequence = cChunk.lvSequence(ciChunkOffset);
if (sequence != ciChunkIndex)
if (pooled)
{
// it covers both cases:
// - right chunk, awaiting element to be set
// - old chunk, awaiting rotation
// it allows to fail fast if the q is empty after the first element on the new chunk
if (sequence < ciChunkIndex &&
cIndex >= pIndex && // test against cached pIndex
cIndex == (pIndex = lvProducerIndex())) // update pIndex if we must
// Pooled chunks need a stronger guarantee than just element null checking in case of a stale view
// on a reused entry where a racing consumer has grabbed the slot but not yet null-ed it out and a
// producer has not yet set it to the new value.
final long sequence = cChunk.lvSequence(ciChunkOffset);
if (sequence == ciChunkIndex)
{
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
return null;
if (casConsumerIndex(cIndex, cIndex + 1))
{
break;
}
continue;
}
//stale view of the world
continue;
if (sequence > ciChunkIndex)
{
//stale view of the world
continue;
}
// sequence < ciChunkIndex: element yet to be set?
}
}
else
{
e = cChunk.lvElement(ciChunkOffset);
if (e == null) // consumers do not skip unset slots
else
{
if (cIndex >= pIndex && // test against cached pIndex
cIndex == (pIndex = lvProducerIndex())) // update pIndex if we must
e = cChunk.lvElement(ciChunkOffset);
if (e != null)
{
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
return null;
if (casConsumerIndex(cIndex, cIndex + 1))
{
break;
}
continue;
}
// This is where we spin for the element to appear **before attempting the CAS**.
continue;
// e == null: element yet to be set?
}
}

if (casConsumerIndex(cIndex, cIndex + 1))
// ccChunkIndex < ciChunkIndex || e == null || sequence < ciChunkIndex:
if (cIndex >= pIndex && // test against cached pIndex
cIndex == (pIndex = lvProducerIndex())) // update pIndex if we must
{
break;
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
return null;
}
}

Expand Down Expand Up @@ -230,10 +222,7 @@ private E linkNextConsumerChunkAndPoll(
final boolean pooled = next.isPooled();
if (pooled)
{
while (next.lvSequence(0) != expectedChunkIndex)
{

}
next.spinForSequence(0, expectedChunkIndex);
}

next.soElement(0, null);
Expand Down
Expand Up @@ -40,11 +40,27 @@ final class MpmcUnboundedXaddChunk<E> extends MpUnboundedXaddChunk<MpmcUnbounded

void soSequence(int index, long e)
{
assert isPooled();
soLongElement(sequence, calcLongElementOffset(index), e);
}

long lvSequence(int index)
{
assert isPooled();
return lvLongElement(sequence, calcLongElementOffset(index));
}

void spinForSequence(int index, long e)
{
assert isPooled();
final long[] sequence = this.sequence;
final long offset = calcLongElementOffset(index);
while (true)
{
if (lvLongElement(sequence, offset) == e)
{
break;
}
}
}
}