Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement batching methods on MpmcArrayQueue #211

Merged
merged 1 commit into from Jun 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -519,6 +519,7 @@ private static void processSpecialNodeTypes(NodeWithType<?, Type> node, String n
case "mask":
case "offset":
case "seqOffset":
case "lookAheadSeqOffset":
case "lookAheadElementOffset":
node.setType(PrimitiveType.intType());
}
Expand Down
114 changes: 113 additions & 1 deletion jctools-core/src/main/java/org/jctools/queues/MpmcArrayQueue.java
Expand Up @@ -128,10 +128,13 @@ abstract class MpmcArrayQueueL3Pad<E> extends MpmcArrayQueueConsumerIndexField<E
*/
public class MpmcArrayQueue<E> extends MpmcArrayQueueL3Pad<E>
{
public static final int MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.mpmc.max.lookahead.step", 4096);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should probably use PortableJvmInfo.RECOMENDED_OFFER_BATCH instead. On SPSC there's no reason not to claim large chunks for an uncontended producer, but for MPSC/MPMC the assumption should be some contention. Given that the empty slots in the queue will result in consumer 'bubbles' (queue not empty but slot is empty) we can potentially have an issue if a producer takes a while to fill up the slots it claims.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strike that, this is just setting a max

private final int lookAheadStep;

public MpmcArrayQueue(final int capacity)
{
super(RangeUtil.checkGreaterThanOrEqual(capacity, 2, "capacity"));
lookAheadStep = Math.max(2, Math.min(capacity() / 4, MAX_LOOK_AHEAD_STEP));
}

@Override
Expand Down Expand Up @@ -347,6 +350,57 @@ public int fill(Supplier<E> s)

@Override
public int drain(Consumer<E> c, int limit)
{
final long[] sBuffer = sequenceBuffer;
final long mask = this.mask;
final E[] buffer = this.buffer;
final int maxLookAheadStep = Math.min(this.lookAheadStep, limit);
int consumed = 0;

while (consumed < limit)
{
final int remaining = limit - consumed;
final int lookAheadStep = Math.min(remaining, maxLookAheadStep);
final long cIndex = lvConsumerIndex();
final long lookAheadIndex = cIndex + lookAheadStep - 1;
final long lookAheadSeqOffset = calcSequenceOffset(lookAheadIndex, mask);
final long lookAheadSeq = lvSequence(sBuffer, lookAheadSeqOffset);
final long expectedLookAheadSeq = lookAheadIndex + 1;
if (lookAheadSeq == expectedLookAheadSeq && casConsumerIndex(cIndex, expectedLookAheadSeq))
{
for (int i = 0; i < lookAheadStep; i++)
{
final long index = cIndex + i;
final long seqOffset = calcSequenceOffset(index, mask);
final long offset = calcElementOffset(index, mask);
final long expectedSeq = index + 1;
while (lvSequence(sBuffer, seqOffset) != expectedSeq)
{

}
final E e = lpElement(buffer, offset);
soElement(buffer, offset, null);
soSequence(sBuffer, seqOffset, index + mask + 1);
c.accept(e);
}
consumed += lookAheadStep;
}
else
{
if (lookAheadSeq < expectedLookAheadSeq)
{
if (notAvailable(cIndex, mask, sBuffer, cIndex + 1))
{
return consumed;
}
}
return consumed + drainOneByOne(c, remaining);
}
}
return limit;
}

private int drainOneByOne(Consumer<E> c, int limit)
{
final long[] sBuffer = sequenceBuffer;
final long mask = this.mask;
Expand Down Expand Up @@ -383,6 +437,65 @@ public int drain(Consumer<E> c, int limit)

@Override
public int fill(Supplier<E> s, int limit)
{
final long[] sBuffer = sequenceBuffer;
final long mask = this.mask;
final E[] buffer = this.buffer;
final int maxLookAheadStep = Math.min(this.lookAheadStep, limit);
int produced = 0;

while (produced < limit)
{
final int remaining = limit - produced;
final int lookAheadStep = Math.min(remaining, maxLookAheadStep);
final long pIndex = lvProducerIndex();
final long lookAheadIndex = pIndex + lookAheadStep - 1;
final long lookAheadSeqOffset = calcSequenceOffset(lookAheadIndex, mask);
final long lookAheadSeq = lvSequence(sBuffer, lookAheadSeqOffset);
final long expectedLookAheadSeq = lookAheadIndex;
if (lookAheadSeq == expectedLookAheadSeq && casProducerIndex(pIndex, expectedLookAheadSeq + 1))
{
for (int i = 0; i < lookAheadStep; i++)
{
final long index = pIndex + i;
final long seqOffset = calcSequenceOffset(index, mask);
final long offset = calcElementOffset(index, mask);
while (lvSequence(sBuffer, seqOffset) != index)
{

}
soElement(buffer, offset, s.get());
soSequence(sBuffer, seqOffset, index + 1);
}
produced += lookAheadStep;
}
else
{
if (lookAheadSeq < expectedLookAheadSeq)
{
if (notAvailable(pIndex, mask, sBuffer, pIndex))
{
return produced;
}
}
return produced + fillOneByOne(s, remaining);
}
}
return limit;
}

private boolean notAvailable(long index, long mask, long[] sBuffer, long expectedSeq)
{
final long seqOffset = calcSequenceOffset(index, mask);
final long seq = lvSequence(sBuffer, seqOffset);
if (seq < expectedSeq)
{
return true;
}
return false;
}

private int fillOneByOne(Supplier<E> s, int limit)
{
final long[] sBuffer = sequenceBuffer;
final long mask = this.mask;
Expand All @@ -405,7 +518,6 @@ public int fill(Supplier<E> s, int limit)
}
while (seq > pIndex || // another producer has moved the sequence
!casProducerIndex(pIndex, pIndex + 1)); // failed to increment

soElement(buffer, calcElementOffset(pIndex, mask), s.get());
soSequence(sBuffer, seqOffset, pIndex + 1);
}
Expand Down
Expand Up @@ -141,8 +141,13 @@ abstract class MpmcAtomicArrayQueueL3Pad<E> extends MpmcAtomicArrayQueueConsumer
*/
public class MpmcAtomicArrayQueue<E> extends MpmcAtomicArrayQueueL3Pad<E> {

public static final int MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.mpmc.max.lookahead.step", 4096);

private final int lookAheadStep;

public MpmcAtomicArrayQueue(final int capacity) {
super(RangeUtil.checkGreaterThanOrEqual(capacity, 2, "capacity"));
lookAheadStep = Math.max(2, Math.min(capacity() / 4, MAX_LOOK_AHEAD_STEP));
}

@Override
Expand Down Expand Up @@ -329,6 +334,46 @@ public int fill(Supplier<E> s) {

@Override
public int drain(Consumer<E> c, int limit) {
final AtomicLongArray sBuffer = sequenceBuffer;
final int mask = this.mask;
final AtomicReferenceArray<E> buffer = this.buffer;
final int maxLookAheadStep = Math.min(this.lookAheadStep, limit);
int consumed = 0;
while (consumed < limit) {
final int remaining = limit - consumed;
final int lookAheadStep = Math.min(remaining, maxLookAheadStep);
final long cIndex = lvConsumerIndex();
final long lookAheadIndex = cIndex + lookAheadStep - 1;
final int lookAheadSeqOffset = calcSequenceOffset(lookAheadIndex, mask);
final long lookAheadSeq = lvSequence(sBuffer, lookAheadSeqOffset);
final long expectedLookAheadSeq = lookAheadIndex + 1;
if (lookAheadSeq == expectedLookAheadSeq && casConsumerIndex(cIndex, expectedLookAheadSeq)) {
for (int i = 0; i < lookAheadStep; i++) {
final long index = cIndex + i;
final int seqOffset = calcSequenceOffset(index, mask);
final int offset = calcElementOffset(index, mask);
final long expectedSeq = index + 1;
while (lvSequence(sBuffer, seqOffset) != expectedSeq) {
}
final E e = lpElement(buffer, offset);
soElement(buffer, offset, null);
soSequence(sBuffer, seqOffset, index + mask + 1);
c.accept(e);
}
consumed += lookAheadStep;
} else {
if (lookAheadSeq < expectedLookAheadSeq) {
if (notAvailableYet(cIndex, mask, sBuffer, cIndex + 1)) {
return consumed;
}
}
return consumed + drainOneByOne(c, remaining);
}
}
return limit;
}

private int drainOneByOne(Consumer<E> c, int limit) {
final AtomicLongArray sBuffer = sequenceBuffer;
final int mask = this.mask;
final AtomicReferenceArray<E> buffer = this.buffer;
Expand Down Expand Up @@ -359,6 +404,52 @@ public int drain(Consumer<E> c, int limit) {

@Override
public int fill(Supplier<E> s, int limit) {
final AtomicLongArray sBuffer = sequenceBuffer;
final int mask = this.mask;
final AtomicReferenceArray<E> buffer = this.buffer;
final int maxLookAheadStep = Math.min(this.lookAheadStep, limit);
int produced = 0;
while (produced < limit) {
final int remaining = limit - produced;
final int lookAheadStep = Math.min(remaining, maxLookAheadStep);
final long pIndex = lvProducerIndex();
final long lookAheadIndex = pIndex + lookAheadStep - 1;
final int lookAheadSeqOffset = calcSequenceOffset(lookAheadIndex, mask);
final long lookAheadSeq = lvSequence(sBuffer, lookAheadSeqOffset);
final long expectedLookAheadSeq = lookAheadIndex;
if (lookAheadSeq == expectedLookAheadSeq && casProducerIndex(pIndex, expectedLookAheadSeq + 1)) {
for (int i = 0; i < lookAheadStep; i++) {
final long index = pIndex + i;
final int seqOffset = calcSequenceOffset(index, mask);
final int offset = calcElementOffset(index, mask);
while (lvSequence(sBuffer, seqOffset) != index) {
}
soElement(buffer, offset, s.get());
soSequence(sBuffer, seqOffset, index + 1);
}
produced += lookAheadStep;
} else {
if (lookAheadSeq < expectedLookAheadSeq) {
if (notAvailableYet(pIndex, mask, sBuffer, pIndex)) {
return produced;
}
}
return produced + fillOneByOne(s, remaining);
}
}
return limit;
}

private boolean notAvailableYet(long index, int mask, AtomicLongArray sBuffer, long expectedSeq) {
final int seqOffset = calcSequenceOffset(index, mask);
final long seq = lvSequence(sBuffer, seqOffset);
if (seq < expectedSeq) {
return true;
}
return false;
}

private int fillOneByOne(Supplier<E> s, int limit) {
final AtomicLongArray sBuffer = sequenceBuffer;
final int mask = this.mask;
final AtomicReferenceArray<E> buffer = this.buffer;
Expand Down