Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mpmc xadd q poll docs + clearing up names #282

Merged
merged 1 commit into from Jan 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -359,6 +359,9 @@ final void moveToNextConsumerChunk(R cChunk, R next)
assert pooled;
}
this.soConsumerChunk(next);
// MC case:
// from now on the code is not single-threaded anymore and
// other consumers can move forward consumerIndex
}

@Override
Expand Down
Expand Up @@ -90,14 +90,15 @@ public E poll()
long cIndex;
MpmcUnboundedXaddChunk<E> cChunk;
int ciChunkOffset;
boolean isFirstElementOfNextChunk;
boolean isFirstElementOfNewChunk;
boolean pooled = false;
E e = null;
MpmcUnboundedXaddChunk<E> next = null;
long pIndex = -1; // start with bogus value, hope we don't need it
long ciChunkIndex;
while (true)
{
isFirstElementOfNewChunk = false;
cIndex = this.lvConsumerIndex();
// chunk is in sync with the index, and is safe to mutate after CAS of index (because we pre-verify it
// matched the indicate ciChunkIndex)
Expand All @@ -107,12 +108,12 @@ public E poll()
ciChunkIndex = cIndex >> chunkShift;

final long ccChunkIndex = cChunk.lvIndex();
isFirstElementOfNextChunk = ciChunkOffset == 0 && cIndex != 0;
if (isFirstElementOfNextChunk) {
if (ciChunkOffset == 0 && cIndex != 0) {
if (ciChunkIndex - ccChunkIndex != 1)
{
continue;
}
isFirstElementOfNewChunk = true;
next = cChunk.lvNext();
// next could have been modified by another racing consumer, but:
// - if null: it still needs to check q empty + casConsumerIndex
Expand Down Expand Up @@ -140,7 +141,7 @@ public E poll()
continue;
}
// mid chunk elements
assert !isFirstElementOfNextChunk && ccChunkIndex <= ciChunkIndex;
assert !isFirstElementOfNewChunk && ccChunkIndex <= ciChunkIndex;
pooled = cChunk.isPooled();
if (ccChunkIndex == ciChunkIndex)
{
Expand Down Expand Up @@ -189,7 +190,7 @@ public E poll()
}

// if we are the isFirstElementOfNextChunk we need to get the consumer chunk
if (isFirstElementOfNextChunk)
if (isFirstElementOfNewChunk)
{
e = linkNextConsumerChunkAndPoll(cChunk, next, ciChunkIndex);
}
Expand Down