Skip to content

Commit

Permalink
mpmc xadd q poll docs + clearing up names (#282)
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 authored and nitsanw committed Jan 3, 2020
1 parent cf35ee3 commit c6836e8
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 5 deletions.
Expand Up @@ -359,6 +359,9 @@ final void moveToNextConsumerChunk(R cChunk, R next)
assert pooled;
}
this.soConsumerChunk(next);
// MC case:
// from now on the code is not single-threaded anymore and
// other consumers can move forward consumerIndex
}

@Override
Expand Down
Expand Up @@ -90,14 +90,15 @@ public E poll()
long cIndex;
MpmcUnboundedXaddChunk<E> cChunk;
int ciChunkOffset;
boolean isFirstElementOfNextChunk;
boolean isFirstElementOfNewChunk;
boolean pooled = false;
E e = null;
MpmcUnboundedXaddChunk<E> next = null;
long pIndex = -1; // start with bogus value, hope we don't need it
long ciChunkIndex;
while (true)
{
isFirstElementOfNewChunk = false;
cIndex = this.lvConsumerIndex();
// chunk is in sync with the index, and is safe to mutate after CAS of index (because we pre-verify it
// matched the indicate ciChunkIndex)
Expand All @@ -107,12 +108,12 @@ public E poll()
ciChunkIndex = cIndex >> chunkShift;

final long ccChunkIndex = cChunk.lvIndex();
isFirstElementOfNextChunk = ciChunkOffset == 0 && cIndex != 0;
if (isFirstElementOfNextChunk) {
if (ciChunkOffset == 0 && cIndex != 0) {
if (ciChunkIndex - ccChunkIndex != 1)
{
continue;
}
isFirstElementOfNewChunk = true;
next = cChunk.lvNext();
// next could have been modified by another racing consumer, but:
// - if null: it still needs to check q empty + casConsumerIndex
Expand Down Expand Up @@ -140,7 +141,7 @@ public E poll()
continue;
}
// mid chunk elements
assert !isFirstElementOfNextChunk && ccChunkIndex <= ciChunkIndex;
assert !isFirstElementOfNewChunk && ccChunkIndex <= ciChunkIndex;
pooled = cChunk.isPooled();
if (ccChunkIndex == ciChunkIndex)
{
Expand Down Expand Up @@ -189,7 +190,7 @@ public E poll()
}

// if we are the isFirstElementOfNextChunk we need to get the consumer chunk
if (isFirstElementOfNextChunk)
if (isFirstElementOfNewChunk)
{
e = linkNextConsumerChunkAndPoll(cChunk, next, ciChunkIndex);
}
Expand Down

0 comments on commit c6836e8

Please sign in to comment.