Skip to content

Commit

Permalink
Minor rework to eagerly skip CONSUMED slots after cIndex slot consumed
Browse files Browse the repository at this point in the history
This has some advantages:
- poll() will always succeed after consumer reads !isEmpty()
- Slots are released sooner, allowing offers to succeed for bounded
queues in some cases where they would otherwise have failed
- Resulting code is simpler

At the expense of occasionally performing work on critical path which
_might not_ have always happened on critical path before.
  • Loading branch information
njhill committed Aug 20, 2019
1 parent 6676e9c commit c124517
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 52 deletions.
51 changes: 25 additions & 26 deletions jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ public final int failFastOffer(final E e)

private static final Object CONSUMED = new Object();

private final static int SPIN_COUNT = 128; //TODO TBD?
private final static int SPIN_COUNT = 128; //TODO TBD, maybe tune or heuristic

// consumer-local cache of highest seen producer index
private long seenProducerIndex;
Expand All @@ -333,35 +333,35 @@ public final int failFastOffer(final E e)
@Override
public E poll()
{
final long cIndex = lpConsumerIndex();
final long offset = calcElementOffset(cIndex);
// Copy field to avoid re-reading after volatile load
final E[] buffer = this.buffer;
long cIndex = lpConsumerIndex();
long offset;
for (long cIndexBefore = cIndex;; cIndex++)

long seenPIndex = seenProducerIndex;
E e = lvElement(buffer, offset); // LoadLoad
if (e != null)
{
offset = calcElementOffset(cIndex);
E e = lvElement(buffer, offset);// LoadLoad
if (e == null)
{
if (cIndexBefore != cIndex)
{
soConsumerIndex(cIndex);
}
break;
}
spElement(buffer, offset, null);
if (e != CONSUMED)
{
soConsumerIndex(cIndex + 1L); // StoreStore
return e;
}
incrementConsumerIndex(buffer, offset, cIndex, seenPIndex);
return e;
}
long pIndex = seenProducerIndex;
if (pIndex <= cIndex && (pIndex = lvProducerIndex()) == cIndex)
if (seenPIndex <= cIndex && (seenPIndex = lvProducerIndex()) == cIndex)
{
return null;
}
return pollMissPath(buffer, offset, cIndex, pIndex);
return pollMissPath(buffer, offset, cIndex, seenPIndex);
}

private void incrementConsumerIndex(final E[] buffer, long offset, long cIndex, final long pIndex) {
// This is called after the current cIndex slot has been read, and moves the consumer index
// to the next non-CONSUMED slot, nulling all preceding slots
spElement(buffer, offset, null);
// If no lookahead has happened then pIndex <= cIndex + 1 and we won't enter the loop
while (++cIndex < pIndex && lpElement(buffer, offset = calcElementOffset(cIndex)) == CONSUMED)
{
spElement(buffer, offset, null);
}
soConsumerIndex(cIndex);
}

private E pollMissPath(final E[] buffer, final long cOffset, final long cIndex, long pIndex)
Expand All @@ -381,16 +381,15 @@ private E pollMissPath(final E[] buffer, final long cOffset, final long cIndex,
}
}

private E tryRange(final E[] buffer, final long cOffset, long cIndex, final long pIndex) {
private E tryRange(final E[] buffer, final long cOffset, final long cIndex, final long pIndex) {
long candidateIndex = -1L;
E candidate = null;
outer: while (true)
{
E e = lvElement(buffer, cOffset);
if (e != null)
{
spElement(buffer, cOffset, null);
soConsumerIndex(cIndex + 1L);
incrementConsumerIndex(buffer, cOffset, cIndex, pIndex);
return e;
}
for (long index = cIndex + 1L; index < pIndex; index++)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ public final int failFastOffer(final E e) {

private static final Object CONSUMED = new Object();

// TODO TBD?
// TODO TBD, maybe tune or heuristic
private final static int SPIN_COUNT = 128;

// consumer-local cache of highest seen producer index
Expand All @@ -337,30 +337,30 @@ public final int failFastOffer(final E e) {
public E poll() {
// Copy field to avoid re-reading after volatile load
final AtomicReferenceArray<E> buffer = this.buffer;
long cIndex = lpConsumerIndex();
int offset;
for (long cIndexBefore = cIndex; ; cIndex++) {
offset = calcElementOffset(cIndex);
// LoadLoad
E e = lvElement(buffer, offset);
if (e == null) {
if (cIndexBefore != cIndex) {
soConsumerIndex(cIndex);
}
break;
}
spElement(buffer, offset, null);
if (e != CONSUMED) {
// StoreStore
soConsumerIndex(cIndex + 1L);
return e;
}
long seenPIndex = seenProducerIndex;
final long cIndex = lpConsumerIndex();
final int offset = calcElementOffset(cIndex);
// LoadLoad
E e = lvElement(buffer, offset);
if (e != null) {
incrementConsumerIndex(buffer, offset, cIndex, seenPIndex);
return e;
}
long pIndex = seenProducerIndex;
if (pIndex <= cIndex && (pIndex = lvProducerIndex()) == cIndex) {
if (seenPIndex <= cIndex && (seenPIndex = lvProducerIndex()) == cIndex) {
return null;
}
return pollMissPath(buffer, offset, cIndex, pIndex);
return pollMissPath(buffer, offset, cIndex, seenPIndex);
}

private void incrementConsumerIndex(final AtomicReferenceArray<E> buffer, int offset, long cIndex, final long pIndex) {
// This is called after the current cIndex slot has been read, and moves the consumer index
// to the next non-CONSUMED slot, nulling all preceding slots
spElement(buffer, offset, null);
// If no lookahead has happened then pIndex <= cIndex + 1 and we won't enter the loop
while (++cIndex < pIndex && lpElement(buffer, offset = calcElementOffset(cIndex)) == CONSUMED) {
spElement(buffer, offset, null);
}
soConsumerIndex(cIndex);
}

private E pollMissPath(final AtomicReferenceArray<E> buffer, final int cOffset, final long cIndex, long pIndex) {
Expand All @@ -376,14 +376,13 @@ private E pollMissPath(final AtomicReferenceArray<E> buffer, final int cOffset,
}
}

private E tryRange(final AtomicReferenceArray<E> buffer, final int cOffset, long cIndex, final long pIndex) {
private E tryRange(final AtomicReferenceArray<E> buffer, final int cOffset, final long cIndex, final long pIndex) {
long candidateIndex = -1L;
E candidate = null;
outer: while (true) {
E e = lvElement(buffer, cOffset);
if (e != null) {
spElement(buffer, cOffset, null);
soConsumerIndex(cIndex + 1L);
incrementConsumerIndex(buffer, cOffset, cIndex, pIndex);
return e;
}
for (long index = cIndex + 1L; index < pIndex; index++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ public void run()

}

//@Test(timeout = TEST_TIMEOUT)
@Test(timeout = TEST_TIMEOUT)
public void testPollAfterIsEmpty() throws Exception
{
final AtomicBoolean stop = new AtomicBoolean();
Expand Down

0 comments on commit c124517

Please sign in to comment.