Skip to content

Commit

Permalink
Implement batching methods on MpmcArrayQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 committed May 25, 2018
1 parent 72c55e5 commit efcb607
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 1 deletion.
Expand Up @@ -519,6 +519,7 @@ private static void processSpecialNodeTypes(NodeWithType<?, Type> node, String n
case "mask":
case "offset":
case "seqOffset":
case "lookAheadSeqOffset":
case "lookAheadElementOffset":
node.setType(PrimitiveType.intType());
}
Expand Down
114 changes: 113 additions & 1 deletion jctools-core/src/main/java/org/jctools/queues/MpmcArrayQueue.java
Expand Up @@ -128,10 +128,13 @@ abstract class MpmcArrayQueueL3Pad<E> extends MpmcArrayQueueConsumerIndexField<E
*/
public class MpmcArrayQueue<E> extends MpmcArrayQueueL3Pad<E>
{
public static final int MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.mpmc.max.lookahead.step", 4096);
private final int lookAheadStep;

public MpmcArrayQueue(final int capacity)
{
super(RangeUtil.checkGreaterThanOrEqual(capacity, 2, "capacity"));
lookAheadStep = Math.max(2, Math.min(capacity() / 4, MAX_LOOK_AHEAD_STEP));
}

@Override
Expand Down Expand Up @@ -347,6 +350,57 @@ public int fill(Supplier<E> s)

@Override
public int drain(Consumer<E> c, int limit)
{
final long[] sBuffer = sequenceBuffer;
final long mask = this.mask;
final E[] buffer = this.buffer;
final int maxLookAheadStep = Math.min(this.lookAheadStep, limit);
int consumed = 0;

while (consumed < limit)
{
final int remaining = limit - consumed;
final int lookAheadStep = Math.min(remaining, maxLookAheadStep);
final long cIndex = lvConsumerIndex();
final long lookAheadIndex = cIndex + lookAheadStep - 1;
final long lookAheadSeqOffset = calcSequenceOffset(lookAheadIndex, mask);
final long lookAheadSeq = lvSequence(sBuffer, lookAheadSeqOffset);
final long expectedLookAheadSeq = lookAheadIndex + 1;
if (lookAheadSeq == expectedLookAheadSeq && casConsumerIndex(cIndex, expectedLookAheadSeq))
{
for (int i = 0; i < lookAheadStep; i++)
{
final long index = cIndex + i;
final long seqOffset = calcSequenceOffset(index, mask);
final long offset = calcElementOffset(index, mask);
final long expectedSeq = index + 1;
while (lvSequence(sBuffer, seqOffset) != expectedSeq)
{

}
final E e = lpElement(buffer, offset);
soElement(buffer, offset, null);
soSequence(sBuffer, seqOffset, index + mask + 1);
c.accept(e);
}
consumed += lookAheadStep;
}
else
{
if (lookAheadSeq < expectedLookAheadSeq)
{
if (notAvailable(cIndex, mask, sBuffer, cIndex + 1))
{
return consumed;
}
}
return consumed + drainOneByOne(c, remaining);
}
}
return limit;
}

private int drainOneByOne(Consumer<E> c, int limit)
{
final long[] sBuffer = sequenceBuffer;
final long mask = this.mask;
Expand Down Expand Up @@ -383,6 +437,65 @@ public int drain(Consumer<E> c, int limit)

@Override
public int fill(Supplier<E> s, int limit)
{
final long[] sBuffer = sequenceBuffer;
final long mask = this.mask;
final E[] buffer = this.buffer;
final int maxLookAheadStep = Math.min(this.lookAheadStep, limit);
int produced = 0;

while (produced < limit)
{
final int remaining = limit - produced;
final int lookAheadStep = Math.min(remaining, maxLookAheadStep);
final long pIndex = lvProducerIndex();
final long lookAheadIndex = pIndex + lookAheadStep - 1;
final long lookAheadSeqOffset = calcSequenceOffset(lookAheadIndex, mask);
final long lookAheadSeq = lvSequence(sBuffer, lookAheadSeqOffset);
final long expectedLookAheadSeq = lookAheadIndex;
if (lookAheadSeq == expectedLookAheadSeq && casProducerIndex(pIndex, expectedLookAheadSeq + 1))
{
for (int i = 0; i < lookAheadStep; i++)
{
final long index = pIndex + i;
final long seqOffset = calcSequenceOffset(index, mask);
final long offset = calcElementOffset(index, mask);
while (lvSequence(sBuffer, seqOffset) != index)
{

}
soElement(buffer, offset, s.get());
soSequence(sBuffer, seqOffset, index + 1);
}
produced += lookAheadStep;
}
else
{
if (lookAheadSeq < expectedLookAheadSeq)
{
if (notAvailable(pIndex, mask, sBuffer, pIndex))
{
return produced;
}
}
return produced + fillOneByOne(s, remaining);
}
}
return limit;
}

private boolean notAvailable(long index, long mask, long[] sBuffer, long expectedSeq)
{
final long seqOffset = calcSequenceOffset(index, mask);
final long seq = lvSequence(sBuffer, seqOffset);
if (seq < expectedSeq)
{
return true;
}
return false;
}

private int fillOneByOne(Supplier<E> s, int limit)
{
final long[] sBuffer = sequenceBuffer;
final long mask = this.mask;
Expand All @@ -405,7 +518,6 @@ public int fill(Supplier<E> s, int limit)
}
while (seq > pIndex || // another producer has moved the sequence
!casProducerIndex(pIndex, pIndex + 1)); // failed to increment

soElement(buffer, calcElementOffset(pIndex, mask), s.get());
soSequence(sBuffer, seqOffset, pIndex + 1);
}
Expand Down
Expand Up @@ -141,8 +141,13 @@ abstract class MpmcAtomicArrayQueueL3Pad<E> extends MpmcAtomicArrayQueueConsumer
*/
public class MpmcAtomicArrayQueue<E> extends MpmcAtomicArrayQueueL3Pad<E> {

public static final int MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.mpmc.max.lookahead.step", 4096);

private final int lookAheadStep;

public MpmcAtomicArrayQueue(final int capacity) {
super(RangeUtil.checkGreaterThanOrEqual(capacity, 2, "capacity"));
lookAheadStep = Math.max(2, Math.min(capacity() / 4, MAX_LOOK_AHEAD_STEP));
}

@Override
Expand Down Expand Up @@ -329,6 +334,46 @@ public int fill(Supplier<E> s) {

@Override
public int drain(Consumer<E> c, int limit) {
final AtomicLongArray sBuffer = sequenceBuffer;
final int mask = this.mask;
final AtomicReferenceArray<E> buffer = this.buffer;
final int maxLookAheadStep = Math.min(this.lookAheadStep, limit);
int consumed = 0;
while (consumed < limit) {
final int remaining = limit - consumed;
final int lookAheadStep = Math.min(remaining, maxLookAheadStep);
final long cIndex = lvConsumerIndex();
final long lookAheadIndex = cIndex + lookAheadStep - 1;
final int lookAheadSeqOffset = calcSequenceOffset(lookAheadIndex, mask);
final long lookAheadSeq = lvSequence(sBuffer, lookAheadSeqOffset);
final long expectedLookAheadSeq = lookAheadIndex + 1;
if (lookAheadSeq == expectedLookAheadSeq && casConsumerIndex(cIndex, expectedLookAheadSeq)) {
for (int i = 0; i < lookAheadStep; i++) {
final long index = cIndex + i;
final int seqOffset = calcSequenceOffset(index, mask);
final int offset = calcElementOffset(index, mask);
final long expectedSeq = index + 1;
while (lvSequence(sBuffer, seqOffset) != expectedSeq) {
}
final E e = lpElement(buffer, offset);
soElement(buffer, offset, null);
soSequence(sBuffer, seqOffset, index + mask + 1);
c.accept(e);
}
consumed += lookAheadStep;
} else {
if (lookAheadSeq < expectedLookAheadSeq) {
if (notAvailableYet(cIndex, mask, sBuffer, cIndex + 1)) {
return consumed;
}
}
return consumed + drainOneByOne(c, remaining);
}
}
return limit;
}

private int drainOneByOne(Consumer<E> c, int limit) {
final AtomicLongArray sBuffer = sequenceBuffer;
final int mask = this.mask;
final AtomicReferenceArray<E> buffer = this.buffer;
Expand Down Expand Up @@ -359,6 +404,52 @@ public int drain(Consumer<E> c, int limit) {

@Override
public int fill(Supplier<E> s, int limit) {
final AtomicLongArray sBuffer = sequenceBuffer;
final int mask = this.mask;
final AtomicReferenceArray<E> buffer = this.buffer;
final int maxLookAheadStep = Math.min(this.lookAheadStep, limit);
int produced = 0;
while (produced < limit) {
final int remaining = limit - produced;
final int lookAheadStep = Math.min(remaining, maxLookAheadStep);
final long pIndex = lvProducerIndex();
final long lookAheadIndex = pIndex + lookAheadStep - 1;
final int lookAheadSeqOffset = calcSequenceOffset(lookAheadIndex, mask);
final long lookAheadSeq = lvSequence(sBuffer, lookAheadSeqOffset);
final long expectedLookAheadSeq = lookAheadIndex;
if (lookAheadSeq == expectedLookAheadSeq && casProducerIndex(pIndex, expectedLookAheadSeq + 1)) {
for (int i = 0; i < lookAheadStep; i++) {
final long index = pIndex + i;
final int seqOffset = calcSequenceOffset(index, mask);
final int offset = calcElementOffset(index, mask);
while (lvSequence(sBuffer, seqOffset) != index) {
}
soElement(buffer, offset, s.get());
soSequence(sBuffer, seqOffset, index + 1);
}
produced += lookAheadStep;
} else {
if (lookAheadSeq < expectedLookAheadSeq) {
if (notAvailableYet(pIndex, mask, sBuffer, pIndex)) {
return produced;
}
}
return produced + fillOneByOne(s, remaining);
}
}
return limit;
}

private boolean notAvailableYet(long index, int mask, AtomicLongArray sBuffer, long expectedSeq) {
final int seqOffset = calcSequenceOffset(index, mask);
final long seq = lvSequence(sBuffer, seqOffset);
if (seq < expectedSeq) {
return true;
}
return false;
}

private int fillOneByOne(Supplier<E> s, int limit) {
final AtomicLongArray sBuffer = sequenceBuffer;
final int mask = this.mask;
final AtomicReferenceArray<E> buffer = this.buffer;
Expand Down

0 comments on commit efcb607

Please sign in to comment.