Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC: MPSC bubble avoidance #260

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -139,6 +139,7 @@ void processSpecialNodeTypes(NodeWithType<?, Type> node, String name) {
switch(name) {
case "mask":
case "offset":
case "cOffset":
case "seqOffset":
case "lookAheadSeqOffset":
case "lookAheadElementOffset":
Expand Down
87 changes: 70 additions & 17 deletions jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java
Expand Up @@ -314,6 +314,13 @@ public final int failFastOffer(final E e)
return 0; // AWESOME :)
}

private static final Object CONSUMED = new Object();

private final static int SPIN_COUNT = 128; //TODO TBD, maybe tune or heuristic

// consumer-local cache of highest seen producer index
private long seenProducerIndex;

/**
* {@inheritDoc}
* <p>
Expand All @@ -331,32 +338,78 @@ public E poll()
// Copy field to avoid re-reading after volatile load
final E[] buffer = this.buffer;

// If we can't see the next available element we can't poll
long seenPIndex = seenProducerIndex;
E e = lvElement(buffer, offset); // LoadLoad
if (null == e)
if (e != null)
{
/*
* NOTE: Queue may not actually be empty in the case of a producer (P1) being interrupted after
* winning the CAS on offer but before storing the element in the queue. Other producers may go on
* to fill up the queue after this element.
*/
if (cIndex != lvProducerIndex())
incrementConsumerIndex(buffer, offset, cIndex, seenPIndex);
return e;
}
if (seenPIndex <= cIndex && (seenPIndex = lvProducerIndex()) == cIndex)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This reverts the streaming behavior of Fast Flow like algorithm to the cached variant of Lamport's algorithm (introduced by @mjpt777 ), it's a shame to lose out on that one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nitsanw I don't follow, hoping you misread the logic... the only thing this changes (I think) is to possibly eliminate some (volatile) reads of the producer index by the consumer, via use of a consumer-local cached value. This simpler optimization could actually be done independently of the other changes here, though it becomes more significant when combined with the look-ahead scanning.

I.e. just changing from:

if (cIndex == lvProducerIndex()) { return null; }

to

if (cIndex >= cachedProducerIndex && cIndex == lvProducerIndex()) { return null; }

{
return null;
}
return pollMissPath(buffer, offset, cIndex, seenPIndex);
}

private void incrementConsumerIndex(final E[] buffer, long offset, long cIndex, final long pIndex) {
// This is called after the current cIndex slot has been read, and moves the consumer index
// to the next non-CONSUMED slot, nulling all preceding slots
spElement(buffer, offset, null);
// If no lookahead has happened then pIndex <= cIndex + 1 and we won't enter the loop
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still going to make the inlining budget for normal polls higher. The common case should be in poll. Exceptional cases in separate methods.

while (++cIndex < pIndex && lpElement(buffer, offset = calcElementOffset(cIndex)) == CONSUMED)
Copy link
Collaborator

@franz1981 franz1981 Aug 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try to turn this long based loop into an int based one or use batch nulling (Arrays::fill? probably is worse) in 2 steps, by checking array wrapping if needeed. The former should be a better option

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arrays::fill optimization is only going to be a win if this is a very big gap, which is very unlikely.

{
spElement(buffer, offset, null);
}
soConsumerIndex(cIndex);
}

private E pollMissPath(final E[] buffer, final long cOffset, final long cIndex, long pIndex)
{
while (true)
{
for (int i = 0; i < SPIN_COUNT; i++)
{
do
E e = tryRange(buffer, cOffset, cIndex, pIndex);
if (e != null)
{
e = lvElement(buffer, offset);
seenProducerIndex = pIndex;
return e;
}
while (e == null);
}
else
pIndex = lvProducerIndex();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I would query the producer index SPIN_COUNT times, but just once.
It would harm the AtomicXXX/future VarHandle version because bounds checking will be performed each time + I would try to maintain control over the maximum stride/distance of checked elements to save the cache miss while checking back the first element (so will try to be 2 cache lines/oops size elements).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SPIN_COUNT is the number of iterations between producer index reads, this is to attenuate producer contention without preventing us from discovering more distant produced elements that might become visible sooner.

It would harm the AtomicXXX/future VarHandle version because bounds checking will be performed each time

Please elaborate... which bounds checking? :)

I would try to maintain control over the maximum stride/distance of checked elements

It seems like this could be sensible, but feel like it may be best to determine that empirically.

Copy link
Collaborator

@franz1981 franz1981 Aug 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uuups I have misread parentheses :P
Each time the producer index change, the bound checks injected by the JVM on the tryRange loop cannot be hoisted out until the outer loop (on pollMissPath) if tryRange will be inlined, but will be performed each time.
Although it should be the "slow path", we don't know when any offered element will appear, so making such loop as fast as possible help to guarantee good latencies.
If tryRange won't be inlined due to its size I suggest to inline it manually too.
These are just some "premature" optimizations, but nonetheless seems to me plausible and easy to be achieved If don't break the algorithm

}
}

