From efcb60756ad75ef70e8e9ae603cf7c65cd46423d Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Fri, 25 May 2018 11:15:26 +0200 Subject: [PATCH] Implement batching methods on MpmcArrayQueue --- .../JavaParsingAtomicArrayQueueGenerator.java | 1 + .../org/jctools/queues/MpmcArrayQueue.java | 114 +++++++++++++++++- .../queues/atomic/MpmcAtomicArrayQueue.java | 91 ++++++++++++++ 3 files changed, 205 insertions(+), 1 deletion(-) diff --git a/jctools-build/src/main/java/org/jctools/queues/atomic/JavaParsingAtomicArrayQueueGenerator.java b/jctools-build/src/main/java/org/jctools/queues/atomic/JavaParsingAtomicArrayQueueGenerator.java index 3708b8a6..4723db4a 100644 --- a/jctools-build/src/main/java/org/jctools/queues/atomic/JavaParsingAtomicArrayQueueGenerator.java +++ b/jctools-build/src/main/java/org/jctools/queues/atomic/JavaParsingAtomicArrayQueueGenerator.java @@ -519,6 +519,7 @@ private static void processSpecialNodeTypes(NodeWithType node, String n case "mask": case "offset": case "seqOffset": + case "lookAheadSeqOffset": case "lookAheadElementOffset": node.setType(PrimitiveType.intType()); } diff --git a/jctools-core/src/main/java/org/jctools/queues/MpmcArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/MpmcArrayQueue.java index 31508a40..f1bcf6ef 100755 --- a/jctools-core/src/main/java/org/jctools/queues/MpmcArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/MpmcArrayQueue.java @@ -128,10 +128,13 @@ abstract class MpmcArrayQueueL3Pad extends MpmcArrayQueueConsumerIndexField extends MpmcArrayQueueL3Pad { + public static final int MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.mpmc.max.lookahead.step", 4096); + private final int lookAheadStep; public MpmcArrayQueue(final int capacity) { super(RangeUtil.checkGreaterThanOrEqual(capacity, 2, "capacity")); + lookAheadStep = Math.max(2, Math.min(capacity() / 4, MAX_LOOK_AHEAD_STEP)); } @Override @@ -347,6 +350,57 @@ public int fill(Supplier s) @Override public int drain(Consumer c, int limit) + { + final long[] sBuffer = sequenceBuffer; + final long mask = this.mask; + final E[] buffer = this.buffer; + final int maxLookAheadStep = Math.min(this.lookAheadStep, limit); + int consumed = 0; + + while (consumed < limit) + { + final int remaining = limit - consumed; + final int lookAheadStep = Math.min(remaining, maxLookAheadStep); + final long cIndex = lvConsumerIndex(); + final long lookAheadIndex = cIndex + lookAheadStep - 1; + final long lookAheadSeqOffset = calcSequenceOffset(lookAheadIndex, mask); + final long lookAheadSeq = lvSequence(sBuffer, lookAheadSeqOffset); + final long expectedLookAheadSeq = lookAheadIndex + 1; + if (lookAheadSeq == expectedLookAheadSeq && casConsumerIndex(cIndex, expectedLookAheadSeq)) + { + for (int i = 0; i < lookAheadStep; i++) + { + final long index = cIndex + i; + final long seqOffset = calcSequenceOffset(index, mask); + final long offset = calcElementOffset(index, mask); + final long expectedSeq = index + 1; + while (lvSequence(sBuffer, seqOffset) != expectedSeq) + { + + } + final E e = lpElement(buffer, offset); + soElement(buffer, offset, null); + soSequence(sBuffer, seqOffset, index + mask + 1); + c.accept(e); + } + consumed += lookAheadStep; + } + else + { + if (lookAheadSeq < expectedLookAheadSeq) + { + if (notAvailable(cIndex, mask, sBuffer, cIndex + 1)) + { + return consumed; + } + } + return consumed + drainOneByOne(c, remaining); + } + } + return limit; + } + + private int drainOneByOne(Consumer c, int limit) { final long[] sBuffer = sequenceBuffer; final long mask = this.mask; @@ -383,6 +437,65 @@ public int drain(Consumer c, int limit) @Override public int fill(Supplier s, int limit) + { + final long[] sBuffer = sequenceBuffer; + final long mask = this.mask; + final E[] buffer = this.buffer; + final int maxLookAheadStep = Math.min(this.lookAheadStep, limit); + int produced = 0; + + while (produced < limit) + { + final int remaining = limit - produced; + final int lookAheadStep = Math.min(remaining, maxLookAheadStep); + final long pIndex = lvProducerIndex(); + final long lookAheadIndex = pIndex + lookAheadStep - 1; + final long lookAheadSeqOffset = calcSequenceOffset(lookAheadIndex, mask); + final long lookAheadSeq = lvSequence(sBuffer, lookAheadSeqOffset); + final long expectedLookAheadSeq = lookAheadIndex; + if (lookAheadSeq == expectedLookAheadSeq && casProducerIndex(pIndex, expectedLookAheadSeq + 1)) + { + for (int i = 0; i < lookAheadStep; i++) + { + final long index = pIndex + i; + final long seqOffset = calcSequenceOffset(index, mask); + final long offset = calcElementOffset(index, mask); + while (lvSequence(sBuffer, seqOffset) != index) + { + + } + soElement(buffer, offset, s.get()); + soSequence(sBuffer, seqOffset, index + 1); + } + produced += lookAheadStep; + } + else + { + if (lookAheadSeq < expectedLookAheadSeq) + { + if (notAvailable(pIndex, mask, sBuffer, pIndex)) + { + return produced; + } + } + return produced + fillOneByOne(s, remaining); + } + } + return limit; + } + + private boolean notAvailable(long index, long mask, long[] sBuffer, long expectedSeq) + { + final long seqOffset = calcSequenceOffset(index, mask); + final long seq = lvSequence(sBuffer, seqOffset); + if (seq < expectedSeq) + { + return true; + } + return false; + } + + private int fillOneByOne(Supplier s, int limit) { final long[] sBuffer = sequenceBuffer; final long mask = this.mask; @@ -405,7 +518,6 @@ public int fill(Supplier s, int limit) } while (seq > pIndex || // another producer has moved the sequence !casProducerIndex(pIndex, pIndex + 1)); // failed to increment - soElement(buffer, calcElementOffset(pIndex, mask), s.get()); soSequence(sBuffer, seqOffset, pIndex + 1); } diff --git a/jctools-core/src/main/java/org/jctools/queues/atomic/MpmcAtomicArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/atomic/MpmcAtomicArrayQueue.java index 9bbe1ae0..2fee91bd 100644 --- a/jctools-core/src/main/java/org/jctools/queues/atomic/MpmcAtomicArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/atomic/MpmcAtomicArrayQueue.java @@ -141,8 +141,13 @@ abstract class MpmcAtomicArrayQueueL3Pad extends MpmcAtomicArrayQueueConsumer */ public class MpmcAtomicArrayQueue extends MpmcAtomicArrayQueueL3Pad { + public static final int MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.mpmc.max.lookahead.step", 4096); + + private final int lookAheadStep; + public MpmcAtomicArrayQueue(final int capacity) { super(RangeUtil.checkGreaterThanOrEqual(capacity, 2, "capacity")); + lookAheadStep = Math.max(2, Math.min(capacity() / 4, MAX_LOOK_AHEAD_STEP)); } @Override @@ -329,6 +334,46 @@ public int fill(Supplier s) { @Override public int drain(Consumer c, int limit) { + final AtomicLongArray sBuffer = sequenceBuffer; + final int mask = this.mask; + final AtomicReferenceArray buffer = this.buffer; + final int maxLookAheadStep = Math.min(this.lookAheadStep, limit); + int consumed = 0; + while (consumed < limit) { + final int remaining = limit - consumed; + final int lookAheadStep = Math.min(remaining, maxLookAheadStep); + final long cIndex = lvConsumerIndex(); + final long lookAheadIndex = cIndex + lookAheadStep - 1; + final int lookAheadSeqOffset = calcSequenceOffset(lookAheadIndex, mask); + final long lookAheadSeq = lvSequence(sBuffer, lookAheadSeqOffset); + final long expectedLookAheadSeq = lookAheadIndex + 1; + if (lookAheadSeq == expectedLookAheadSeq && casConsumerIndex(cIndex, expectedLookAheadSeq)) { + for (int i = 0; i < lookAheadStep; i++) { + final long index = cIndex + i; + final int seqOffset = calcSequenceOffset(index, mask); + final int offset = calcElementOffset(index, mask); + final long expectedSeq = index + 1; + while (lvSequence(sBuffer, seqOffset) != expectedSeq) { + } + final E e = lpElement(buffer, offset); + soElement(buffer, offset, null); + soSequence(sBuffer, seqOffset, index + mask + 1); + c.accept(e); + } + consumed += lookAheadStep; + } else { + if (lookAheadSeq < expectedLookAheadSeq) { + if (notAvailableYet(cIndex, mask, sBuffer, cIndex + 1)) { + return consumed; + } + } + return consumed + drainOneByOne(c, remaining); + } + } + return limit; + } + + private int drainOneByOne(Consumer c, int limit) { final AtomicLongArray sBuffer = sequenceBuffer; final int mask = this.mask; final AtomicReferenceArray buffer = this.buffer; @@ -359,6 +404,52 @@ public int drain(Consumer c, int limit) { @Override public int fill(Supplier s, int limit) { + final AtomicLongArray sBuffer = sequenceBuffer; + final int mask = this.mask; + final AtomicReferenceArray buffer = this.buffer; + final int maxLookAheadStep = Math.min(this.lookAheadStep, limit); + int produced = 0; + while (produced < limit) { + final int remaining = limit - produced; + final int lookAheadStep = Math.min(remaining, maxLookAheadStep); + final long pIndex = lvProducerIndex(); + final long lookAheadIndex = pIndex + lookAheadStep - 1; + final int lookAheadSeqOffset = calcSequenceOffset(lookAheadIndex, mask); + final long lookAheadSeq = lvSequence(sBuffer, lookAheadSeqOffset); + final long expectedLookAheadSeq = lookAheadIndex; + if (lookAheadSeq == expectedLookAheadSeq && casProducerIndex(pIndex, expectedLookAheadSeq + 1)) { + for (int i = 0; i < lookAheadStep; i++) { + final long index = pIndex + i; + final int seqOffset = calcSequenceOffset(index, mask); + final int offset = calcElementOffset(index, mask); + while (lvSequence(sBuffer, seqOffset) != index) { + } + soElement(buffer, offset, s.get()); + soSequence(sBuffer, seqOffset, index + 1); + } + produced += lookAheadStep; + } else { + if (lookAheadSeq < expectedLookAheadSeq) { + if (notAvailableYet(pIndex, mask, sBuffer, pIndex)) { + return produced; + } + } + return produced + fillOneByOne(s, remaining); + } + } + return limit; + } + + private boolean notAvailableYet(long index, int mask, AtomicLongArray sBuffer, long expectedSeq) { + final int seqOffset = calcSequenceOffset(index, mask); + final long seq = lvSequence(sBuffer, seqOffset); + if (seq < expectedSeq) { + return true; + } + return false; + } + + private int fillOneByOne(Supplier s, int limit) { final AtomicLongArray sBuffer = sequenceBuffer; final int mask = this.mask; final AtomicReferenceArray buffer = this.buffer;