diff --git a/jctools-core/src/main/java/org/jctools/queues/MpscProgressiveChunkedQueue.java b/jctools-core/src/main/java/org/jctools/queues/MpscProgressiveChunkedQueue.java new file mode 100644 index 00000000..6515aa45 --- /dev/null +++ b/jctools-core/src/main/java/org/jctools/queues/MpscProgressiveChunkedQueue.java @@ -0,0 +1,705 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jctools.queues; + +import org.jctools.queues.IndexedQueueSizeUtil.IndexedQueue; +import org.jctools.util.PortableJvmInfo; +import org.jctools.util.Pow2; +import org.jctools.util.UnsafeRefArrayAccess; + +import java.util.AbstractQueue; +import java.util.Iterator; + +import static org.jctools.util.UnsafeAccess.UNSAFE; +import static org.jctools.util.UnsafeAccess.fieldOffset; + + +abstract class MpscProgressiveChunkedQueuePad1 extends AbstractQueue implements IndexedQueue +{ + long p01, p02, p03, p04, p05, p06, p07; + long p10, p11, p12, p13, p14, p15, p16, p17; + + static final class AtomicChunk + { + private static final long PREV_OFFSET = fieldOffset(AtomicChunk.class, "prev"); + private static final long NEXT_OFFSET = fieldOffset(AtomicChunk.class, "next"); + private static final long INDEX_OFFSET = fieldOffset(AtomicChunk.class, "index"); + private volatile AtomicChunk prev; + private volatile long index; + private volatile AtomicChunk next; + private final E[] buffer; + + AtomicChunk(long index, AtomicChunk prev, int size) + { + buffer = CircularArrayOffsetCalculator.allocate(size); + spIndex(index); + spNext(null); + soPrev(prev); + } + + public void init(long index, AtomicChunk prev) + { + soIndex(index); + soPrev(prev); + } + + public AtomicChunk lvNext() + { + return next; + } + + public AtomicChunk lvPrev() + { + return prev; + } + + public long lvIndex() + { + return UNSAFE.getLongVolatile(this, INDEX_OFFSET); + } + + public void soIndex(long index) + { + UNSAFE.putOrderedLong(this, INDEX_OFFSET, index); + } + + public void spIndex(long index) + { + UNSAFE.putLong(this, INDEX_OFFSET, index); + } + + public void soNext(AtomicChunk value) + { + UNSAFE.putOrderedObject(this, NEXT_OFFSET, value); + } + + public void spNext(AtomicChunk value) + { + UNSAFE.putObject(this, NEXT_OFFSET, value); + } + + public void soPrev(AtomicChunk value) + { + UNSAFE.putOrderedObject(this, PREV_OFFSET, value); + } + + public void spPrev(AtomicChunk value) + { + UNSAFE.putObject(this, PREV_OFFSET, value); + } + + public void soElement(int index, E e) + { + UnsafeRefArrayAccess.soElement(buffer, UnsafeRefArrayAccess.calcElementOffset(index), e); + } + + public void spElement(int index, E e) + { + UnsafeRefArrayAccess.spElement(buffer, UnsafeRefArrayAccess.calcElementOffset(index), e); + } + + public E lvElement(int index) + { + return UnsafeRefArrayAccess.lvElement(buffer, UnsafeRefArrayAccess.calcElementOffset(index)); + } + + } +} + +// $gen:ordered-fields +abstract class MpscProgressiveChunkedQueueProducerFields extends MpscProgressiveChunkedQueuePad1 +{ + private final static long P_INDEX_OFFSET = + fieldOffset(MpscProgressiveChunkedQueueProducerFields.class, "producerIndex"); + private long producerIndex; + + @Override + public final long lvProducerIndex() + { + return producerIndex; + } + + final long getAndIncrementProducerIndex() + { + return UNSAFE.getAndAddLong(this, P_INDEX_OFFSET, 1); + } + + final long getAndAddProducerIndex(long delta) + { + return UNSAFE.getAndAddLong(this, P_INDEX_OFFSET, delta); + } +} + +abstract class MpscProgressiveChunkedQueuePad2 extends MpscProgressiveChunkedQueueProducerFields +{ + long p01, p02, p03, p04, p05, p06, p07, p08; + long p10, p11, p12, p13, p14, p15, p16, p17; +} + +// $gen:ordered-fields +abstract class MpscProgressiveChunkedQueueProducerBuffer extends MpscProgressiveChunkedQueuePad2 +{ + private static final long P_BUFFER_OFFSET = + fieldOffset(MpscProgressiveChunkedQueueProducerBuffer.class, "producerBuffer"); + + private volatile AtomicChunk producerBuffer; + + final AtomicChunk lvProducerBuffer() + { + return this.producerBuffer; + } + + final void soProducerBuffer(AtomicChunk buffer) + { + UNSAFE.putOrderedObject(this, P_BUFFER_OFFSET, buffer); + } +} + +abstract class MpscProgressiveChunkedQueuePad3 extends MpscProgressiveChunkedQueueProducerBuffer +{ + long p0, p1, p2, p3, p4, p5, p6, p7; + long p10, p11, p12, p13, p14, p15, p16, p17; +} + +// $gen:ordered-fields +abstract class MpscProgressiveChunkedQueueConsumerFields extends MpscProgressiveChunkedQueuePad3 +{ + private final static long C_INDEX_OFFSET = + fieldOffset(MpscProgressiveChunkedQueueConsumerFields.class, "consumerIndex"); + + private volatile long consumerIndex; + protected AtomicChunk consumerBuffer; + + @Override + public final long lvConsumerIndex() + { + return consumerIndex; + } + + final long lpConsumerIndex() + { + return UNSAFE.getLong(this, C_INDEX_OFFSET); + } + + final void soConsumerIndex(long newValue) + { + UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, newValue); + } +} + +abstract class MpscProgressiveChunkedQueuePad4 extends MpscProgressiveChunkedQueueConsumerFields +{ + long p0, p1, p2, p3, p4, p5, p6, p7; + long p10, p11, p12, p13, p14, p15, p16, p17; +} + +// $gen:ordered-fields +abstract class MpscProgressiveChunkedQueueFreeBuffer extends MpscProgressiveChunkedQueuePad4 +{ + private static final long F_BUFFER_OFFSET = + fieldOffset(MpscProgressiveChunkedQueueFreeBuffer.class, "freeBuffer"); + + private volatile AtomicChunk freeBuffer; + + final AtomicChunk lpFreeBuffer() + { + return (AtomicChunk) UNSAFE.getObject(this, F_BUFFER_OFFSET); + } + + final AtomicChunk lvFreeBuffer() + { + return this.freeBuffer; + } + + final void soFreeBuffer(AtomicChunk buffer) + { + UNSAFE.putOrderedObject(this, F_BUFFER_OFFSET, buffer); + } + + final boolean casFreeBuffer(AtomicChunk expected, AtomicChunk newValue) + { + return UNSAFE.compareAndSwapObject(this, F_BUFFER_OFFSET, expected, newValue); + } +} + +abstract class MpscProgressiveChunkedQueuePad5 extends MpscProgressiveChunkedQueueFreeBuffer +{ + long p0, p1, p2, p3, p4, p5, p6, p7; + long p10, p11, p12, p13, p14, p15, p16, p17; +} + +/** + * An MPSC array queue which starts at initialCapacity and grows unbounded in linked chunks.
+ * Differently from {@link MpscUnboundedArrayQueue} it is designed to provide a better scaling when more + * producers are concurrently offering. + * + * @param + * @author https://github.com/franz1981 + */ +public class MpscProgressiveChunkedQueue extends MpscProgressiveChunkedQueuePad5 + implements MessagePassingQueue, QueueProgressIndicators +{ + private final int chunkMask; + private final int chunkShift; + + public MpscProgressiveChunkedQueue(int chunkSize) + { + chunkSize = Pow2.roundToPowerOfTwo(chunkSize); + final AtomicChunk first = new AtomicChunk(0, null, chunkSize); + soProducerBuffer(first); + consumerBuffer = first; + chunkMask = chunkSize - 1; + chunkShift = Integer.numberOfTrailingZeros(chunkSize); + soFreeBuffer(new AtomicChunk(-1, null, chunkSize)); + } + + private AtomicChunk producerBufferOf(AtomicChunk producerBuffer, long chunkIndex) + { + long jumpBackward = producerBuffer == null ? -1 : producerBuffer.lvIndex() - chunkIndex; + while (jumpBackward < 0) + { + producerBuffer = lvProducerBuffer(); + //IMPORTANT: a long stall could make producerBuffer being recycled and reused + //hence AtomicChunk::index is no longer stable and could be: + //- the old value: it will be ignored because < chunkIndex + //- the new value: prev could be not stable yet, if queried + //If recycling is avoided, there is no need to wait until prev got stable. + jumpBackward = producerBuffer.lvIndex() - chunkIndex; + } + for (long i = 0; i < jumpBackward; i++) + { + //wait until prev is stable + AtomicChunk prevBuffer; + while ((prevBuffer = producerBuffer.lvPrev()) == null) + { + + } + producerBuffer = prevBuffer; + } + assert producerBuffer.lvIndex() == chunkIndex; + return producerBuffer; + } + + @Override + public long currentProducerIndex() + { + return lvProducerIndex(); + } + + @Override + public long currentConsumerIndex() + { + return lvConsumerIndex(); + } + + @Override + public boolean offer(E e) + { + if (null == e) + { + throw new NullPointerException(); + } + final int chunkMask = this.chunkMask; + final int chunkShift = this.chunkShift; + final long producerSeq = getAndIncrementProducerIndex(); + final int pOffset = (int) (producerSeq & chunkMask); + final long chunkIndex = producerSeq >> chunkShift; + final boolean firstElementOfNewChunk = pOffset == 0 && chunkIndex > 0; + final long currentChunkIndex = firstElementOfNewChunk ? chunkIndex - 1 : chunkIndex; + AtomicChunk producerBuffer = lvProducerBuffer(); + if (producerBuffer.lvIndex() != currentChunkIndex) + { + producerBuffer = producerBufferOf(producerBuffer, currentChunkIndex); + } + if (firstElementOfNewChunk) + { + appendElementAndLinkChunk(producerBuffer, chunkIndex, chunkMask + 1, e, null); + } + else + { + producerBuffer.soElement(pOffset, e); + } + return true; + } + + private void appendElementAndLinkChunk( + AtomicChunk producerBuffer, + long chunkIndex, + int chunkSize, + E e, + Supplier s) + { + //try to get a recycled/free buffer, but give up on contention + AtomicChunk newChunk = lvFreeBuffer(); + if (newChunk != null) + { + if (casFreeBuffer(newChunk, null)) + { + //newChunk could be a previous producer buffer! + //to be sure that producerBuffer set are ordered and race-free + //we set the producerBuffer before init: + //- if index is old there is no harm: producerBufferOf won't proceed + soProducerBuffer(newChunk); + //it will unblock other awaiting offers on producerBufferOf that has: + //- used the recycled value accidentally due to a long pause after lvProducerBuffer + //- used the last producerBuffer having an old index value + newChunk.init(chunkIndex, producerBuffer); + } + else + { + newChunk = null; + } + } + if (newChunk == null) + { + newChunk = new AtomicChunk(chunkIndex, producerBuffer, chunkSize); + //for a new buffer is ok to be published just when ready ie index and prev are both stable + soProducerBuffer(newChunk); + } + if (e == null) + { + e = s.get(); + } + //IMPORTANT: this order ensure that producerBuffer::soNext + //is releasing the first element write on the new chunk + newChunk.spElement(0, e); + producerBuffer.soNext(newChunk); + } + + + private static E spinForElement(AtomicChunk chunk, int offset) + { + E e; + while ((e = chunk.lvElement(offset)) == null) + { + + } + return e; + } + + private AtomicChunk spinForNextIfNotEmpty(AtomicChunk consumerBuffer, long consumerIndex) + { + AtomicChunk next = consumerBuffer.lvNext(); + if (next == null) + { + if (lvProducerIndex() == consumerIndex) + { + return null; + } + while ((next = consumerBuffer.lvNext()) == null) + { + + } + } + return next; + } + + + private AtomicChunk pollNextBuffer(AtomicChunk consumerBuffer, long consumerIndex) + { + final AtomicChunk next = spinForNextIfNotEmpty(consumerBuffer, consumerIndex); + if (next == null) + { + return null; + } + //save from nepotism + consumerBuffer.spNext(null); + //recycle buffer if necessary: optimistic reciclying + if (lpFreeBuffer() == null) + { + soFreeBuffer(consumerBuffer); + } + next.spPrev(null); + return next; + } + + @Override + public E poll() + { + final int chunkMask = this.chunkMask; + final long consumerIndex = this.lpConsumerIndex(); + AtomicChunk consumerBuffer = this.consumerBuffer; + final int consumerOffset = (int) (consumerIndex & chunkMask); + final int chunkSize = chunkMask + 1; + final boolean firstElementOfNewChunk = consumerOffset == 0 && consumerIndex >= chunkSize; + if (firstElementOfNewChunk) + { + consumerBuffer = pollNextBuffer(consumerBuffer, consumerIndex); + if (consumerBuffer == null) + { + return null; + } + this.consumerBuffer = consumerBuffer; + } + E e = consumerBuffer.lvElement(consumerOffset); + if (e != null) + { + consumerBuffer.spElement(consumerOffset, null); + soConsumerIndex(consumerIndex + 1); + return e; + } + assert !firstElementOfNewChunk; + if (lvProducerIndex() == consumerIndex) + { + return null; + } + e = spinForElement(consumerBuffer, consumerOffset); + consumerBuffer.spElement(consumerOffset, null); + soConsumerIndex(consumerIndex + 1); + return e; + } + + @Override + public E peek() + { + final int chunkMask = this.chunkMask; + final long consumerIndex = this.lpConsumerIndex(); + AtomicChunk consumerBuffer = this.consumerBuffer; + final int consumerOffset = (int) (consumerIndex & chunkMask); + final int chunkSize = chunkMask + 1; + final boolean firstElementOfNewChunk = consumerOffset == 0 && consumerIndex >= chunkSize; + if (firstElementOfNewChunk) + { + final AtomicChunk next = spinForNextIfNotEmpty(consumerBuffer, consumerIndex); + if (next == null) + { + return null; + } + consumerBuffer = next; + } + E e = consumerBuffer.lvElement(consumerOffset); + if (e != null) + { + return e; + } + assert !firstElementOfNewChunk; + if (lvProducerIndex() == consumerIndex) + { + return null; + } + e = spinForElement(consumerBuffer, consumerOffset); + return e; + } + + @Override + public Iterator iterator() + { + throw new UnsupportedOperationException(); + } + + @Override + public int size() + { + return IndexedQueueSizeUtil.size(this); + } + + @Override + public int capacity() + { + return MessagePassingQueue.UNBOUNDED_CAPACITY; + } + + @Override + public boolean relaxedOffer(E e) + { + return offer(e); + } + + private AtomicChunk relaxedPollNextBuffer(AtomicChunk consumerBuffer) + { + final AtomicChunk next = consumerBuffer.lvNext(); + if (next == null) + { + return null; + } + //save from nepotism + consumerBuffer.spNext(null); + //recycle buffer if necessary: optimistic reciclying + if (lpFreeBuffer() == null) + { + soFreeBuffer(consumerBuffer); + } + next.spPrev(null); + return next; + } + + @Override + public E relaxedPoll() + { + final int chunkMask = this.chunkMask; + final long consumerIndex = this.lpConsumerIndex(); + AtomicChunk consumerBuffer = this.consumerBuffer; + final int consumerOffset = (int) (consumerIndex & chunkMask); + final int chunkSize = chunkMask + 1; + final boolean firstElementOfNewChunk = consumerOffset == 0 && consumerIndex >= chunkSize; + if (firstElementOfNewChunk) + { + consumerBuffer = relaxedPollNextBuffer(consumerBuffer); + if (consumerBuffer == null) + { + return null; + } + this.consumerBuffer = consumerBuffer; + //the element on consumerIndex can't be null from now on + } + E e = consumerBuffer.lvElement(consumerOffset); + if (e == null) + { + assert !firstElementOfNewChunk; + return null; + } + consumerBuffer.spElement(consumerOffset, null); + soConsumerIndex(consumerIndex + 1); + return e; + } + + @Override + public E relaxedPeek() + { + final int chunkMask = this.chunkMask; + final long consumerIndex = this.lpConsumerIndex(); + AtomicChunk consumerBuffer = this.consumerBuffer; + final int consumerOffset = (int) (consumerIndex & chunkMask); + final int chunkSize = chunkMask + 1; + final boolean firstElementOfNewChunk = consumerOffset == 0 && consumerIndex >= chunkSize; + if (firstElementOfNewChunk) + { + final AtomicChunk next = consumerBuffer.lvNext(); + if (next == null) + { + return null; + } + consumerBuffer = next; + //the element on consumerIndex can't be null from now on + } + final E e = consumerBuffer.lvElement(consumerOffset); + if (e != null) + { + return e; + } + assert !firstElementOfNewChunk; + return null; + } + + @Override + public int drain(Consumer c) + { + return drain(c, chunkMask + 1); + } + + @Override + public int fill(Supplier s) + { + long result = 0;// result is a long because we want to have a safepoint check at regular intervals + final int capacity = chunkMask + 1; + final int offerBatch = Math.min(PortableJvmInfo.RECOMENDED_OFFER_BATCH, capacity); + do + { + final int filled = fill(s, offerBatch); + if (filled == 0) + { + return (int) result; + } + result += filled; + } + while (result <= capacity); + return (int) result; + } + + @Override + public int drain(Consumer c, int limit) + { + final int chunkMask = this.chunkMask; + final int chunkSize = chunkMask + 1; + long consumerIndex = this.lpConsumerIndex(); + AtomicChunk consumerBuffer = this.consumerBuffer; + + for (int i = 0; i < limit; i++) + { + final int consumerOffset = (int) (consumerIndex & chunkMask); + final boolean firstElementOfNewChunk = consumerOffset == 0 && consumerIndex >= chunkSize; + if (firstElementOfNewChunk) + { + consumerBuffer = relaxedPollNextBuffer(consumerBuffer); + if (consumerBuffer == null) + { + return i; + } + this.consumerBuffer = consumerBuffer; + //the element on consumerIndex can't be null from now on + } + final long nextConsumerIndex = consumerIndex + 1; + E e = consumerBuffer.lvElement(consumerOffset); + if (e == null) + { + assert !firstElementOfNewChunk; + return i; + } + consumerBuffer.spElement(consumerOffset, null); + soConsumerIndex(nextConsumerIndex); + c.accept(e); + consumerIndex = nextConsumerIndex; + } + return limit; + } + + @Override + public int fill(Supplier s, int limit) + { + final int chunkShift = this.chunkShift; + final int chunkMask = this.chunkMask; + long producerSeq = getAndAddProducerIndex(limit); + AtomicChunk producerBuffer = null; + for (int i = 0; i < limit; i++) + { + final int pOffset = (int) (producerSeq & chunkMask); + final long chunkIndex = producerSeq >> chunkShift; + final boolean firstElementOfNewChunk = pOffset == 0 && chunkIndex > 0; + final long currentChunkIndex = firstElementOfNewChunk ? chunkIndex - 1 : chunkIndex; + producerBuffer = producerBufferOf(producerBuffer, currentChunkIndex); + if (firstElementOfNewChunk) + { + appendElementAndLinkChunk(producerBuffer, chunkIndex, chunkMask + 1, null, s); + } + else + { + producerBuffer.soElement(pOffset, s.get()); + } + producerSeq++; + } + return limit; + } + + @Override + public void drain(Consumer c, WaitStrategy wait, ExitCondition exit) + { + MessagePassingQueueUtil.drain(this, c, wait, exit); + } + + @Override + public void fill(Supplier s, WaitStrategy wait, ExitCondition exit) + { + while (exit.keepRunning()) + { + fill(s, PortableJvmInfo.RECOMENDED_OFFER_BATCH); + } + } + + @Override + public String toString() + { + return this.getClass().getName(); + } + +} diff --git a/jctools-core/src/test/java/org/jctools/queues/MpqSanityTestMpscProgressiveChunked.java b/jctools-core/src/test/java/org/jctools/queues/MpqSanityTestMpscProgressiveChunked.java new file mode 100644 index 00000000..21cb9263 --- /dev/null +++ b/jctools-core/src/test/java/org/jctools/queues/MpqSanityTestMpscProgressiveChunked.java @@ -0,0 +1,28 @@ +package org.jctools.queues; + +import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.spec.Ordering; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Collection; + +@RunWith(Parameterized.class) +public class MpqSanityTestMpscProgressiveChunked extends MpqSanityTest +{ + + public MpqSanityTestMpscProgressiveChunked(ConcurrentQueueSpec spec, MessagePassingQueue queue) + { + super(spec, queue); + } + + @Parameterized.Parameters + public static Collection parameters() + { + ArrayList list = new ArrayList(); + list.add(makeMpq(0, 1, 0, Ordering.FIFO, new MpscProgressiveChunkedQueue<>(1))); + list.add(makeMpq(0, 1, 0, Ordering.FIFO, new MpscProgressiveChunkedQueue<>(64))); + return list; + } +} diff --git a/jctools-core/src/test/java/org/jctools/queues/QueueSanityTestMpscProgressiveChunked.java b/jctools-core/src/test/java/org/jctools/queues/QueueSanityTestMpscProgressiveChunked.java new file mode 100644 index 00000000..febca780 --- /dev/null +++ b/jctools-core/src/test/java/org/jctools/queues/QueueSanityTestMpscProgressiveChunked.java @@ -0,0 +1,29 @@ +package org.jctools.queues; + +import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.spec.Ordering; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Queue; + +@RunWith(Parameterized.class) +public class QueueSanityTestMpscProgressiveChunked extends QueueSanityTest +{ + public QueueSanityTestMpscProgressiveChunked(ConcurrentQueueSpec spec, Queue queue) + { + super(spec, queue); + } + + @Parameterized.Parameters + public static Collection parameters() + { + ArrayList list = new ArrayList(); + list.add(makeQueue(0, 1, 0, Ordering.FIFO, new MpscProgressiveChunkedQueue<>(1))); + list.add(makeQueue(0, 1, 0, Ordering.FIFO, new MpscProgressiveChunkedQueue<>(64))); + return list; + } + +}