-
Notifications
You must be signed in to change notification settings - Fork 554
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
POC: MPSC bubble avoidance #260
base: master
Are you sure you want to change the base?
Changes from all commits
86dbc0a
6676e9c
c124517
e6a7b1e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -109,6 +109,8 @@ abstract class MpscArrayQueueConsumerIndexField<E> extends MpscArrayQueueL2Pad<E | |
private final static long C_INDEX_OFFSET = fieldOffset(MpscArrayQueueConsumerIndexField.class, "consumerIndex"); | ||
|
||
private volatile long consumerIndex; | ||
// consumer-local cache of highest seen producer index | ||
protected long seenProducerIndex; | ||
|
||
MpscArrayQueueConsumerIndexField(int capacity) | ||
{ | ||
|
@@ -314,6 +316,10 @@ public final int failFastOffer(final E e) | |
return 0; // AWESOME :) | ||
} | ||
|
||
private final static Object CONSUMED = new Object(); | ||
|
||
private final static int SPIN_COUNT = 256; //TODO TBD, maybe tune or heuristic | ||
|
||
/** | ||
* {@inheritDoc} | ||
* <p> | ||
|
@@ -331,32 +337,78 @@ public E poll() | |
// Copy field to avoid re-reading after volatile load | ||
final E[] buffer = this.buffer; | ||
|
||
// If we can't see the next available element we can't poll | ||
long seenPIndex = seenProducerIndex; | ||
E e = lvElement(buffer, offset); // LoadLoad | ||
if (null == e) | ||
if (e != null) | ||
{ | ||
/* | ||
* NOTE: Queue may not actually be empty in the case of a producer (P1) being interrupted after | ||
* winning the CAS on offer but before storing the element in the queue. Other producers may go on | ||
* to fill up the queue after this element. | ||
*/ | ||
if (cIndex != lvProducerIndex()) | ||
incrementConsumerIndex(buffer, offset, cIndex, seenPIndex); | ||
return e; | ||
} | ||
if (seenPIndex <= cIndex && (seenPIndex = lvProducerIndex()) == cIndex) | ||
{ | ||
return null; | ||
} | ||
return pollMissPath(buffer, offset, cIndex, seenPIndex); | ||
} | ||
|
||
private void incrementConsumerIndex(final E[] buffer, long offset, long cIndex, final long pIndex) { | ||
// This is called after the current cIndex slot has been read, and moves the consumer index | ||
// to the next non-CONSUMED slot, nulling all preceding slots | ||
spElement(buffer, offset, null); | ||
// If no lookahead has happened then pIndex <= cIndex + 1 and we won't enter the loop | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is still going to make the inlining budget for normal polls higher. The common case should be in poll; exceptional cases should go in separate methods. |
||
while (++cIndex < pIndex && lpElement(buffer, offset = calcElementOffset(cIndex)) == CONSUMED) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Try to turn this long-based loop into an int-based one, or use batch nulling (Arrays::fill? probably is worse) in 2 steps, by checking array wrapping if needed. The former should be a better option. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The Arrays::fill optimization is only going to be a win if this is a very big gap, which is very unlikely. |
||
{ | ||
spElement(buffer, offset, null); | ||
} | ||
soConsumerIndex(cIndex); | ||
} | ||
|
||
private E pollMissPath(final E[] buffer, final long cOffset, final long cIndex, long pIndex) | ||
{ | ||
while (true) | ||
{ | ||
for (int i = 0; i < SPIN_COUNT; i++) | ||
{ | ||
do | ||
E e = tryRange(buffer, cOffset, cIndex, pIndex); | ||
if (e != null) | ||
{ | ||
e = lvElement(buffer, offset); | ||
seenProducerIndex = pIndex; | ||
return e; | ||
} | ||
while (e == null); | ||
} | ||
else | ||
pIndex = lvProducerIndex(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm not sure I would query the producer index SPIN_COUNT times, but just once. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Please elaborate... which bounds checking? :)
It seems like this could be sensible, but it feels like it may be best to determine that empirically. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Oops, I misread the parentheses :P |
||
} | ||
} | ||
|
||
private E tryRange(final E[] buffer, final long cOffset, final long cIndex, final long pIndex) { | ||
long candidateIndex = -1L; | ||
E candidate = null; | ||
outer: while (true) | ||
{ | ||
E e = lvElement(buffer, cOffset); | ||
if (e != null) | ||
{ | ||
return null; | ||
incrementConsumerIndex(buffer, cOffset, cIndex, pIndex); | ||
return e; | ||
} | ||
for (long index = cIndex + 1L; index < pIndex; index++) | ||
{ | ||
long offset = calcElementOffset(index); | ||
if (index == candidateIndex) | ||
{ | ||
spElement(buffer, offset, (E) CONSUMED); | ||
return candidate; | ||
} | ||
if (lpElement(buffer, offset) != CONSUMED && (e = lvElement(buffer, offset)) != null) | ||
{ | ||
// must loop back to verify earlier elements are still not visible | ||
candidateIndex = index; | ||
candidate = e; | ||
continue outer; | ||
} | ||
} | ||
return null; | ||
} | ||
|
||
spElement(buffer, offset, null); | ||
soConsumerIndex(cIndex + 1); // StoreStore | ||
return e; | ||
} | ||
|
||
/** | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
This reverts the streaming behavior of Fast Flow like algorithm to the cached variant of Lamport's algorithm (introduced by @mjpt777 ), it's a shame to lose out on that one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nitsanw I don't follow, hoping you misread the logic... the only thing this changes (I think) is to possibly eliminate some (volatile) reads of the producer index by the consumer, via use of a consumer-local cached value. This simpler optimization could actually be done independently of the other changes here, though it becomes more significant when combined with the look-ahead scanning.
I.e. just changing from:
to