From 8a44ab9b52e20e12fe2a4036e975a97d397c0e93 Mon Sep 17 00:00:00 2001 From: Francesco Nigro Date: Wed, 22 May 2019 11:33:07 +0200 Subject: [PATCH] Fixes #226: XADD mpmc queue: highly scalable queue --- .../queues/MpmcProgressiveChunkedQueue.java | 781 ++++++++++++++++++ .../MpqSanityTestMpmcProgressiveChunked.java | 29 + ...QueueSanityTestMpmcProgressiveChunked.java | 33 + 3 files changed, 843 insertions(+) create mode 100644 jctools-core/src/main/java/org/jctools/queues/MpmcProgressiveChunkedQueue.java create mode 100644 jctools-core/src/test/java/org/jctools/queues/MpqSanityTestMpmcProgressiveChunked.java create mode 100644 jctools-core/src/test/java/org/jctools/queues/QueueSanityTestMpmcProgressiveChunked.java diff --git a/jctools-core/src/main/java/org/jctools/queues/MpmcProgressiveChunkedQueue.java b/jctools-core/src/main/java/org/jctools/queues/MpmcProgressiveChunkedQueue.java new file mode 100644 index 00000000..611b28b3 --- /dev/null +++ b/jctools-core/src/main/java/org/jctools/queues/MpmcProgressiveChunkedQueue.java @@ -0,0 +1,781 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jctools.queues; + +import org.jctools.queues.IndexedQueueSizeUtil.IndexedQueue; +import org.jctools.util.PortableJvmInfo; +import org.jctools.util.Pow2; +import org.jctools.util.UnsafeAccess; +import org.jctools.util.UnsafeRefArrayAccess; + +import java.util.AbstractQueue; +import java.util.Arrays; +import java.util.Iterator; + +import static org.jctools.util.UnsafeAccess.UNSAFE; +import static org.jctools.util.UnsafeAccess.fieldOffset; + + +abstract class MpmcProgressiveChunkedQueuePad1 extends AbstractQueue implements IndexedQueue +{ + long p01, p02, p03, p04, p05, p06, p07; + long p10, p11, p12, p13, p14, p15, p16, p17; + + static final class AtomicChunk + { + private static final long ARRAY_BASE; + private static final int ELEMENT_SHIFT; + + static + { + final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(long[].class); + if (8 == scale) + { + ELEMENT_SHIFT = 3; + } + else + { + throw new IllegalStateException("Unexpected long[] element size"); + } + // Including the buffer pad in the array base offset + ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(long[].class); + } + + final static int NIL_CHUNK_INDEX = -1; + private static final long PREV_OFFSET = fieldOffset(AtomicChunk.class, "prev"); + private static final long NEXT_OFFSET = fieldOffset(AtomicChunk.class, "next"); + private static final long INDEX_OFFSET = fieldOffset(AtomicChunk.class, "index"); + private volatile AtomicChunk prev; + private volatile long index; + private volatile AtomicChunk next; + private final long[] sequence; + private final E[] buffer; + + AtomicChunk(long index, AtomicChunk prev, int size, boolean pooled) + { + buffer = CircularArrayOffsetCalculator.allocate(size); + spNext(null); + spPrev(prev); + spIndex(index); + if (pooled) + { + sequence = new long[size]; + Arrays.fill(sequence, AtomicChunk.NIL_CHUNK_INDEX); + } + else + { + sequence = null; + } + } + + final boolean isPooled() + { + return sequence != null; + } + + private static long calcSequenceOffset(long index) + { + return ARRAY_BASE + (index << ELEMENT_SHIFT); + } + + final void soSequence(int index, long e) + { + UNSAFE.putOrderedLong(sequence, calcSequenceOffset(index), e); + } + + final long lvSequence(int index) + { + return UNSAFE.getLongVolatile(sequence, calcSequenceOffset(index)); + } + + final AtomicChunk lvNext() + { + return next; + } + + final AtomicChunk lpPrev() + { + return (AtomicChunk) UNSAFE.getObject(this, PREV_OFFSET); + } + + final long lvIndex() + { + return index; + } + + final void soIndex(long index) + { + UNSAFE.putOrderedLong(this, INDEX_OFFSET, index); + } + + final void spIndex(long index) + { + UNSAFE.putLong(this, INDEX_OFFSET, index); + } + + final void soNext(AtomicChunk value) + { + UNSAFE.putOrderedObject(this, NEXT_OFFSET, value); + } + + final void spNext(AtomicChunk value) + { + UNSAFE.putObject(this, NEXT_OFFSET, value); + } + + final void spPrev(AtomicChunk value) + { + UNSAFE.putObject(this, PREV_OFFSET, value); + } + + final void soElement(int index, E e) + { + UnsafeRefArrayAccess.soElement(buffer, UnsafeRefArrayAccess.calcElementOffset(index), e); + } + + final void spElement(int index, E e) + { + UnsafeRefArrayAccess.spElement(buffer, UnsafeRefArrayAccess.calcElementOffset(index), e); + } + + final E lvElement(int index) + { + return UnsafeRefArrayAccess.lvElement(buffer, UnsafeRefArrayAccess.calcElementOffset(index)); + } + + } +} + +// $gen:ordered-fields +abstract class MpmcProgressiveChunkedQueueProducerFields extends MpmcProgressiveChunkedQueuePad1 +{ + private final static long P_INDEX_OFFSET = + fieldOffset(MpmcProgressiveChunkedQueueProducerFields.class, "producerIndex"); + private volatile long producerIndex; + + @Override + public final long lvProducerIndex() + { + return producerIndex; + } + + final long getAndIncrementProducerIndex() + { + return UNSAFE.getAndAddLong(this, P_INDEX_OFFSET, 1); + } + + final long getAndAddProducerIndex(long delta) + { + return UNSAFE.getAndAddLong(this, P_INDEX_OFFSET, delta); + } +} + +abstract class MpmcProgressiveChunkedQueuePad2 extends MpmcProgressiveChunkedQueueProducerFields +{ + long p01, p02, p03, p04, p05, p06, p07, p08; + long p10, p11, p12, p13, p14, p15, p16; +} + +// $gen:ordered-fields +abstract class MpmcProgressiveChunkedQueueProducerBuffer extends MpmcProgressiveChunkedQueuePad2 +{ + private static final long P_BUFFER_OFFSET = + fieldOffset(MpmcProgressiveChunkedQueueProducerBuffer.class, "producerBuffer"); + private static final long P_CHUNK_INDEX_OFFSET = + fieldOffset(MpmcProgressiveChunkedQueueProducerBuffer.class, "producerChunkIndex"); + + private volatile AtomicChunk producerBuffer; + private volatile long producerChunkIndex; + + + final long lvProducerChunkIndex() + { + return producerChunkIndex; + } + + final boolean casProducerChunkIndex(long expected, long value) + { + return UNSAFE.compareAndSwapLong(this, P_CHUNK_INDEX_OFFSET, expected, value); + } + + final void soProducerChunkIndex(long value) + { + UNSAFE.putOrderedLong(this, P_CHUNK_INDEX_OFFSET, value); + } + + final AtomicChunk lvProducerBuffer() + { + return this.producerBuffer; + } + + final void soProducerBuffer(AtomicChunk buffer) + { + UNSAFE.putOrderedObject(this, P_BUFFER_OFFSET, buffer); + } +} + +abstract class MpmcProgressiveChunkedQueuePad3 extends MpmcProgressiveChunkedQueueProducerBuffer +{ + long p0, p1, p2, p3, p4, p5, p6, p7; + long p10, p11, p12, p13, p14, p15, p16; +} + +// $gen:ordered-fields +abstract class MpmcProgressiveChunkedQueueConsumerFields extends MpmcProgressiveChunkedQueuePad3 +{ + private final static long C_INDEX_OFFSET = + fieldOffset(MpmcProgressiveChunkedQueueConsumerFields.class, "consumerIndex"); + private final static long C_BUFFER_OFFSET = + fieldOffset(MpmcProgressiveChunkedQueueConsumerFields.class, "consumerBuffer"); + + private volatile long consumerIndex; + private volatile AtomicChunk consumerBuffer; + + @Override + public final long lvConsumerIndex() + { + return consumerIndex; + } + + final boolean casConsumerIndex(long expect, long newValue) + { + return UNSAFE.compareAndSwapLong(this, C_INDEX_OFFSET, expect, newValue); + } + + final AtomicChunk lvConsumerBuffer() + { + return this.consumerBuffer; + } + + final void soConsumerBuffer(AtomicChunk newValue) + { + UNSAFE.putOrderedObject(this, C_BUFFER_OFFSET, newValue); + } + +} + +abstract class MpmcProgressiveChunkedQueuePad5 extends MpmcProgressiveChunkedQueueConsumerFields +{ + long p0, p1, p2, p3, p4, p5, p6, p7; + long p10, p11, p12, p13, p14, p15, p16; +} + +/** + * An MPMC array queue which starts at initialCapacity and grows unbounded in linked chunks.
+ * Differently from {@link MpmcArrayQueue} it is designed to provide a better scaling when more + * producers are concurrently offering. + * + * @param + * @author https://github.com/franz1981 + */ +public class MpmcProgressiveChunkedQueue extends MpmcProgressiveChunkedQueuePad5 + implements MessagePassingQueue, QueueProgressIndicators +{ + private final int chunkMask; + private final int chunkShift; + private final SpscArrayQueue> freeBuffer; + + public MpmcProgressiveChunkedQueue(int chunkSize, int maxPooledChunks) + { + chunkSize = Pow2.roundToPowerOfTwo(chunkSize); + final AtomicChunk first = new AtomicChunk(0, null, chunkSize, true); + soProducerBuffer(first); + soProducerChunkIndex(0); + soConsumerBuffer(first); + chunkMask = chunkSize - 1; + chunkShift = Integer.numberOfTrailingZeros(chunkSize); + freeBuffer = new SpscArrayQueue>(maxPooledChunks + 1); + for (int i = 0; i < maxPooledChunks; i++) + { + freeBuffer.offer(new AtomicChunk(AtomicChunk.NIL_CHUNK_INDEX, null, chunkSize, true)); + } + } + + public MpmcProgressiveChunkedQueue(int chunkSize) + { + this(chunkSize, 1); + } + + private AtomicChunk producerBufferOf(AtomicChunk producerBuffer, long expectedChunkIndex) + { + long jumpBackward; + while (true) + { + if (producerBuffer == null) + { + producerBuffer = lvProducerBuffer(); + } + final long producerChunkIndex = producerBuffer.lvIndex(); + if (producerChunkIndex == AtomicChunk.NIL_CHUNK_INDEX) + { + //force an attempt to fetch it another time + producerBuffer = null; + continue; + } + jumpBackward = producerChunkIndex - expectedChunkIndex; + if (jumpBackward >= 0) + { + break; + } + //try validate against the last producer chunk index + if (lvProducerChunkIndex() == producerChunkIndex) + { + producerBuffer = appendNextChunk(producerBuffer, producerChunkIndex, chunkMask + 1); + } + else + { + producerBuffer = null; + } + } + for (long i = 0; i < jumpBackward; i++) + { + //prev cannot be null, because is being released by index + producerBuffer = producerBuffer.lpPrev(); + assert producerBuffer != null; + } + assert producerBuffer.lvIndex() == expectedChunkIndex; + return producerBuffer; + } + + private AtomicChunk appendNextChunk(AtomicChunk producerBuffer, long chunkIndex, int chunkSize) + { + assert chunkIndex != AtomicChunk.NIL_CHUNK_INDEX; + final long nextChunkIndex = chunkIndex + 1; + //prevent other concurrent attempts on appendNextChunk + if (!casProducerChunkIndex(chunkIndex, nextChunkIndex)) + { + return null; + } + AtomicChunk newChunk = freeBuffer.poll(); + if (newChunk != null) + { + assert newChunk.lvIndex() == AtomicChunk.NIL_CHUNK_INDEX; + //prevent other concurrent attempts on appendNextChunk + soProducerBuffer(newChunk); + newChunk.spPrev(producerBuffer); + //index set is releasing prev, allowing other pending offers to continue + newChunk.soIndex(nextChunkIndex); + } + else + { + newChunk = new AtomicChunk(nextChunkIndex, producerBuffer, chunkSize, false); + soProducerBuffer(newChunk); + } + //link the next chunk only when finished + producerBuffer.soNext(newChunk); + return newChunk; + } + + @Override + public long currentProducerIndex() + { + return lvProducerIndex(); + } + + @Override + public long currentConsumerIndex() + { + return lvConsumerIndex(); + } + + @Override + public boolean offer(E e) + { + if (null == e) + { + throw new NullPointerException(); + } + final int chunkMask = this.chunkMask; + final int chunkShift = this.chunkShift; + final long producerSeq = getAndIncrementProducerIndex(); + final int pOffset = (int) (producerSeq & chunkMask); + long chunkIndex = producerSeq >> chunkShift; + AtomicChunk producerBuffer = lvProducerBuffer(); + if (producerBuffer.lvIndex() != chunkIndex) + { + producerBuffer = producerBufferOf(producerBuffer, chunkIndex); + if (producerBuffer.isPooled()) + { + chunkIndex = producerBuffer.lvIndex(); + } + } + final boolean isPooled = producerBuffer.isPooled(); + if (isPooled) + { + //wait any previous consumer to finish its job + while (producerBuffer.lvElement(pOffset) != null) + { + + } + } + producerBuffer.soElement(pOffset, e); + if (isPooled) + { + producerBuffer.soSequence(pOffset, chunkIndex); + } + return true; + } + + private static E spinForElement(AtomicChunk chunk, int offset) + { + E e; + while ((e = chunk.lvElement(offset)) == null) + { + + } + return e; + } + + private void rotateConsumerBuffer(AtomicChunk consumerBuffer, AtomicChunk next) + { + next.spPrev(null); + //save from nepotism + consumerBuffer.spNext(null); + //prevent other consumers to use it + consumerBuffer.soIndex(AtomicChunk.NIL_CHUNK_INDEX); + if (consumerBuffer.isPooled()) + { + final boolean pooled = freeBuffer.offer(consumerBuffer); + assert pooled; + } + //expose next to the other consumers + soConsumerBuffer(next); + } + + @Override + public E poll() + { + final int chunkMask = this.chunkMask; + final int chunkShift = this.chunkShift; + long consumerIndex; + AtomicChunk consumerBuffer; + int consumerOffset; + final int chunkSize = chunkMask + 1; + boolean firstElementOfNewChunk; + E e = null; + AtomicChunk next = null; + long chunkIndex; + do + { + consumerIndex = this.lvConsumerIndex(); + consumerBuffer = this.lvConsumerBuffer(); + consumerOffset = (int) (consumerIndex & chunkMask); + chunkIndex = consumerIndex >> chunkShift; + firstElementOfNewChunk = consumerOffset == 0 && consumerIndex >= chunkSize; + if (firstElementOfNewChunk) + { + next = consumerBuffer.lvNext(); + final long expectedChunkIndex = chunkIndex - 1; + if (expectedChunkIndex != consumerBuffer.lvIndex()) + { + //another consumer has already rotated the consumer buffer or is yet to rotating it + continue; + } + if (next == null) + { + if (lvProducerIndex() == consumerIndex) + { + return null; + } + //if another consumer rotate consumerBuffer, its chunkIndex will change + while ((next = consumerBuffer.lvNext()) == null) + { + if (expectedChunkIndex != consumerBuffer.lvIndex()) + { + //another consumer has already rotated the consumer buffer + break; + } + } + if (next == null) + { + continue; + } + } + } + else + { + if (consumerBuffer.isPooled()) + { + if (consumerBuffer.lvSequence(consumerOffset) != chunkIndex) + { + if (lvProducerIndex() == consumerIndex) + { + return null; + } + continue; + } + e = consumerBuffer.lvElement(consumerOffset); + assert e != null; + } + else + { + e = consumerBuffer.lvElement(consumerOffset); + if (chunkIndex != consumerBuffer.lvIndex()) + { + //another consumer has already rotated the consumer buffer or is yet to rotating it + continue; + } + if (e == null) + { + if (lvProducerIndex() == consumerIndex) + { + return null; + } + //if the buffer is not empty, another consumer could have already + //stolen it, incrementing consumerIndex too: better to check if it has happened + continue; + } + } + } + if (casConsumerIndex(consumerIndex, consumerIndex + 1)) + { + break; + } + } + while (true); + //if we are the firstElementOfNewChunk we need to rotate the consumer buffer + if (firstElementOfNewChunk) + { + //we can freely spin awaiting producer, because we are the only one in charge to + //rotate the consumer buffer and using next + e = spinForElement(next, consumerOffset); + final boolean pooled = next.isPooled(); + if (pooled) + { + while (next.lvSequence(consumerOffset) != chunkIndex) + { + + } + } + next.soElement(consumerOffset, null); + rotateConsumerBuffer(consumerBuffer, next); + } + else + { + assert !consumerBuffer.isPooled() || + (consumerBuffer.isPooled() && consumerBuffer.lvSequence(consumerOffset) == chunkIndex); + consumerBuffer.soElement(consumerOffset, null); + } + return e; + } + + @Override + public E peek() + { + final int chunkMask = this.chunkMask; + final int chunkShift = this.chunkShift; + long consumerIndex; + AtomicChunk consumerBuffer; + int consumerOffset; + boolean firstElementOfNewChunk; + E e; + AtomicChunk next; + do + { + consumerIndex = this.lvConsumerIndex(); + consumerBuffer = this.lvConsumerBuffer(); + final long chunkIndex = consumerIndex >> chunkShift; + consumerOffset = (int) (consumerIndex & chunkMask); + final int chunkSize = chunkMask + 1; + firstElementOfNewChunk = consumerOffset == 0 && consumerIndex >= chunkSize; + if (firstElementOfNewChunk) + { + final long expectedChunkIndex = chunkIndex - 1; + next = consumerBuffer.lvNext(); + if (expectedChunkIndex != consumerBuffer.lvIndex()) + { + //another consumer has already rotated the consumer buffer or is yet to rotating it + continue; + } + if (next == null) + { + if (lvProducerIndex() == consumerIndex) + { + return null; + } + //if another consumer rotate consumerBuffer, its chunkIndex will change + while ((next = consumerBuffer.lvNext()) == null) + { + if (expectedChunkIndex != consumerBuffer.lvIndex()) + { + //another consumer has already rotated the consumer buffer + break; + } + } + if (next == null) + { + continue; + } + } + consumerBuffer = next; + } + e = consumerBuffer.lvElement(consumerOffset); + if (e != null) + { + //validate the element read + if (chunkIndex != consumerBuffer.lvIndex()) + { + //another consumer has already rotated the consumer buffer or is yet to rotating it + continue; + } + return e; + } + else + { + if (lvProducerIndex() == consumerIndex) + { + return null; + } + //if the q is not empty, another consumer could have already + //stolen it, incrementing consumerIndex too: better to check if it has happened + continue; + } + } + while (true); + } + + @Override + public Iterator iterator() + { + throw new UnsupportedOperationException(); + } + + @Override + public int size() + { + return IndexedQueueSizeUtil.size(this); + } + + @Override + public int capacity() + { + return MessagePassingQueue.UNBOUNDED_CAPACITY; + } + + @Override + public boolean relaxedOffer(E e) + { + return offer(e); + } + + @Override + public E relaxedPoll() + { + return poll(); + } + + @Override + public E relaxedPeek() + { + return peek(); + } + + @Override + public int drain(Consumer c) + { + return drain(c, chunkMask + 1); + } + + @Override + public int fill(Supplier s) + { + long result = 0;// result is a long because we want to have a safepoint check at regular intervals + final int capacity = chunkMask + 1; + final int offerBatch = Math.min(PortableJvmInfo.RECOMENDED_OFFER_BATCH, capacity); + do + { + final int filled = fill(s, offerBatch); + if (filled == 0) + { + return (int) result; + } + result += filled; + } + while (result <= capacity); + return (int) result; + } + + @Override + public int drain(Consumer c, int limit) + { + for (int i = 0; i < limit; i++) + { + final E e = relaxedPoll(); + if (e == null) + { + return i; + } + c.accept(e); + } + return limit; + } + + @Override + public int fill(Supplier s, int limit) + { + final int chunkShift = this.chunkShift; + final int chunkMask = this.chunkMask; + long producerSeq = getAndAddProducerIndex(limit); + AtomicChunk producerBuffer = null; + for (int i = 0; i < limit; i++) + { + final int pOffset = (int) (producerSeq & chunkMask); + long chunkIndex = producerSeq >> chunkShift; + if (producerBuffer == null || producerBuffer.lvIndex() != chunkIndex) + { + producerBuffer = producerBufferOf(producerBuffer, chunkIndex); + if (producerBuffer.isPooled()) + { + chunkIndex = producerBuffer.lvIndex(); + } + } + if (producerBuffer.isPooled()) + { + while (producerBuffer.lvElement(pOffset) != null) + { + + } + } + producerBuffer.soElement(pOffset, s.get()); + if (producerBuffer.isPooled()) + { + producerBuffer.soSequence(pOffset, chunkIndex); + } + producerSeq++; + } + return limit; + } + + @Override + public void drain(Consumer c, WaitStrategy wait, ExitCondition exit) + { + MessagePassingQueueUtil.drain(this, c, wait, exit); + } + + @Override + public void fill(Supplier s, WaitStrategy wait, ExitCondition exit) + { + while (exit.keepRunning()) + { + fill(s, PortableJvmInfo.RECOMENDED_OFFER_BATCH); + } + } + + @Override + public String toString() + { + return this.getClass().getName(); + } + +} diff --git a/jctools-core/src/test/java/org/jctools/queues/MpqSanityTestMpmcProgressiveChunked.java b/jctools-core/src/test/java/org/jctools/queues/MpqSanityTestMpmcProgressiveChunked.java new file mode 100644 index 00000000..dbe63eb9 --- /dev/null +++ b/jctools-core/src/test/java/org/jctools/queues/MpqSanityTestMpmcProgressiveChunked.java @@ -0,0 +1,29 @@ +package org.jctools.queues; + +import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.spec.Ordering; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Collection; + +@RunWith(Parameterized.class) +public class MpqSanityTestMpmcProgressiveChunked extends MpqSanityTest +{ + public MpqSanityTestMpmcProgressiveChunked(ConcurrentQueueSpec spec, MessagePassingQueue queue) + { + super(spec, queue); + } + + @Parameterized.Parameters + public static Collection parameters() + { + ArrayList list = new ArrayList(); + list.add(makeMpq(0, 0, 0, Ordering.FIFO, new MpmcProgressiveChunkedQueue<>(1))); + list.add(makeMpq(0, 0, 0, Ordering.FIFO, new MpmcProgressiveChunkedQueue<>(64))); + return list; + } + + +} diff --git a/jctools-core/src/test/java/org/jctools/queues/QueueSanityTestMpmcProgressiveChunked.java b/jctools-core/src/test/java/org/jctools/queues/QueueSanityTestMpmcProgressiveChunked.java new file mode 100644 index 00000000..e94a7c5c --- /dev/null +++ b/jctools-core/src/test/java/org/jctools/queues/QueueSanityTestMpmcProgressiveChunked.java @@ -0,0 +1,33 @@ +package org.jctools.queues; + +import org.jctools.queues.spec.ConcurrentQueueSpec; +import org.jctools.queues.spec.Ordering; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.assertEquals; + +@RunWith(Parameterized.class) +public class QueueSanityTestMpmcProgressiveChunked extends QueueSanityTest +{ + public QueueSanityTestMpmcProgressiveChunked(ConcurrentQueueSpec spec, Queue queue) + { + super(spec, queue); + } + + @Parameterized.Parameters + public static Collection parameters() + { + ArrayList list = new ArrayList(); + list.add(makeQueue(0, 0, 0, Ordering.FIFO, new MpmcProgressiveChunkedQueue<>(1))); + list.add(makeQueue(0, 0, 0, Ordering.FIFO, new MpmcProgressiveChunkedQueue<>(64))); + return list; + } + +}