diff --git a/jctools-core/src/main/java/org/jctools/queues/BaseMpscLinkedArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/BaseMpscLinkedArrayQueue.java index 962d61f0..16ca3ff4 100644 --- a/jctools-core/src/main/java/org/jctools/queues/BaseMpscLinkedArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/BaseMpscLinkedArrayQueue.java @@ -26,9 +26,11 @@ import static org.jctools.queues.LinkedArrayQueueUtil.modifiedCalcElementOffset; import static org.jctools.util.UnsafeAccess.UNSAFE; import static org.jctools.util.UnsafeAccess.fieldOffset; +import static org.jctools.util.UnsafeRefArrayAccess.calcElementOffset; import static org.jctools.util.UnsafeRefArrayAccess.lvElement; import static org.jctools.util.UnsafeRefArrayAccess.soElement; + abstract class BaseMpscLinkedArrayQueuePad1 extends AbstractQueue implements IndexedQueue { long p01, p02, p03, p04, p05, p06, p07; @@ -135,6 +137,7 @@ public abstract class BaseMpscLinkedArrayQueue extends BaseMpscLinkedArrayQue { // No post padding here, subclasses must add private static final Object JUMP = new Object(); + private static final Object BUFFER_CONSUMED = new Object(); private static final int CONTINUE_TO_P_INDEX_CAS = 0; private static final int RETRY = 1; private static final int QUEUE_FULL = 2; @@ -161,12 +164,6 @@ public BaseMpscLinkedArrayQueue(final int initialCapacity) soProducerLimit(mask); // we know it's all empty to start with } - @Override - public final Iterator iterator() - { - throw new UnsupportedOperationException(); - } - @Override public final int size() { @@ -312,7 +309,7 @@ public E poll() if (e == JUMP) { - final E[] nextBuffer = getNextBuffer(buffer, mask); + final E[] nextBuffer = nextBuffer(buffer, mask); return newBufferPoll(nextBuffer, index); } @@ -348,7 +345,7 @@ public E peek() } if (e == JUMP) { - return newBufferPeek(getNextBuffer(buffer, mask), index); + return newBufferPeek(nextBuffer(buffer, mask), index); } return (E) e; } @@ -399,11 +396,13 @@ else if (casProducerIndex(pIndex, pIndex + 1)) protected abstract long availableInQueue(long pIndex, long cIndex); @SuppressWarnings("unchecked") - private E[] getNextBuffer(final E[] buffer, final long mask) + private E[] nextBuffer(final E[] buffer, final long mask) { final long offset = nextArrayOffset(mask); final E[] nextBuffer = (E[]) lvElement(buffer, offset); - soElement(buffer, offset, null); + consumerBuffer = nextBuffer; + consumerMask = (length(nextBuffer) - 2) << 1; + soElement(buffer, offset, BUFFER_CONSUMED); return nextBuffer; } @@ -414,7 +413,7 @@ private long nextArrayOffset(long mask) private E newBufferPoll(E[] nextBuffer, long index) { - final long offset = newBufferAndOffset(nextBuffer, index); + final long offset = modifiedCalcElementOffset(index, consumerMask); final E n = lvElement(nextBuffer, offset);// LoadLoad if (n == null) { @@ -427,7 +426,7 @@ private E newBufferPoll(E[] nextBuffer, long index) private E newBufferPeek(E[] nextBuffer, long index) { - final long offset = newBufferAndOffset(nextBuffer, index); + final long offset = modifiedCalcElementOffset(index, consumerMask); final E n = lvElement(nextBuffer, offset);// LoadLoad if (null == n) { @@ -436,13 +435,6 @@ private E newBufferPeek(E[] nextBuffer, long index) return n; } - private long newBufferAndOffset(E[] nextBuffer, long index) - { - consumerBuffer = nextBuffer; - consumerMask = (length(nextBuffer) - 2) << 1; - return modifiedCalcElementOffset(index, consumerMask); - } - @Override public long currentProducerIndex() { @@ -480,7 +472,7 @@ public E relaxedPoll() } if (e == JUMP) { - final E[] nextBuffer = getNextBuffer(buffer, mask); + final E[] nextBuffer = nextBuffer(buffer, mask); return newBufferPoll(nextBuffer, index); } soElement(buffer, offset, null); @@ -500,7 +492,7 @@ public E relaxedPeek() Object e = lvElement(buffer, offset);// LoadLoad if (e == JUMP) { - return newBufferPeek(getNextBuffer(buffer, mask), index); + return newBufferPeek(nextBuffer(buffer, mask), index); } return (E) e; } @@ -640,6 +632,74 @@ public void drain(Consumer c, WaitStrategy w, ExitCondition exit) c.accept(e); } } + + /** + * Get an iterator for this queue. This method is thread safe. + *

+ * The iterator provides a best-effort snapshot of the elements in the queue. + * The returned iterator is not guaranteed to return elements in queue order, + * and races with the consumer thread may cause gaps in the sequence of returned elements. + * Like {link #relaxedPoll}, the iterator may not immediately return newly inserted elements. + * + * @return The iterator. + */ + @Override + public Iterator iterator() { + return new WeakIterator(); + } + + private final class WeakIterator implements Iterator { + + private long nextIndex; + private E nextElement; + private E[] currentBuffer; + private int currentBufferLength; + + WeakIterator() { + setBuffer(consumerBuffer); + nextElement = getNext(); + } + + @Override + public boolean hasNext() { + return nextElement != null; + } + + @Override + public E next() { + E e = nextElement; + nextElement = getNext(); + return e; + } + + private void setBuffer(E[] buffer) { + this.currentBuffer = buffer; + this.currentBufferLength = length(buffer); + this.nextIndex = 0; + } + + private E getNext() { + while (true) { + while (nextIndex < currentBufferLength - 1) { + long offset = calcElementOffset(nextIndex++); + E e = lvElement(currentBuffer, offset); + if (e != null && e != JUMP) { + return e; + } + } + long offset = calcElementOffset(currentBufferLength - 1); + Object nextArray = lvElement(currentBuffer, offset); + if (nextArray == BUFFER_CONSUMED) { + //Consumer may have passed us, just jump to the current consumer buffer + setBuffer(consumerBuffer); + } else if (nextArray != null) { + setBuffer((E[]) nextArray); + } else { + return null; + } + } + } + } private void resize(long oldMask, E[] oldBuffer, long pIndex, E e) { diff --git a/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java index 6c62d2f7..cfecadd5 100755 --- a/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java @@ -19,6 +19,8 @@ import static org.jctools.util.UnsafeAccess.fieldOffset; import static org.jctools.util.UnsafeRefArrayAccess.*; +import java.util.Iterator; + abstract class MpscArrayQueueL1Pad extends ConcurrentCircularArrayQueue { long p00, p01, p02, p03, p04, p05, p06, p07; @@ -562,4 +564,58 @@ public void fill(Supplier s, WaitStrategy w, ExitCondition exit) idleCounter = 0; } } + + /** + * Get an iterator for this queue. This method is thread safe. + *

+ * The iterator provides a best-effort snapshot of the elements in the queue. + * The returned iterator is not guaranteed to return elements in queue order, + * and races with the consumer thread may cause gaps in the sequence of returned elements. + * Like {link #relaxedPoll}, the iterator may not immediately return newly inserted elements. + * + * @return The iterator. + */ + @Override + public final Iterator iterator() { + final long cIndex = lvConsumerIndex(); + final long pIndex = lvProducerIndex(); + + return new WeakIterator(cIndex, pIndex); + } + + private final class WeakIterator implements Iterator { + + private final long pIndex; + private long nextIndex; + private E nextElement; + + WeakIterator(long cIndex, long pIndex) { + this.nextIndex = cIndex; + this.pIndex = pIndex; + nextElement = getNext(); + } + + @Override + public boolean hasNext() { + return nextElement != null; + } + + @Override + public E next() { + E e = nextElement; + nextElement = getNext(); + return e; + } + + private E getNext() { + while (nextIndex < pIndex) { + long offset = calcElementOffset(nextIndex++); + E e = lvElement(buffer, offset); + if (e != null) { + return e; + } + } + return null; + } + } } diff --git a/jctools-core/src/main/java/org/jctools/queues/atomic/BaseMpscLinkedAtomicArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/atomic/BaseMpscLinkedAtomicArrayQueue.java index 7edf95e8..5d2025ca 100644 --- a/jctools-core/src/main/java/org/jctools/queues/atomic/BaseMpscLinkedAtomicArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/atomic/BaseMpscLinkedAtomicArrayQueue.java @@ -159,6 +159,8 @@ public abstract class BaseMpscLinkedAtomicArrayQueue extends BaseMpscLinkedAt // No post padding here, subclasses must add private static final Object JUMP = new Object(); + private static final Object BUFFER_CONSUMED = new Object(); + private static final int CONTINUE_TO_P_INDEX_CAS = 0; private static final int RETRY = 1; @@ -186,11 +188,6 @@ public BaseMpscLinkedAtomicArrayQueue(final int initialCapacity) { soProducerLimit(mask); } - @Override - public final Iterator iterator() { - throw new UnsupportedOperationException(); - } - @Override public final int size() { // NOTE: because indices are on even numbers we cannot use the size util. @@ -300,7 +297,7 @@ public E poll() { } } if (e == JUMP) { - final AtomicReferenceArray nextBuffer = getNextBuffer(buffer, mask); + final AtomicReferenceArray nextBuffer = nextBuffer(buffer, mask); return newBufferPoll(nextBuffer, index); } // release element null @@ -331,7 +328,7 @@ public E peek() { } while (e == null); } if (e == JUMP) { - return newBufferPeek(getNextBuffer(buffer, mask), index); + return newBufferPeek(nextBuffer(buffer, mask), index); } return (E) e; } @@ -370,10 +367,12 @@ private int offerSlowPath(long mask, long pIndex, long producerLimit) { protected abstract long availableInQueue(long pIndex, long cIndex); @SuppressWarnings("unchecked") - private AtomicReferenceArray getNextBuffer(final AtomicReferenceArray buffer, final long mask) { + private AtomicReferenceArray nextBuffer(final AtomicReferenceArray buffer, final long mask) { final int offset = nextArrayOffset(mask); final AtomicReferenceArray nextBuffer = (AtomicReferenceArray) lvElement(buffer, offset); - soElement(buffer, offset, null); + consumerBuffer = nextBuffer; + consumerMask = (length(nextBuffer) - 2) << 1; + soElement(buffer, offset, BUFFER_CONSUMED); return nextBuffer; } @@ -382,7 +381,7 @@ private int nextArrayOffset(long mask) { } private E newBufferPoll(AtomicReferenceArray nextBuffer, long index) { - final int offset = newBufferAndOffset(nextBuffer, index); + final int offset = modifiedCalcElementOffset(index, consumerMask); // LoadLoad final E n = lvElement(nextBuffer, offset); if (n == null) { @@ -395,7 +394,7 @@ private E newBufferPoll(AtomicReferenceArray nextBuffer, long index) { } private E newBufferPeek(AtomicReferenceArray nextBuffer, long index) { - final int offset = newBufferAndOffset(nextBuffer, index); + final int offset = modifiedCalcElementOffset(index, consumerMask); // LoadLoad final E n = lvElement(nextBuffer, offset); if (null == n) { @@ -404,12 +403,6 @@ private E newBufferPeek(AtomicReferenceArray nextBuffer, long index) { return n; } - private int newBufferAndOffset(AtomicReferenceArray nextBuffer, long index) { - consumerBuffer = nextBuffer; - consumerMask = (length(nextBuffer) - 2) << 1; - return modifiedCalcElementOffset(index, consumerMask); - } - @Override public long currentProducerIndex() { return lvProducerIndex() / 2; @@ -441,7 +434,7 @@ public E relaxedPoll() { return null; } if (e == JUMP) { - final AtomicReferenceArray nextBuffer = getNextBuffer(buffer, mask); + final AtomicReferenceArray nextBuffer = nextBuffer(buffer, mask); return newBufferPoll(nextBuffer, index); } soElement(buffer, offset, null); @@ -459,7 +452,7 @@ public E relaxedPeek() { // LoadLoad Object e = lvElement(buffer, offset); if (e == JUMP) { - return newBufferPeek(getNextBuffer(buffer, mask), index); + return newBufferPeek(nextBuffer(buffer, mask), index); } return (E) e; } @@ -571,6 +564,81 @@ public void drain(Consumer c, WaitStrategy w, ExitCondition exit) { } } + /** + * Get an iterator for this queue. This method is thread safe. + *

+ * The iterator provides a best-effort snapshot of the elements in the queue. + * The returned iterator is not guaranteed to return elements in queue order, + * and races with the consumer thread may cause gaps in the sequence of returned elements. + * Like {link #relaxedPoll}, the iterator may not immediately return newly inserted elements. + * + * @return The iterator. + */ + @Override + public Iterator iterator() { + return new WeakIterator(); + } + + /** + * NOTE: This class was automatically generated by org.jctools.queues.atomic.JavaParsingAtomicLinkedQueueGenerator + * which can found in the jctools-build module. The original source file is BaseMpscLinkedArrayQueue.java. + */ + private final class WeakIterator implements Iterator { + + private long nextIndex; + + private E nextElement; + + private AtomicReferenceArray currentBuffer; + + private int currentBufferLength; + + WeakIterator() { + setBuffer(consumerBuffer); + nextElement = getNext(); + } + + @Override + public boolean hasNext() { + return nextElement != null; + } + + @Override + public E next() { + E e = nextElement; + nextElement = getNext(); + return e; + } + + private void setBuffer(AtomicReferenceArray buffer) { + this.currentBuffer = buffer; + this.currentBufferLength = length(buffer); + this.nextIndex = 0; + } + + private E getNext() { + while (true) { + while (nextIndex < currentBufferLength - 1) { + int offset = calcElementOffset(nextIndex++); + E e = lvElement(currentBuffer, offset); + if (e != null && e != JUMP) { + return e; + } + } + int offset = calcElementOffset(currentBufferLength - 1); + Object nextArray = lvElement(currentBuffer, offset); + if (nextArray == BUFFER_CONSUMED) { + //Consumer may have passed us, just jump to the current consumer buffer + setBuffer(consumerBuffer); + } else if (nextArray != null) { + setBuffer((AtomicReferenceArray) nextArray); + } else { + return null; + } + } + } + } + private void resize(long oldMask, AtomicReferenceArray oldBuffer, long pIndex, E e) { int newBufferLength = getNextBufferSize(oldBuffer); final AtomicReferenceArray newBuffer = allocate(newBufferLength); diff --git a/jctools-core/src/main/java/org/jctools/queues/atomic/LinkedAtomicArrayQueueUtil.java b/jctools-core/src/main/java/org/jctools/queues/atomic/LinkedAtomicArrayQueueUtil.java index c606b988..2ff01de9 100644 --- a/jctools-core/src/main/java/org/jctools/queues/atomic/LinkedAtomicArrayQueueUtil.java +++ b/jctools-core/src/main/java/org/jctools/queues/atomic/LinkedAtomicArrayQueueUtil.java @@ -55,6 +55,11 @@ static int modifiedCalcElementOffset(long index, long mask) { return (int) (index & mask) >> 1; } + + static int calcElementOffset(long index) + { + return (int) index; + } static int nextArrayOffset(AtomicReferenceArray curr) { diff --git a/jctools-core/src/main/java/org/jctools/queues/atomic/MpscAtomicArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/atomic/MpscAtomicArrayQueue.java index 0bb5cc32..3bdc77f4 100644 --- a/jctools-core/src/main/java/org/jctools/queues/atomic/MpscAtomicArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/atomic/MpscAtomicArrayQueue.java @@ -14,6 +14,7 @@ package org.jctools.queues.atomic; import org.jctools.util.PortableJvmInfo; +import java.util.Iterator; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.concurrent.atomic.AtomicLongArray; @@ -533,6 +534,65 @@ public void fill(Supplier s, WaitStrategy w, ExitCondition exit) { } } + /** + * Get an iterator for this queue. This method is thread safe. + *

+ * The iterator provides a best-effort snapshot of the elements in the queue. + * The returned iterator is not guaranteed to return elements in queue order, + * and races with the consumer thread may cause gaps in the sequence of returned elements. + * Like {link #relaxedPoll}, the iterator may not immediately return newly inserted elements. + * + * @return The iterator. + */ + @Override + public final Iterator iterator() { + final long cIndex = lvConsumerIndex(); + final long pIndex = lvProducerIndex(); + return new WeakIterator(cIndex, pIndex); + } + + /** + * NOTE: This class was automatically generated by org.jctools.queues.atomic.JavaParsingAtomicArrayQueueGenerator + * which can found in the jctools-build module. The original source file is MpscArrayQueue.java. + */ + private final class WeakIterator implements Iterator { + + private final long pIndex; + + private long nextIndex; + + private E nextElement; + + WeakIterator(long cIndex, long pIndex) { + this.nextIndex = cIndex; + this.pIndex = pIndex; + nextElement = getNext(); + } + + @Override + public boolean hasNext() { + return nextElement != null; + } + + @Override + public E next() { + E e = nextElement; + nextElement = getNext(); + return e; + } + + private E getNext() { + while (nextIndex < pIndex) { + int offset = calcElementOffset(nextIndex++); + E e = lvElement(buffer, offset); + if (e != null) { + return e; + } + } + return null; + } + } + /** * @deprecated This was renamed to failFastOffer please migrate */ diff --git a/jctools-core/src/main/java/org/jctools/queues/package-info.java b/jctools-core/src/main/java/org/jctools/queues/package-info.java index ac892774..6697859f 100644 --- a/jctools-core/src/main/java/org/jctools/queues/package-info.java +++ b/jctools-core/src/main/java/org/jctools/queues/package-info.java @@ -29,7 +29,7 @@ *
* Limited Queue methods support:
* The queues implement a subset of the {@link java.util.Queue} interface which is documented under the - * {@link org.jctools.queues.MessagePassingQueue} interface. In particular {@link java.util.Queue#iterator()} is not + * {@link org.jctools.queues.MessagePassingQueue} interface. In particular {@link java.util.Queue#iterator()} is usually not * supported and dependent methods from {@link java.util.AbstractQueue} are also not supported such as: *

    *
  1. {@link java.util.Queue#remove(Object)} @@ -38,6 +38,7 @@ *
  2. {@link java.util.Queue#contains(Object)} *
  3. {@link java.util.Queue#containsAll(java.util.Collection)} *
+ * A few queues do support a limited form of iteration. This support is documented in the Javadoc of the relevant queues. *

*
* Memory layout controls and False Sharing:
diff --git a/jctools-core/src/test/java/org/jctools/queues/MpscArrayQueueSnapshotTest.java b/jctools-core/src/test/java/org/jctools/queues/MpscArrayQueueSnapshotTest.java new file mode 100644 index 00000000..d404fd1b --- /dev/null +++ b/jctools-core/src/test/java/org/jctools/queues/MpscArrayQueueSnapshotTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.jctools.queues; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import org.junit.Before; +import org.junit.Test; + +public class MpscArrayQueueSnapshotTest { + + private MpscArrayQueue queue; + + @Before + public void setUp() throws Exception { + this.queue = new MpscArrayQueue<>(4); + } + + @Test + public void testIterator() { + queue.offer(0); + assertThat(iteratorToList(), contains(0)); + for (int i = 1; i < queue.capacity(); i++) { + queue.offer(i); + } + assertThat(iteratorToList(), containsInAnyOrder(0, 1, 2, 3)); + queue.poll(); + queue.offer(4); + queue.poll(); + assertThat(iteratorToList(), containsInAnyOrder(2, 3, 4)); + } + + @Test + public void testIteratorHasNextConcurrentModification() { + //There may be gaps in the elements returned by the iterator, + //but hasNext needs to be reliable even if the elements are consumed between hasNext() and next(). + queue.offer(0); + queue.offer(1); + Iterator iter = queue.iterator(); + assertThat(iter.hasNext(), is(true)); + queue.poll(); + queue.poll(); + assertThat(queue.isEmpty(), is(true)); + assertThat(iter.hasNext(), is(true)); + assertThat(iter.next(), is(0)); + assertThat(iter.hasNext(), is(false)); + } + + private List iteratorToList() { + List list = new ArrayList<>(); + Iterator iter = queue.iterator(); + iter.forEachRemaining(list::add); + return list; + } + +} diff --git a/jctools-core/src/test/java/org/jctools/queues/MpscUnboundedArrayQueueSnapshotTest.java b/jctools-core/src/test/java/org/jctools/queues/MpscUnboundedArrayQueueSnapshotTest.java new file mode 100644 index 00000000..c0024048 --- /dev/null +++ b/jctools-core/src/test/java/org/jctools/queues/MpscUnboundedArrayQueueSnapshotTest.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.jctools.queues; + +import static org.hamcrest.CoreMatchers.hasItems; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import org.junit.Before; +import org.junit.Test; + +public class MpscUnboundedArrayQueueSnapshotTest { + + private static final int CHUNK_SIZE = 4; + private MpscUnboundedArrayQueue queue; + + @Before + public void setUp() throws Exception { + this.queue = new MpscUnboundedArrayQueue<>(CHUNK_SIZE + 1); //Account for extra slot for JUMP + } + + @Test + public void testIterator() { + queue.offer(0); + assertThat(iteratorToList(), contains(0)); + for (int i = 1; i < CHUNK_SIZE; i++) { + queue.offer(i); + } + assertThat(iteratorToList(), containsInAnyOrder(0, 1, 2, 3)); + queue.offer(4); + queue.offer(5); + assertThat(iteratorToList(), containsInAnyOrder(0, 1, 2, 3, 4, 5)); + queue.poll(); + assertThat(iteratorToList(), containsInAnyOrder(1, 2, 3, 4, 5)); + for (int i = 1; i < CHUNK_SIZE; i++) { + queue.poll(); + } + assertThat(iteratorToList(), containsInAnyOrder(4, 5)); + } + + @Test + public void testIteratorOutpacedByConsumer() { + int slotsToForceMultipleBuffers = CHUNK_SIZE + 1; + for (int i = 0; i < slotsToForceMultipleBuffers; i++) { + queue.offer(i); + } + Iterator iter = queue.iterator(); + List entries = new ArrayList<>(); + entries.add(iter.next()); + for (int i = 0; i < CHUNK_SIZE; i++) { + queue.poll(); + } + //Now that the consumer has discarded the first buffer, the iterator needs to follow it to the new buffer. + iter.forEachRemaining(entries::add); + assertThat(entries, containsInAnyOrder(0, 1, 4)); + } + + @Test + public void testIteratorHasNextConcurrentModification() { + /* + * There may be gaps in the elements returned by the iterator, but hasNext needs to be reliable even if the elements are consumed + * between hasNext() and next(), and even if the consumer buffer changes. + */ + int slotsToForceMultipleBuffers = CHUNK_SIZE + 1; + for (int i = 0; i < slotsToForceMultipleBuffers; i++) { + queue.offer(i); + } + Iterator iter = queue.iterator(); + assertThat(iter.hasNext(), is(true)); + for (int i = 0; i < slotsToForceMultipleBuffers; i++) { + queue.poll(); + } + assertThat(queue.isEmpty(), is(true)); + assertThat(iter.hasNext(), is(true)); + assertThat(iter.next(), is(0)); + assertThat(iter.hasNext(), is(false)); + } + + private List iteratorToList() { + List list = new ArrayList<>(); + Iterator iter = queue.iterator(); + iter.forEachRemaining(list::add); + return list; + } + +}