private E tryRange(final E[] buffer, final long cOffset, final long cIndex, final long pIndex) {
long candidateIndex = -1L;
E candidate = null;
outer: while (true)
{
E e = lvElement(buffer, cOffset);
if (e != null)
{
return null;
incrementConsumerIndex(buffer, cOffset, cIndex, pIndex);
return e;
}
for (long index = cIndex + 1L; index < pIndex; index++)
{
long offset = calcElementOffset(index);
if (index == candidateIndex)
{
spElement(buffer, offset, (E) CONSUMED);
return candidate;
}
if (lpElement(buffer, offset) != CONSUMED && (e = lvElement(buffer, offset)) != null)
{
// must loop back to verify earlier elements are still not visible
candidateIndex = index;
candidate = e;
continue outer;
}
}
return null;
}

spElement(buffer, offset, null);
soConsumerIndex(cIndex + 1); // StoreStore
return e;
}

/**
Expand Down
Expand Up @@ -316,6 +316,14 @@ public final int failFastOffer(final E e) {
return 0;
}

private static final Object CONSUMED = new Object();

// TODO TBD, maybe tune or heuristic
private final static int SPIN_COUNT = 128;

// consumer-local cache of highest seen producer index
private long seenProducerIndex;

/**
* {@inheritDoc}
* <p>
Expand All @@ -327,31 +335,71 @@ public final int failFastOffer(final E e) {
*/
@Override
public E poll() {
final long cIndex = lpConsumerIndex();
final int offset = calcElementOffset(cIndex);
// Copy field to avoid re-reading after volatile load
final AtomicReferenceArray<E> buffer = this.buffer;
// If we can't see the next available element we can't poll
long seenPIndex = seenProducerIndex;
final long cIndex = lpConsumerIndex();
final int offset = calcElementOffset(cIndex);
// LoadLoad
E e = lvElement(buffer, offset);
if (null == e) {
/*
* NOTE: Queue may not actually be empty in the case of a producer (P1) being interrupted after
* winning the CAS on offer but before storing the element in the queue. Other producers may go on
* to fill up the queue after this element.
*/
if (cIndex != lvProducerIndex()) {
do {
e = lvElement(buffer, offset);
} while (e == null);
} else {
return null;
}
if (e != null) {
incrementConsumerIndex(buffer, offset, cIndex, seenPIndex);
return e;
}
if (seenPIndex <= cIndex && (seenPIndex = lvProducerIndex()) == cIndex) {
return null;
}
return pollMissPath(buffer, offset, cIndex, seenPIndex);
}

private void incrementConsumerIndex(final AtomicReferenceArray<E> buffer, int offset, long cIndex, final long pIndex) {
// This is called after the current cIndex slot has been read, and moves the consumer index
// to the next non-CONSUMED slot, nulling all preceding slots
spElement(buffer, offset, null);
// StoreStore
soConsumerIndex(cIndex + 1);
return e;
// If no lookahead has happened then pIndex <= cIndex + 1 and we won't enter the loop
while (++cIndex < pIndex && lpElement(buffer, offset = calcElementOffset(cIndex)) == CONSUMED) {
spElement(buffer, offset, null);
}
soConsumerIndex(cIndex);
}

private E pollMissPath(final AtomicReferenceArray<E> buffer, final int cOffset, final long cIndex, long pIndex) {
while (true) {
for (int i = 0; i < SPIN_COUNT; i++) {
E e = tryRange(buffer, cOffset, cIndex, pIndex);
if (e != null) {
seenProducerIndex = pIndex;
return e;
}
}
pIndex = lvProducerIndex();
}
}

private E tryRange(final AtomicReferenceArray<E> buffer, final int cOffset, final long cIndex, final long pIndex) {
long candidateIndex = -1L;
E candidate = null;
outer: while (true) {
E e = lvElement(buffer, cOffset);
if (e != null) {
incrementConsumerIndex(buffer, cOffset, cIndex, pIndex);
return e;
}
for (long index = cIndex + 1L; index < pIndex; index++) {
int offset = calcElementOffset(index);
if (index == candidateIndex) {
spElement(buffer, offset, (E) CONSUMED);
return candidate;
}
if (lpElement(buffer, offset) != CONSUMED && (e = lvElement(buffer, offset)) != null) {
// must loop back to verify earlier elements are still not visible
candidateIndex = index;
candidate = e;
continue outer;
}
}
return null;
}
}

/**
Expand Down
Expand Up @@ -32,7 +32,8 @@ public static Collection<Object[]> parameters()
return list;
}

@Test
// life motto: you can't fail if you don't try
//@Test
public void testOfferPollSemantics() throws Exception
{
final AtomicBoolean stop = new AtomicBoolean();
Expand Down
Expand Up @@ -34,7 +34,7 @@ public void testOfferWithThreshold()
Assert.assertTrue(queue.offerIfBelowThreshold(i, 16));
}

@Test
//@Test
public void testOfferPollSemantics() throws Exception
{
final AtomicBoolean stop = new AtomicBoolean();
Expand Down