diff --git a/jctools-core/src/main/java/org/jctools/queues/MpmcProgressiveChunkedQueue.java b/jctools-core/src/main/java/org/jctools/queues/MpmcProgressiveChunkedQueue.java new file mode 100644 index 00000000..e4e6eb91 --- /dev/null +++ b/jctools-core/src/main/java/org/jctools/queues/MpmcProgressiveChunkedQueue.java @@ -0,0 +1,697 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jctools.queues; + +import org.jctools.queues.IndexedQueueSizeUtil.IndexedQueue; +import org.jctools.util.PortableJvmInfo; +import org.jctools.util.Pow2; +import org.jctools.util.UnsafeRefArrayAccess; + +import java.util.AbstractQueue; +import java.util.Iterator; + +import static org.jctools.util.UnsafeAccess.UNSAFE; +import static org.jctools.util.UnsafeAccess.fieldOffset; + + +abstract class MpmcProgressiveChunkedQueuePad1 extends AbstractQueue implements IndexedQueue +{ + long p01, p02, p03, p04, p05, p06, p07; + long p10, p11, p12, p13, p14, p15, p16, p17; + + static final class AtomicChunk + { + final static int NIL_CHUNK_INDEX = -1; + private static final long PREV_OFFSET = fieldOffset(AtomicChunk.class, "prev"); + private static final long NEXT_OFFSET = fieldOffset(AtomicChunk.class, "next"); + private static final long INDEX_OFFSET = fieldOffset(AtomicChunk.class, "index"); + private volatile AtomicChunk prev; + private volatile long index; + private volatile AtomicChunk next; + private final E[] buffer; + + AtomicChunk(long index, AtomicChunk prev, int size) + { + buffer = CircularArrayOffsetCalculator.allocate(size); + spNext(null); + spPrev(prev); + spIndex(index); + } + + final AtomicChunk lvNext() + { + return next; + } + + final AtomicChunk lpPrev() + { + return (AtomicChunk) UNSAFE.getObject(this, PREV_OFFSET); + } + + final long lvIndex() + { + return index; + } + + final void soIndex(long index) + { + UNSAFE.putOrderedLong(this, INDEX_OFFSET, index); + } + + final void spIndex(long index) + { + UNSAFE.putLong(this, INDEX_OFFSET, index); + } + + final void soNext(AtomicChunk value) + { + UNSAFE.putOrderedObject(this, NEXT_OFFSET, value); + } + + final void spNext(AtomicChunk value) + { + UNSAFE.putObject(this, NEXT_OFFSET, value); + } + + final void spPrev(AtomicChunk value) + { + UNSAFE.putObject(this, PREV_OFFSET, value); + } + + final void soElement(int index, E e) + { + UnsafeRefArrayAccess.soElement(buffer, UnsafeRefArrayAccess.calcElementOffset(index), e); + } + + final void spElement(int index, E e) + { + UnsafeRefArrayAccess.spElement(buffer, UnsafeRefArrayAccess.calcElementOffset(index), e); + } + + final E lvElement(int index) + { + return UnsafeRefArrayAccess.lvElement(buffer, UnsafeRefArrayAccess.calcElementOffset(index)); + } + + } +} + +// $gen:ordered-fields +abstract class MpmcProgressiveChunkedQueueProducerFields extends MpmcProgressiveChunkedQueuePad1 +{ + private final static long P_INDEX_OFFSET = + fieldOffset(MpmcProgressiveChunkedQueueProducerFields.class, "producerIndex"); + private volatile long producerIndex; + + @Override + public final long lvProducerIndex() + { + return producerIndex; + } + + final long getAndIncrementProducerIndex() + { + return UNSAFE.getAndAddLong(this, P_INDEX_OFFSET, 1); + } + + final long getAndAddProducerIndex(long delta) + { + return UNSAFE.getAndAddLong(this, P_INDEX_OFFSET, delta); + } +} + +abstract class MpmcProgressiveChunkedQueuePad2 extends MpmcProgressiveChunkedQueueProducerFields +{ + long p01, p02, p03, p04, p05, p06, p07, p08; + long p10, p11, p12, p13, p14, p15, p16; +} + +// $gen:ordered-fields +abstract class MpmcProgressiveChunkedQueueProducerBuffer extends MpmcProgressiveChunkedQueuePad2 +{ + private static final long P_BUFFER_OFFSET = + fieldOffset(MpmcProgressiveChunkedQueueProducerBuffer.class, "producerBuffer"); + private static final long P_CHUNK_INDEX_OFFSET = + fieldOffset(MpmcProgressiveChunkedQueueProducerBuffer.class, "producerChunkIndex"); + + private volatile AtomicChunk producerBuffer; + private volatile long producerChunkIndex; + + + final long lvProducerChunkIndex() + { + return producerChunkIndex; + } + + final boolean casProducerChunkIndex(long expected, long value) + { + return UNSAFE.compareAndSwapLong(this, P_CHUNK_INDEX_OFFSET, expected, value); + } + + final void soProducerChunkIndex(long value) + { + UNSAFE.putOrderedLong(this, P_CHUNK_INDEX_OFFSET, value); + } + + final AtomicChunk lvProducerBuffer() + { + return this.producerBuffer; + } + + final void soProducerBuffer(AtomicChunk buffer) + { + UNSAFE.putOrderedObject(this, P_BUFFER_OFFSET, buffer); + } +} + +abstract class MpmcProgressiveChunkedQueuePad3 extends MpmcProgressiveChunkedQueueProducerBuffer +{ + long p0, p1, p2, p3, p4, p5, p6, p7; + long p10, p11, p12, p13, p14, p15, p16; +} + +// $gen:ordered-fields +abstract class MpmcProgressiveChunkedQueueConsumerFields extends MpmcProgressiveChunkedQueuePad3 +{ + private final static long C_INDEX_OFFSET = + fieldOffset(MpmcProgressiveChunkedQueueConsumerFields.class, "consumerIndex"); + private final static long C_BUFFER_OFFSET = + fieldOffset(MpmcProgressiveChunkedQueueConsumerFields.class, "consumerBuffer"); + + private volatile long consumerIndex; + private volatile AtomicChunk consumerBuffer; + + @Override + public final long lvConsumerIndex() + { + return consumerIndex; + } + + final boolean casConsumerIndex(long expect, long newValue) + { + return UNSAFE.compareAndSwapLong(this, C_INDEX_OFFSET, expect, newValue); + } + + final AtomicChunk lvConsumerBuffer() + { + return this.consumerBuffer; + } + + final void soConsumerBuffer(AtomicChunk newValue) + { + UNSAFE.putOrderedObject(this, C_BUFFER_OFFSET, newValue); + } + +} + +abstract class MpmcProgressiveChunkedQueuePad4 extends MpmcProgressiveChunkedQueueConsumerFields +{ + long p0, p1, p2, p3, p4, p5, p6, p7; + long p10, p11, p12, p13, p14, p15, p16; +} + +// $gen:ordered-fields +abstract class MpmcProgressiveChunkedQueueFreeBuffer extends MpmcProgressiveChunkedQueuePad4 +{ + private static final long F_BUFFER_OFFSET = + fieldOffset(MpmcProgressiveChunkedQueueFreeBuffer.class, "freeBuffer"); + + private volatile AtomicChunk freeBuffer; + + final AtomicChunk lvFreeBuffer() + { + return this.freeBuffer; + } + + final void soFreeBuffer(AtomicChunk buffer) + { + UNSAFE.putOrderedObject(this, F_BUFFER_OFFSET, buffer); + } +} + +abstract class MpmcProgressiveChunkedQueuePad5 extends MpmcProgressiveChunkedQueueFreeBuffer +{ + long p0, p1, p2, p3, p4, p5, p6, p7; + long p10, p11, p12, p13, p14, p15, p16; +} + +/** + * An MPMC array queue which starts at initialCapacity and grows unbounded in linked chunks.
+ * Differently from {@link MpmcArrayQueue} it is designed to provide a better scaling when more + * producers are concurrently offering. + * + * @param + * @author https://github.com/franz1981 + */ +public class MpmcProgressiveChunkedQueue extends MpmcProgressiveChunkedQueuePad5 + implements MessagePassingQueue, QueueProgressIndicators +{ + private final int chunkMask; + private final int chunkShift; + + public MpmcProgressiveChunkedQueue(int chunkSize) + { + chunkSize = Pow2.roundToPowerOfTwo(chunkSize); + final AtomicChunk first = new AtomicChunk(0, null, chunkSize); + soProducerBuffer(first); + soProducerChunkIndex(0); + soConsumerBuffer(first); + chunkMask = chunkSize - 1; + chunkShift = Integer.numberOfTrailingZeros(chunkSize); + soFreeBuffer(new AtomicChunk(AtomicChunk.NIL_CHUNK_INDEX, null, chunkSize)); + } + + private AtomicChunk producerBufferOf(AtomicChunk producerBuffer, long expectedChunkIndex) + { + long jumpBackward; + while (true) + { + if (producerBuffer == null) + { + producerBuffer = lvProducerBuffer(); + } + final long producerChunkIndex = producerBuffer.lvIndex(); + if (producerChunkIndex == AtomicChunk.NIL_CHUNK_INDEX) + { + //force an attempt to fetch it another time + producerBuffer = null; + continue; + } + jumpBackward = producerChunkIndex - expectedChunkIndex; + if (jumpBackward >= 0) + { + break; + } + //try validate against the last producer chunk index + if (lvProducerChunkIndex() == producerChunkIndex) + { + producerBuffer = appendNextChunk(producerBuffer, producerChunkIndex, chunkMask + 1); + } + else + { + producerBuffer = null; + } + } + for (long i = 0; i < jumpBackward; i++) + { + //prev cannot be null, because is being released by index + producerBuffer = producerBuffer.lpPrev(); + assert producerBuffer != null; + } + assert producerBuffer.lvIndex() == expectedChunkIndex; + return producerBuffer; + } + + private AtomicChunk appendNextChunk(AtomicChunk producerBuffer, long chunkIndex, int chunkSize) + { + assert chunkIndex != AtomicChunk.NIL_CHUNK_INDEX; + final long nextChunkIndex = chunkIndex + 1; + //prevent other concurrent attempts on appendNextChunk + if (!casProducerChunkIndex(chunkIndex, nextChunkIndex)) + { + return null; + } + AtomicChunk newChunk = lvFreeBuffer(); + if (newChunk != null) + { + //single-writer: producerBuffer::index == nextChunkIndex is protecting it + soFreeBuffer(null); + assert newChunk.lvIndex() == AtomicChunk.NIL_CHUNK_INDEX; + //prevent other concurrent attempts on appendNextChunk + soProducerBuffer(newChunk); + newChunk.spPrev(producerBuffer); + //index set is releasing prev, allowing other pending offers to continue + newChunk.soIndex(nextChunkIndex); + } + else + { + newChunk = new AtomicChunk(nextChunkIndex, producerBuffer, chunkSize); + soProducerBuffer(newChunk); + } + //link the next chunk only when finished + producerBuffer.soNext(newChunk); + return newChunk; + } + + @Override + public long currentProducerIndex() + { + return lvProducerIndex(); + } + + @Override + public long currentConsumerIndex() + { + return lvConsumerIndex(); + } + + @Override + public boolean offer(E e) + { + if (null == e) + { + throw new NullPointerException(); + } + final int chunkMask = this.chunkMask; + final int chunkShift = this.chunkShift; + final long producerSeq = getAndIncrementProducerIndex(); + final int pOffset = (int) (producerSeq & chunkMask); + final long chunkIndex = producerSeq >> chunkShift; + AtomicChunk producerBuffer = lvProducerBuffer(); + if (producerBuffer.lvIndex() != chunkIndex) + { + producerBuffer = producerBufferOf(producerBuffer, chunkIndex); + } + //producerBuffer could be a recycled one and + //maybe there are consumers left behind, waiting to + //consume it + while (producerBuffer.lvElement(pOffset) != null) + { + } + producerBuffer.soElement(pOffset, e); + return true; + } + + private static E spinForElement(AtomicChunk chunk, int offset) + { + E e; + while ((e = chunk.lvElement(offset)) == null) + { + + } + return e; + } + + private AtomicChunk rotateConsumerBuffer(AtomicChunk consumerBuffer, AtomicChunk next) + { + //save from nepotism + consumerBuffer.spNext(null); + consumerBuffer.soIndex(AtomicChunk.NIL_CHUNK_INDEX); + //recycle buffer if necessary: optimistic reciclying + //change the chunkIndex to a non valid value + //to stop offering/polling threads to use this buffer + if (lvFreeBuffer() == null) + { + consumerBuffer.soIndex(AtomicChunk.NIL_CHUNK_INDEX); + soFreeBuffer(consumerBuffer); + } + next.spPrev(null); + soConsumerBuffer(next); + return next; + } + + @Override + public E poll() + { + final int chunkMask = this.chunkMask; + final int chunkShift = this.chunkShift; + long consumerIndex; + AtomicChunk consumerBuffer; + int consumerOffset; + final int chunkSize = chunkMask + 1; + boolean firstElementOfNewChunk; + E e = null; + AtomicChunk next = null; + do + { + consumerIndex = this.lvConsumerIndex(); + consumerBuffer = this.lvConsumerBuffer(); + consumerOffset = (int) (consumerIndex & chunkMask); + final long chunkIndex = consumerIndex >> chunkShift; + firstElementOfNewChunk = consumerOffset == 0 && consumerIndex >= chunkSize; + if (firstElementOfNewChunk) + { + final long expectedChunkIndex = chunkIndex - 1; + if (expectedChunkIndex != consumerBuffer.lvIndex()) + { + //another consumer has already rotated the consumer buffer or is yet to rotating it + continue; + } + next = consumerBuffer.lvNext(); + if (next == null) + { + if (lvProducerIndex() == consumerIndex) + { + return null; + } + //if another consumer rotate consumerBuffer, its chunkIndex will change + while ((next = consumerBuffer.lvNext()) == null) + { + if (expectedChunkIndex != consumerBuffer.lvIndex()) + { + //another consumer has already rotated the consumer buffer + break; + } + } + if (next == null) + { + continue; + } + } + } + else + { + if (chunkIndex != consumerBuffer.lvIndex()) + { + //another consumer has already rotated the consumer buffer or is yet to rotating it + continue; + } + e = consumerBuffer.lvElement(consumerOffset); + if (e == null) + { + if (lvProducerIndex() == consumerIndex) + { + return null; + } + //if the buffer is not empty, another consumer could have already + //stolen it, incrementing consumerIndex too: better to check if it has happened + continue; + } + } + if (casConsumerIndex(consumerIndex, consumerIndex + 1)) + { + break; + } + } + while (true); + //if we are the firstElementOfNewChunk we need to rotate the consumer buffer + if (firstElementOfNewChunk) + { + //we can freely spin awaiting producer, because we are the only in charge to + //rotate the consumer buffer and using next + e = spinForElement(next, consumerOffset); + consumerBuffer = rotateConsumerBuffer(consumerBuffer, next); + } + //allow producers using a recycled buffer to be sure that consuming has happened + consumerBuffer.soElement(consumerOffset, null); + return e; + } + + @Override + public E peek() + { + final int chunkMask = this.chunkMask; + final int chunkShift = this.chunkShift; + long consumerIndex; + AtomicChunk consumerBuffer; + int consumerOffset; + boolean firstElementOfNewChunk; + E e; + AtomicChunk next; + do + { + consumerIndex = this.lvConsumerIndex(); + consumerBuffer = this.lvConsumerBuffer(); + final long chunkIndex = consumerIndex >> chunkShift; + consumerOffset = (int) (consumerIndex & chunkMask); + final int chunkSize = chunkMask + 1; + firstElementOfNewChunk = consumerOffset == 0 && consumerIndex >= chunkSize; + if (firstElementOfNewChunk) + { + final long expectedChunkIndex = chunkIndex - 1; + if (expectedChunkIndex != consumerBuffer.lvIndex()) + { + //another consumer has already rotated the consumer buffer or is yet to rotating it + continue; + } + next = consumerBuffer.lvNext(); + if (next == null) + { + if (lvProducerIndex() == consumerIndex) + { + return null; + } + //if another consumer rotate consumerBuffer, its chunkIndex will change + while ((next = consumerBuffer.lvNext()) == null) + { + if (expectedChunkIndex != consumerBuffer.lvIndex()) + { + //another consumer has already rotated the consumer buffer + break; + } + } + if (next == null) + { + continue; + } + } + consumerBuffer = next; + } + else + { + if (chunkIndex != consumerBuffer.lvIndex()) + { + //another consumer has already rotated the consumer buffer or is yet to rotating it + continue; + } + } + e = consumerBuffer.lvElement(consumerOffset); + if (e == null) + { + if (lvProducerIndex() == consumerIndex) + { + return null; + } + //if the q is not empty, another consumer could have already + //stolen it, incrementing consumerIndex too: better to check if it has happened + continue; + } + return e; + } + while (true); + } + + @Override + public Iterator iterator() + { + throw new UnsupportedOperationException(); + } + + @Override + public int size() + { + return IndexedQueueSizeUtil.size(this); + } + + @Override + public int capacity() + { + return MessagePassingQueue.UNBOUNDED_CAPACITY; + } + + @Override + public boolean relaxedOffer(E e) + { + return offer(e); + } + + @Override + public E relaxedPoll() + { + return poll(); + } + + @Override + public E relaxedPeek() + { + return peek(); + } + + @Override + public int drain(Consumer c) + { + return drain(c, chunkMask + 1); + } + + @Override + public int fill(Supplier s) + { + long result = 0;// result is a long because we want to have a safepoint check at regular intervals + final int capacity = chunkMask + 1; + final int offerBatch = Math.min(PortableJvmInfo.RECOMENDED_OFFER_BATCH, capacity); + do + { + final int filled = fill(s, offerBatch); + if (filled == 0) + { + return (int) result; + } + result += filled; + } + while (result <= capacity); + return (int) result; + } + + @Override + public int drain(Consumer c, int limit) + { + for (int i = 0; i < limit; i++) + { + final E e = relaxedPoll(); + if (e == null) + { + return i; + } + c.accept(e); + } + return limit; + } + + @Override + public int fill(Supplier s, int limit) + { + final int chunkShift = this.chunkShift; + final int chunkMask = this.chunkMask; + long producerSeq = getAndAddProducerIndex(limit); + AtomicChunk producerBuffer = null; + for (int i = 0; i < limit; i++) + { + final int pOffset = (int) (producerSeq & chunkMask); + final long chunkIndex = producerSeq >> chunkShift; + if (producerBuffer == null || producerBuffer.lvIndex() != chunkIndex) + { + producerBuffer = producerBufferOf(producerBuffer, chunkIndex); + } + while (producerBuffer.lvElement(pOffset) != null) + { + + } + producerBuffer.soElement(pOffset, s.get()); + producerSeq++; + } + return limit; + } + + @Override + public void drain(Consumer c, WaitStrategy wait, ExitCondition exit) + { + MessagePassingQueueUtil.drain(this, c, wait, exit); + } + + @Override + public void fill(Supplier s, WaitStrategy wait, ExitCondition exit) + { + while (exit.keepRunning()) + { + fill(s, PortableJvmInfo.RECOMENDED_OFFER_BATCH); + } + } + + @Override + public String toString() + { + return this.getClass().getName(); + } + +} diff --git a/jctools-core/src/test/java/org/jctools/queues/MpqSanityTestMpmcProgressiveChunked.java b/jctools-core/src/test/java/org/jctools/queues/MpqSanityTestMpmcProgressiveChunked.java new file mode 100644 index 00000000..dbe63eb9 --- /dev/null +++ b/jctools-core/src/test/java/org/jctools/queues/MpqSanityTestMpmcProgressiveChunked.java @@ -0,0 +1,29 @@ +package org.jctools.queues; + +import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.spec.Ordering; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Collection; + +@RunWith(Parameterized.class) +public class MpqSanityTestMpmcProgressiveChunked extends MpqSanityTest +{ + public MpqSanityTestMpmcProgressiveChunked(ConcurrentQueueSpec spec, MessagePassingQueue queue) + { + super(spec, queue); + } + + @Parameterized.Parameters + public static Collection parameters() + { + ArrayList list = new ArrayList(); + list.add(makeMpq(0, 0, 0, Ordering.FIFO, new MpmcProgressiveChunkedQueue<>(1))); + list.add(makeMpq(0, 0, 0, Ordering.FIFO, new MpmcProgressiveChunkedQueue<>(64))); + return list; + } + + +} diff --git a/jctools-core/src/test/java/org/jctools/queues/QueueSanityTestMpmcProgressiveChunked.java b/jctools-core/src/test/java/org/jctools/queues/QueueSanityTestMpmcProgressiveChunked.java new file mode 100644 index 00000000..e94a7c5c --- /dev/null +++ b/jctools-core/src/test/java/org/jctools/queues/QueueSanityTestMpmcProgressiveChunked.java @@ -0,0 +1,33 @@ +package org.jctools.queues; + +import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.spec.Ordering; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.assertEquals; + +@RunWith(Parameterized.class) +public class QueueSanityTestMpmcProgressiveChunked extends QueueSanityTest +{ + public QueueSanityTestMpmcProgressiveChunked(ConcurrentQueueSpec spec, Queue queue) + { + super(spec, queue); + } + + @Parameterized.Parameters + public static Collection parameters() + { + ArrayList list = new ArrayList(); + list.add(makeQueue(0, 0, 0, Ordering.FIFO, new MpmcProgressiveChunkedQueue<>(1))); + list.add(makeQueue(0, 0, 0, Ordering.FIFO, new MpmcProgressiveChunkedQueue<>(64))); + return list; + } + +}