From a1146c2fa3a1ca8de1c1d0378c71be04995b016a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stig=20Rohde=20D=C3=B8ssing?= Date: Wed, 16 Jan 2019 11:37:24 +0100 Subject: [PATCH 1/3] Issue-228: Add unorderedSnapshot method to MpscArrayQueue and BaseMpscLinkedArrayQueue --- .../queues/BaseMpscLinkedArrayQueue.java | 40 ++++++++++++- .../org/jctools/queues/MpscArrayQueue.java | 16 +++++ .../BaseMpscLinkedAtomicArrayQueue.java | 39 +++++++++++- .../atomic/LinkedAtomicArrayQueueUtil.java | 5 ++ .../queues/atomic/MpscAtomicArrayQueue.java | 15 +++++ .../queues/MpscArrayQueueSnapshotTest.java | 52 ++++++++++++++++ .../MpscUnboundedArrayQueueSnapshotTest.java | 59 +++++++++++++++++++ 7 files changed, 224 insertions(+), 2 deletions(-) create mode 100644 jctools-core/src/test/java/org/jctools/queues/MpscArrayQueueSnapshotTest.java create mode 100644 jctools-core/src/test/java/org/jctools/queues/MpscUnboundedArrayQueueSnapshotTest.java diff --git a/jctools-core/src/main/java/org/jctools/queues/BaseMpscLinkedArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/BaseMpscLinkedArrayQueue.java index 962d61f0..11c3467c 100644 --- a/jctools-core/src/main/java/org/jctools/queues/BaseMpscLinkedArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/BaseMpscLinkedArrayQueue.java @@ -26,9 +26,13 @@ import static org.jctools.queues.LinkedArrayQueueUtil.modifiedCalcElementOffset; import static org.jctools.util.UnsafeAccess.UNSAFE; import static org.jctools.util.UnsafeAccess.fieldOffset; +import static org.jctools.util.UnsafeRefArrayAccess.calcElementOffset; import static org.jctools.util.UnsafeRefArrayAccess.lvElement; import static org.jctools.util.UnsafeRefArrayAccess.soElement; +import java.util.ArrayList; +import java.util.List; + abstract class BaseMpscLinkedArrayQueuePad1 extends AbstractQueue implements IndexedQueue { long p01, p02, p03, p04, p05, p06, p07; @@ -73,6 +77,7 @@ abstract class BaseMpscLinkedArrayQueueConsumerFields extends BaseMpscLinkedA private volatile long consumerIndex; protected long consumerMask; protected E[] consumerBuffer; + private volatile E[] volatileConsumerBuffer; @Override public final long lvConsumerIndex() @@ -80,6 +85,14 @@ public final long lvConsumerIndex() return consumerIndex; } + final E[] lvVolatileConsumerBuffer() { + return volatileConsumerBuffer; + } + + final void svVolatileConsumerBuffer(E[] newValue) { + volatileConsumerBuffer = newValue; + } + final long lpConsumerIndex() { return UNSAFE.getLong(this, C_INDEX_OFFSET); @@ -157,6 +170,7 @@ public BaseMpscLinkedArrayQueue(final int initialCapacity) producerBuffer = buffer; producerMask = mask; consumerBuffer = buffer; + svVolatileConsumerBuffer(buffer); consumerMask = mask; soProducerLimit(mask); // we know it's all empty to start with } @@ -403,7 +417,6 @@ private E[] getNextBuffer(final E[] buffer, final long mask) { final long offset = nextArrayOffset(mask); final E[] nextBuffer = (E[]) lvElement(buffer, offset); - soElement(buffer, offset, null); return nextBuffer; } @@ -439,6 +452,7 @@ private E newBufferPeek(E[] nextBuffer, long index) private long newBufferAndOffset(E[] nextBuffer, long index) { consumerBuffer = nextBuffer; + svVolatileConsumerBuffer(nextBuffer); consumerMask = (length(nextBuffer) - 2) << 1; return modifiedCalcElementOffset(index, consumerMask); } @@ -640,6 +654,30 @@ public void drain(Consumer c, WaitStrategy w, ExitCondition exit) c.accept(e); } } + + public List unorderedSnapshot() { + E[] currentBuffer = lvVolatileConsumerBuffer(); + List elements = new ArrayList(); + while (true) { + int length = length(currentBuffer); + for (int i = 0; i < length - 1; i++) { + long offset = calcElementOffset(i); + Object element = lvElement(currentBuffer, offset); + if (element == JUMP || element == null) { + continue; + } + elements.add((E) element); + } + long offset = calcElementOffset((length - 1)); + Object nextArray = lvElement(currentBuffer, offset); + if (nextArray != null) { + currentBuffer = (E[]) nextArray; + } else { + break; + } + } + return elements; + } private void resize(long oldMask, E[] oldBuffer, long pIndex, E e) { diff --git a/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java index 6c62d2f7..f56ab3ad 100755 --- a/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java @@ -19,6 +19,9 @@ import static org.jctools.util.UnsafeAccess.fieldOffset; import static org.jctools.util.UnsafeRefArrayAccess.*; +import java.util.ArrayList; +import java.util.List; + abstract class MpscArrayQueueL1Pad extends ConcurrentCircularArrayQueue { long p00, p01, p02, p03, p04, p05, p06, p07; @@ -562,4 +565,17 @@ public void fill(Supplier s, WaitStrategy w, ExitCondition exit) idleCounter = 0; } } + + public List unorderedSnapshot() { + int length = capacity(); + List elements = new ArrayList(); + for (int i = 0; i < length; i++) { + long offset = calcElementOffset(i); + E element = lvElement(buffer, offset); + if (element != null) { + elements.add(element); + } + } + return elements; + } } diff --git a/jctools-core/src/main/java/org/jctools/queues/atomic/BaseMpscLinkedAtomicArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/atomic/BaseMpscLinkedAtomicArrayQueue.java index 7edf95e8..1baf79c5 100644 --- a/jctools-core/src/main/java/org/jctools/queues/atomic/BaseMpscLinkedAtomicArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/atomic/BaseMpscLinkedAtomicArrayQueue.java @@ -21,6 +21,8 @@ import java.util.Iterator; import static org.jctools.queues.atomic.LinkedAtomicArrayQueueUtil.length; import static org.jctools.queues.atomic.LinkedAtomicArrayQueueUtil.modifiedCalcElementOffset; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.jctools.queues.MessagePassingQueue; @@ -92,11 +94,21 @@ abstract class BaseMpscLinkedAtomicArrayQueueConsumerFields extends BaseMpscL protected AtomicReferenceArray consumerBuffer; + private volatile AtomicReferenceArray volatileConsumerBuffer; + @Override public final long lvConsumerIndex() { return consumerIndex; } + final AtomicReferenceArray lvVolatileConsumerBuffer() { + return volatileConsumerBuffer; + } + + final void svVolatileConsumerBuffer(AtomicReferenceArray newValue) { + volatileConsumerBuffer = newValue; + } + final long lpConsumerIndex() { return consumerIndex; } @@ -181,6 +193,7 @@ public BaseMpscLinkedAtomicArrayQueue(final int initialCapacity) { producerBuffer = buffer; producerMask = mask; consumerBuffer = buffer; + svVolatileConsumerBuffer(buffer); consumerMask = mask; // we know it's all empty to start with soProducerLimit(mask); @@ -373,7 +386,6 @@ private int offerSlowPath(long mask, long pIndex, long producerLimit) { private AtomicReferenceArray getNextBuffer(final AtomicReferenceArray buffer, final long mask) { final int offset = nextArrayOffset(mask); final AtomicReferenceArray nextBuffer = (AtomicReferenceArray) lvElement(buffer, offset); - soElement(buffer, offset, null); return nextBuffer; } @@ -406,6 +418,7 @@ private E newBufferPeek(AtomicReferenceArray nextBuffer, long index) { private int newBufferAndOffset(AtomicReferenceArray nextBuffer, long index) { consumerBuffer = nextBuffer; + svVolatileConsumerBuffer(nextBuffer); consumerMask = (length(nextBuffer) - 2) << 1; return modifiedCalcElementOffset(index, consumerMask); } @@ -571,6 +584,30 @@ public void drain(Consumer c, WaitStrategy w, ExitCondition exit) { } } + public List unorderedSnapshot() { + AtomicReferenceArray currentBuffer = lvVolatileConsumerBuffer(); + List elements = new ArrayList(); + while (true) { + int length = length(currentBuffer); + for (int i = 0; i < length - 1; i++) { + int offset = calcElementOffset(i); + Object element = lvElement(currentBuffer, offset); + if (element == JUMP || element == null) { + continue; + } + elements.add((E) element); + } + int offset = calcElementOffset((length - 1)); + Object nextArray = lvElement(currentBuffer, offset); + if (nextArray != null) { + currentBuffer = (AtomicReferenceArray) nextArray; + } else { + break; + } + } + return elements; + } + private void resize(long oldMask, AtomicReferenceArray oldBuffer, long pIndex, E e) { int newBufferLength = getNextBufferSize(oldBuffer); final AtomicReferenceArray newBuffer = allocate(newBufferLength); diff --git a/jctools-core/src/main/java/org/jctools/queues/atomic/LinkedAtomicArrayQueueUtil.java b/jctools-core/src/main/java/org/jctools/queues/atomic/LinkedAtomicArrayQueueUtil.java index c606b988..2ff01de9 100644 --- a/jctools-core/src/main/java/org/jctools/queues/atomic/LinkedAtomicArrayQueueUtil.java +++ b/jctools-core/src/main/java/org/jctools/queues/atomic/LinkedAtomicArrayQueueUtil.java @@ -55,6 +55,11 @@ static int modifiedCalcElementOffset(long index, long mask) { return (int) (index & mask) >> 1; } + + static int calcElementOffset(long index) + { + return (int) index; + } static int nextArrayOffset(AtomicReferenceArray curr) { diff --git a/jctools-core/src/main/java/org/jctools/queues/atomic/MpscAtomicArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/atomic/MpscAtomicArrayQueue.java index 0bb5cc32..b4281c1c 100644 --- a/jctools-core/src/main/java/org/jctools/queues/atomic/MpscAtomicArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/atomic/MpscAtomicArrayQueue.java @@ -14,6 +14,8 @@ package org.jctools.queues.atomic; import org.jctools.util.PortableJvmInfo; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.concurrent.atomic.AtomicLongArray; @@ -533,6 +535,19 @@ public void fill(Supplier s, WaitStrategy w, ExitCondition exit) { } } + public List unorderedSnapshot() { + int length = capacity(); + List elements = new ArrayList(); + for (int i = 0; i < length; i++) { + int offset = calcElementOffset(i); + E element = lvElement(buffer, offset); + if (element != null) { + elements.add(element); + } + } + return elements; + } + /** * @deprecated This was renamed to failFastOffer please migrate */ diff --git a/jctools-core/src/test/java/org/jctools/queues/MpscArrayQueueSnapshotTest.java b/jctools-core/src/test/java/org/jctools/queues/MpscArrayQueueSnapshotTest.java new file mode 100644 index 00000000..ece286db --- /dev/null +++ b/jctools-core/src/test/java/org/jctools/queues/MpscArrayQueueSnapshotTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.jctools.queues; + +import static org.hamcrest.CoreMatchers.hasItems; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; + +import org.junit.Before; +import org.junit.Test; + +public class MpscArrayQueueSnapshotTest { + + private MpscArrayQueue queue; + + @Before + public void setUp() throws Exception { + this.queue = new MpscArrayQueue<>(4); + } + + @Test + public void testSnapshot() { + queue.offer(0); + assertThat(queue.unorderedSnapshot(), contains(0)); + for (int i = 1; i < queue.capacity(); i++) { + queue.offer(i); + } + assertThat(queue.unorderedSnapshot(), containsInAnyOrder(0, 1, 2, 3)); + queue.poll(); + queue.offer(4); + assertThat(queue.unorderedSnapshot(), containsInAnyOrder(1, 2, 3, 4)); + } + +} diff --git a/jctools-core/src/test/java/org/jctools/queues/MpscUnboundedArrayQueueSnapshotTest.java b/jctools-core/src/test/java/org/jctools/queues/MpscUnboundedArrayQueueSnapshotTest.java new file mode 100644 index 00000000..6314e69e --- /dev/null +++ b/jctools-core/src/test/java/org/jctools/queues/MpscUnboundedArrayQueueSnapshotTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.jctools.queues; + +import static org.hamcrest.CoreMatchers.hasItems; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertThat; + +import org.junit.Before; +import org.junit.Test; + +public class MpscUnboundedArrayQueueSnapshotTest { + + private static final int CHUNK_SIZE = 4; + private MpscUnboundedArrayQueue queue; + + @Before + public void setUp() throws Exception { + this.queue = new MpscUnboundedArrayQueue<>(CHUNK_SIZE); + } + + @Test + public void testSnapshot() { + queue.offer(0); + assertThat(queue.unorderedSnapshot(), contains(0)); + for (int i = 1; i < CHUNK_SIZE; i++) { + queue.offer(i); + } + assertThat(queue.unorderedSnapshot(), containsInAnyOrder(0, 1, 2, 3)); + queue.offer(4); + queue.offer(5); + assertThat(queue.unorderedSnapshot(), containsInAnyOrder(0, 1, 2, 3, 4, 5)); + queue.poll(); + assertThat(queue.unorderedSnapshot(), containsInAnyOrder(1, 2, 3, 4, 5)); + for (int i = 1; i < CHUNK_SIZE; i++) { + queue.poll(); + } + assertThat(queue.unorderedSnapshot(), containsInAnyOrder(4, 5)); + } + +} From 48adfd10c2ba72d813608fb80d73b5dc53c698b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stig=20Rohde=20D=C3=B8ssing?= Date: Fri, 18 Jan 2019 15:10:10 +0100 Subject: [PATCH 2/3] Iterator for MpscArrayQueue and MpscUnboundedArrayQueue --- .../queues/BaseMpscLinkedArrayQueue.java | 99 ++++++++++++------- .../org/jctools/queues/MpscArrayQueue.java | 58 +++++++++-- .../queues/MpscArrayQueueSnapshotTest.java | 37 ++++++- .../MpscUnboundedArrayQueueSnapshotTest.java | 63 ++++++++++-- 4 files changed, 200 insertions(+), 57 deletions(-) diff --git a/jctools-core/src/main/java/org/jctools/queues/BaseMpscLinkedArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/BaseMpscLinkedArrayQueue.java index 11c3467c..de7cd68f 100644 --- a/jctools-core/src/main/java/org/jctools/queues/BaseMpscLinkedArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/BaseMpscLinkedArrayQueue.java @@ -77,7 +77,6 @@ abstract class BaseMpscLinkedArrayQueueConsumerFields extends BaseMpscLinkedA private volatile long consumerIndex; protected long consumerMask; protected E[] consumerBuffer; - private volatile E[] volatileConsumerBuffer; @Override public final long lvConsumerIndex() @@ -85,14 +84,6 @@ public final long lvConsumerIndex() return consumerIndex; } - final E[] lvVolatileConsumerBuffer() { - return volatileConsumerBuffer; - } - - final void svVolatileConsumerBuffer(E[] newValue) { - volatileConsumerBuffer = newValue; - } - final long lpConsumerIndex() { return UNSAFE.getLong(this, C_INDEX_OFFSET); @@ -148,6 +139,7 @@ public abstract class BaseMpscLinkedArrayQueue extends BaseMpscLinkedArrayQue { // No post padding here, subclasses must add private static final Object JUMP = new Object(); + private static final Object BUFFER_CONSUMED = new Object(); private static final int CONTINUE_TO_P_INDEX_CAS = 0; private static final int RETRY = 1; private static final int QUEUE_FULL = 2; @@ -170,17 +162,10 @@ public BaseMpscLinkedArrayQueue(final int initialCapacity) producerBuffer = buffer; producerMask = mask; consumerBuffer = buffer; - svVolatileConsumerBuffer(buffer); consumerMask = mask; soProducerLimit(mask); // we know it's all empty to start with } - @Override - public final Iterator iterator() - { - throw new UnsupportedOperationException(); - } - @Override public final int size() { @@ -417,6 +402,7 @@ private E[] getNextBuffer(final E[] buffer, final long mask) { final long offset = nextArrayOffset(mask); final E[] nextBuffer = (E[]) lvElement(buffer, offset); + soElement(buffer, offset, BUFFER_CONSUMED); return nextBuffer; } @@ -452,7 +438,6 @@ private E newBufferPeek(E[] nextBuffer, long index) private long newBufferAndOffset(E[] nextBuffer, long index) { consumerBuffer = nextBuffer; - svVolatileConsumerBuffer(nextBuffer); consumerMask = (length(nextBuffer) - 2) << 1; return modifiedCalcElementOffset(index, consumerMask); } @@ -655,28 +640,70 @@ public void drain(Consumer c, WaitStrategy w, ExitCondition exit) } } - public List unorderedSnapshot() { - E[] currentBuffer = lvVolatileConsumerBuffer(); - List elements = new ArrayList(); - while (true) { - int length = length(currentBuffer); - for (int i = 0; i < length - 1; i++) { - long offset = calcElementOffset(i); - Object element = lvElement(currentBuffer, offset); - if (element == JUMP || element == null) { - continue; + /** + * Get an iterator for this queue. This method is thread safe. + * + * The iterator provides a best-effort snapshot of the elements in the queue. + * The returned iterator is not guaranteed to return elements in queue order, + * and races with the consumer thread may cause gaps in the sequence of returned elements. + * @return The iterator. + */ + @Override + public Iterator iterator() { + return new WeakIterator(); + } + + private final class WeakIterator implements Iterator { + + private long nextIndex; + private E nextElement; + private E[] currentBuffer; + private int currentBufferLength; + + WeakIterator() { + setBuffer(consumerBuffer); + nextElement = getNext(); + } + + @Override + public boolean hasNext() { + return nextElement != null; + } + + @Override + public E next() { + E e = nextElement; + nextElement = getNext(); + return e; + } + + private void setBuffer(E[] buffer) { + this.currentBuffer = buffer; + this.currentBufferLength = length(buffer); + this.nextIndex = 0; + } + + private E getNext() { + while (true) { + while (nextIndex < currentBufferLength - 1) { + long offset = calcElementOffset(nextIndex++); + E e = lvElement(currentBuffer, offset); + if (e != null && e != JUMP) { + return e; + } + } + long offset = calcElementOffset(currentBufferLength - 1); + Object nextArray = lvElement(currentBuffer, offset); + if (nextArray == BUFFER_CONSUMED) { + //Consumer may have passed us, just jump to the current consumer buffer + setBuffer(consumerBuffer); + } else if (nextArray != null) { + setBuffer((E[]) nextArray); + } else { + return null; } - elements.add((E) element); - } - long offset = calcElementOffset((length - 1)); - Object nextArray = lvElement(currentBuffer, offset); - if (nextArray != null) { - currentBuffer = (E[]) nextArray; - } else { - break; } } - return elements; } private void resize(long oldMask, E[] oldBuffer, long pIndex, E e) diff --git a/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java index f56ab3ad..5e806fcd 100755 --- a/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java @@ -20,6 +20,7 @@ import static org.jctools.util.UnsafeRefArrayAccess.*; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; abstract class MpscArrayQueueL1Pad extends ConcurrentCircularArrayQueue @@ -566,16 +567,55 @@ public void fill(Supplier s, WaitStrategy w, ExitCondition exit) } } - public List unorderedSnapshot() { - int length = capacity(); - List elements = new ArrayList(); - for (int i = 0; i < length; i++) { - long offset = calcElementOffset(i); - E element = lvElement(buffer, offset); - if (element != null) { - elements.add(element); + /** + * Get an iterator for this queue. This method is thread safe. + * + * The iterator provides a best-effort snapshot of the elements in the queue. + * The returned iterator is not guaranteed to return elements in queue order, + * and races with the consumer thread may cause gaps in the sequence of returned elements. + * @return The iterator. + */ + @Override + public final Iterator iterator() { + final long cIndex = lvConsumerIndex(); + final long pIndex = lvProducerIndex(); + + return new WeakIterator(cIndex, pIndex); + } + + private final class WeakIterator implements Iterator { + + private final long pIndex; + private long nextIndex; + private E nextElement; + + WeakIterator(long cIndex, long pIndex) { + this.nextIndex = cIndex; + this.pIndex = pIndex; + nextElement = getNext(); + } + + @Override + public boolean hasNext() { + return nextElement != null; + } + + @Override + public E next() { + E e = nextElement; + nextElement = getNext(); + return e; + } + + private E getNext() { + while (nextIndex < pIndex) { + long offset = calcElementOffset(nextIndex++); + E e = lvElement(buffer, offset); + if (e != null) { + return e; + } } + return null; } - return elements; } } diff --git a/jctools-core/src/test/java/org/jctools/queues/MpscArrayQueueSnapshotTest.java b/jctools-core/src/test/java/org/jctools/queues/MpscArrayQueueSnapshotTest.java index ece286db..d404fd1b 100644 --- a/jctools-core/src/test/java/org/jctools/queues/MpscArrayQueueSnapshotTest.java +++ b/jctools-core/src/test/java/org/jctools/queues/MpscArrayQueueSnapshotTest.java @@ -19,11 +19,14 @@ package org.jctools.queues; -import static org.hamcrest.CoreMatchers.hasItems; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; import org.junit.Before; import org.junit.Test; @@ -37,16 +40,40 @@ public void setUp() throws Exception { } @Test - public void testSnapshot() { + public void testIterator() { queue.offer(0); - assertThat(queue.unorderedSnapshot(), contains(0)); + assertThat(iteratorToList(), contains(0)); for (int i = 1; i < queue.capacity(); i++) { queue.offer(i); } - assertThat(queue.unorderedSnapshot(), containsInAnyOrder(0, 1, 2, 3)); + assertThat(iteratorToList(), containsInAnyOrder(0, 1, 2, 3)); queue.poll(); queue.offer(4); - assertThat(queue.unorderedSnapshot(), containsInAnyOrder(1, 2, 3, 4)); + queue.poll(); + assertThat(iteratorToList(), containsInAnyOrder(2, 3, 4)); + } + + @Test + public void testIteratorHasNextConcurrentModification() { + //There may be gaps in the elements returned by the iterator, + //but hasNext needs to be reliable even if the elements are consumed between hasNext() and next(). + queue.offer(0); + queue.offer(1); + Iterator iter = queue.iterator(); + assertThat(iter.hasNext(), is(true)); + queue.poll(); + queue.poll(); + assertThat(queue.isEmpty(), is(true)); + assertThat(iter.hasNext(), is(true)); + assertThat(iter.next(), is(0)); + assertThat(iter.hasNext(), is(false)); + } + + private List iteratorToList() { + List list = new ArrayList<>(); + Iterator iter = queue.iterator(); + iter.forEachRemaining(list::add); + return list; } } diff --git a/jctools-core/src/test/java/org/jctools/queues/MpscUnboundedArrayQueueSnapshotTest.java b/jctools-core/src/test/java/org/jctools/queues/MpscUnboundedArrayQueueSnapshotTest.java index 6314e69e..c0024048 100644 --- a/jctools-core/src/test/java/org/jctools/queues/MpscUnboundedArrayQueueSnapshotTest.java +++ b/jctools-core/src/test/java/org/jctools/queues/MpscUnboundedArrayQueueSnapshotTest.java @@ -22,8 +22,12 @@ import static org.hamcrest.CoreMatchers.hasItems; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; import org.junit.Before; import org.junit.Test; @@ -34,26 +38,71 @@ public class MpscUnboundedArrayQueueSnapshotTest { @Before public void setUp() throws Exception { - this.queue = new MpscUnboundedArrayQueue<>(CHUNK_SIZE); + this.queue = new MpscUnboundedArrayQueue<>(CHUNK_SIZE + 1); //Account for extra slot for JUMP } @Test - public void testSnapshot() { + public void testIterator() { queue.offer(0); - assertThat(queue.unorderedSnapshot(), contains(0)); + assertThat(iteratorToList(), contains(0)); for (int i = 1; i < CHUNK_SIZE; i++) { queue.offer(i); } - assertThat(queue.unorderedSnapshot(), containsInAnyOrder(0, 1, 2, 3)); + assertThat(iteratorToList(), containsInAnyOrder(0, 1, 2, 3)); queue.offer(4); queue.offer(5); - assertThat(queue.unorderedSnapshot(), containsInAnyOrder(0, 1, 2, 3, 4, 5)); + assertThat(iteratorToList(), containsInAnyOrder(0, 1, 2, 3, 4, 5)); queue.poll(); - assertThat(queue.unorderedSnapshot(), containsInAnyOrder(1, 2, 3, 4, 5)); + assertThat(iteratorToList(), containsInAnyOrder(1, 2, 3, 4, 5)); for (int i = 1; i < CHUNK_SIZE; i++) { queue.poll(); } - assertThat(queue.unorderedSnapshot(), containsInAnyOrder(4, 5)); + assertThat(iteratorToList(), containsInAnyOrder(4, 5)); + } + + @Test + public void testIteratorOutpacedByConsumer() { + int slotsToForceMultipleBuffers = CHUNK_SIZE + 1; + for (int i = 0; i < slotsToForceMultipleBuffers; i++) { + queue.offer(i); + } + Iterator iter = queue.iterator(); + List entries = new ArrayList<>(); + entries.add(iter.next()); + for (int i = 0; i < CHUNK_SIZE; i++) { + queue.poll(); + } + //Now that the consumer has discarded the first buffer, the iterator needs to follow it to the new buffer. + iter.forEachRemaining(entries::add); + assertThat(entries, containsInAnyOrder(0, 1, 4)); + } + + @Test + public void testIteratorHasNextConcurrentModification() { + /* + * There may be gaps in the elements returned by the iterator, but hasNext needs to be reliable even if the elements are consumed + * between hasNext() and next(), and even if the consumer buffer changes. + */ + int slotsToForceMultipleBuffers = CHUNK_SIZE + 1; + for (int i = 0; i < slotsToForceMultipleBuffers; i++) { + queue.offer(i); + } + Iterator iter = queue.iterator(); + assertThat(iter.hasNext(), is(true)); + for (int i = 0; i < slotsToForceMultipleBuffers; i++) { + queue.poll(); + } + assertThat(queue.isEmpty(), is(true)); + assertThat(iter.hasNext(), is(true)); + assertThat(iter.next(), is(0)); + assertThat(iter.hasNext(), is(false)); + } + + private List iteratorToList() { + List list = new ArrayList<>(); + Iterator iter = queue.iterator(); + iter.forEachRemaining(list::add); + return list; } } From d2538cf69320b63ffae870686e4af7100304df44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stig=20Rohde=20D=C3=B8ssing?= Date: Fri, 18 Jan 2019 19:56:25 +0100 Subject: [PATCH 3/3] Update Javadoc, set new buffer reference before updating BUFFER_CONSUMED to ensure iterator doesn't jump to old buffer --- .../queues/BaseMpscLinkedArrayQueue.java | 29 ++-- .../org/jctools/queues/MpscArrayQueue.java | 6 +- .../BaseMpscLinkedAtomicArrayQueue.java | 133 +++++++++++------- .../queues/atomic/MpscAtomicArrayQueue.java | 67 +++++++-- .../java/org/jctools/queues/package-info.java | 3 +- 5 files changed, 155 insertions(+), 83 deletions(-) diff --git a/jctools-core/src/main/java/org/jctools/queues/BaseMpscLinkedArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/BaseMpscLinkedArrayQueue.java index de7cd68f..16ca3ff4 100644 --- a/jctools-core/src/main/java/org/jctools/queues/BaseMpscLinkedArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/BaseMpscLinkedArrayQueue.java @@ -30,8 +30,6 @@ import static org.jctools.util.UnsafeRefArrayAccess.lvElement; import static org.jctools.util.UnsafeRefArrayAccess.soElement; -import java.util.ArrayList; -import java.util.List; abstract class BaseMpscLinkedArrayQueuePad1 extends AbstractQueue implements IndexedQueue { @@ -311,7 +309,7 @@ public E poll() if (e == JUMP) { - final E[] nextBuffer = getNextBuffer(buffer, mask); + final E[] nextBuffer = nextBuffer(buffer, mask); return newBufferPoll(nextBuffer, index); } @@ -347,7 +345,7 @@ public E peek() } if (e == JUMP) { - return newBufferPeek(getNextBuffer(buffer, mask), index); + return newBufferPeek(nextBuffer(buffer, mask), index); } return (E) e; } @@ -398,10 +396,12 @@ else if (casProducerIndex(pIndex, pIndex + 1)) protected abstract long availableInQueue(long pIndex, long cIndex); @SuppressWarnings("unchecked") - private E[] getNextBuffer(final E[] buffer, final long mask) + private E[] nextBuffer(final E[] buffer, final long mask) { final long offset = nextArrayOffset(mask); final E[] nextBuffer = (E[]) lvElement(buffer, offset); + consumerBuffer = nextBuffer; + consumerMask = (length(nextBuffer) - 2) << 1; soElement(buffer, offset, BUFFER_CONSUMED); return nextBuffer; } @@ -413,7 +413,7 @@ private long nextArrayOffset(long mask) private E newBufferPoll(E[] nextBuffer, long index) { - final long offset = newBufferAndOffset(nextBuffer, index); + final long offset = modifiedCalcElementOffset(index, consumerMask); final E n = lvElement(nextBuffer, offset);// LoadLoad if (n == null) { @@ -426,7 +426,7 @@ private E newBufferPoll(E[] nextBuffer, long index) private E newBufferPeek(E[] nextBuffer, long index) { - final long offset = newBufferAndOffset(nextBuffer, index); + final long offset = modifiedCalcElementOffset(index, consumerMask); final E n = lvElement(nextBuffer, offset);// LoadLoad if (null == n) { @@ -435,13 +435,6 @@ private E newBufferPeek(E[] nextBuffer, long index) return n; } - private long newBufferAndOffset(E[] nextBuffer, long index) - { - consumerBuffer = nextBuffer; - consumerMask = (length(nextBuffer) - 2) << 1; - return modifiedCalcElementOffset(index, consumerMask); - } - @Override public long currentProducerIndex() { @@ -479,7 +472,7 @@ public E relaxedPoll() } if (e == JUMP) { - final E[] nextBuffer = getNextBuffer(buffer, mask); + final E[] nextBuffer = nextBuffer(buffer, mask); return newBufferPoll(nextBuffer, index); } soElement(buffer, offset, null); @@ -499,7 +492,7 @@ public E relaxedPeek() Object e = lvElement(buffer, offset);// LoadLoad if (e == JUMP) { - return newBufferPeek(getNextBuffer(buffer, mask), index); + return newBufferPeek(nextBuffer(buffer, mask), index); } return (E) e; } @@ -642,10 +635,12 @@ public void drain(Consumer c, WaitStrategy w, ExitCondition exit) /** * Get an iterator for this queue. This method is thread safe. - * + *

* The iterator provides a best-effort snapshot of the elements in the queue. * The returned iterator is not guaranteed to return elements in queue order, * and races with the consumer thread may cause gaps in the sequence of returned elements. + * Like {link #relaxedPoll}, the iterator may not immediately return newly inserted elements. + * * @return The iterator. */ @Override diff --git a/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java index 5e806fcd..cfecadd5 100755 --- a/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java @@ -19,9 +19,7 @@ import static org.jctools.util.UnsafeAccess.fieldOffset; import static org.jctools.util.UnsafeRefArrayAccess.*; -import java.util.ArrayList; import java.util.Iterator; -import java.util.List; abstract class MpscArrayQueueL1Pad extends ConcurrentCircularArrayQueue { @@ -569,10 +567,12 @@ public void fill(Supplier s, WaitStrategy w, ExitCondition exit) /** * Get an iterator for this queue. This method is thread safe. - * + *

* The iterator provides a best-effort snapshot of the elements in the queue. * The returned iterator is not guaranteed to return elements in queue order, * and races with the consumer thread may cause gaps in the sequence of returned elements. + * Like {link #relaxedPoll}, the iterator may not immediately return newly inserted elements. + * * @return The iterator. */ @Override diff --git a/jctools-core/src/main/java/org/jctools/queues/atomic/BaseMpscLinkedAtomicArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/atomic/BaseMpscLinkedAtomicArrayQueue.java index 1baf79c5..5d2025ca 100644 --- a/jctools-core/src/main/java/org/jctools/queues/atomic/BaseMpscLinkedAtomicArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/atomic/BaseMpscLinkedAtomicArrayQueue.java @@ -21,8 +21,6 @@ import java.util.Iterator; import static org.jctools.queues.atomic.LinkedAtomicArrayQueueUtil.length; import static org.jctools.queues.atomic.LinkedAtomicArrayQueueUtil.modifiedCalcElementOffset; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.jctools.queues.MessagePassingQueue; @@ -94,21 +92,11 @@ abstract class BaseMpscLinkedAtomicArrayQueueConsumerFields extends BaseMpscL protected AtomicReferenceArray consumerBuffer; - private volatile AtomicReferenceArray volatileConsumerBuffer; - @Override public final long lvConsumerIndex() { return consumerIndex; } - final AtomicReferenceArray lvVolatileConsumerBuffer() { - return volatileConsumerBuffer; - } - - final void svVolatileConsumerBuffer(AtomicReferenceArray newValue) { - volatileConsumerBuffer = newValue; - } - final long lpConsumerIndex() { return consumerIndex; } @@ -171,6 +159,8 @@ public abstract class BaseMpscLinkedAtomicArrayQueue extends BaseMpscLinkedAt // No post padding here, subclasses must add private static final Object JUMP = new Object(); + private static final Object BUFFER_CONSUMED = new Object(); + private static final int CONTINUE_TO_P_INDEX_CAS = 0; private static final int RETRY = 1; @@ -193,17 +183,11 @@ public BaseMpscLinkedAtomicArrayQueue(final int initialCapacity) { producerBuffer = buffer; producerMask = mask; consumerBuffer = buffer; - svVolatileConsumerBuffer(buffer); consumerMask = mask; // we know it's all empty to start with soProducerLimit(mask); } - @Override - public final Iterator iterator() { - throw new UnsupportedOperationException(); - } - @Override public final int size() { // NOTE: because indices are on even numbers we cannot use the size util. @@ -313,7 +297,7 @@ public E poll() { } } if (e == JUMP) { - final AtomicReferenceArray nextBuffer = getNextBuffer(buffer, mask); + final AtomicReferenceArray nextBuffer = nextBuffer(buffer, mask); return newBufferPoll(nextBuffer, index); } // release element null @@ -344,7 +328,7 @@ public E peek() { } while (e == null); } if (e == JUMP) { - return newBufferPeek(getNextBuffer(buffer, mask), index); + return newBufferPeek(nextBuffer(buffer, mask), index); } return (E) e; } @@ -383,9 +367,12 @@ private int offerSlowPath(long mask, long pIndex, long producerLimit) { protected abstract long availableInQueue(long pIndex, long cIndex); @SuppressWarnings("unchecked") - private AtomicReferenceArray getNextBuffer(final AtomicReferenceArray buffer, final long mask) { + private AtomicReferenceArray nextBuffer(final AtomicReferenceArray buffer, final long mask) { final int offset = nextArrayOffset(mask); final AtomicReferenceArray nextBuffer = (AtomicReferenceArray) lvElement(buffer, offset); + consumerBuffer = nextBuffer; + consumerMask = (length(nextBuffer) - 2) << 1; + soElement(buffer, offset, BUFFER_CONSUMED); return nextBuffer; } @@ -394,7 +381,7 @@ private int nextArrayOffset(long mask) { } private E newBufferPoll(AtomicReferenceArray nextBuffer, long index) { - final int offset = newBufferAndOffset(nextBuffer, index); + final int offset = modifiedCalcElementOffset(index, consumerMask); // LoadLoad final E n = lvElement(nextBuffer, offset); if (n == null) { @@ -407,7 +394,7 @@ private E newBufferPoll(AtomicReferenceArray nextBuffer, long index) { } private E newBufferPeek(AtomicReferenceArray nextBuffer, long index) { - final int offset = newBufferAndOffset(nextBuffer, index); + final int offset = modifiedCalcElementOffset(index, consumerMask); // LoadLoad final E n = lvElement(nextBuffer, offset); if (null == n) { @@ -416,13 +403,6 @@ private E newBufferPeek(AtomicReferenceArray nextBuffer, long index) { return n; } - private int newBufferAndOffset(AtomicReferenceArray nextBuffer, long index) { - consumerBuffer = nextBuffer; - svVolatileConsumerBuffer(nextBuffer); - consumerMask = (length(nextBuffer) - 2) << 1; - return modifiedCalcElementOffset(index, consumerMask); - } - @Override public long currentProducerIndex() { return lvProducerIndex() / 2; @@ -454,7 +434,7 @@ public E relaxedPoll() { return null; } if (e == JUMP) { - final AtomicReferenceArray nextBuffer = getNextBuffer(buffer, mask); + final AtomicReferenceArray nextBuffer = nextBuffer(buffer, mask); return newBufferPoll(nextBuffer, index); } soElement(buffer, offset, null); @@ -472,7 +452,7 @@ public E relaxedPeek() { // LoadLoad Object e = lvElement(buffer, offset); if (e == JUMP) { - return newBufferPeek(getNextBuffer(buffer, mask), index); + return newBufferPeek(nextBuffer(buffer, mask), index); } return (E) e; } @@ -584,28 +564,79 @@ public void drain(Consumer c, WaitStrategy w, ExitCondition exit) { } } - public List unorderedSnapshot() { - AtomicReferenceArray currentBuffer = lvVolatileConsumerBuffer(); - List elements = new ArrayList(); - while (true) { - int length = length(currentBuffer); - for (int i = 0; i < length - 1; i++) { - int offset = calcElementOffset(i); - Object element = lvElement(currentBuffer, offset); - if (element == JUMP || element == null) { - continue; + /** + * Get an iterator for this queue. This method is thread safe. + *

+ * The iterator provides a best-effort snapshot of the elements in the queue. + * The returned iterator is not guaranteed to return elements in queue order, + * and races with the consumer thread may cause gaps in the sequence of returned elements. + * Like {link #relaxedPoll}, the iterator may not immediately return newly inserted elements. + * + * @return The iterator. + */ + @Override + public Iterator iterator() { + return new WeakIterator(); + } + + /** + * NOTE: This class was automatically generated by org.jctools.queues.atomic.JavaParsingAtomicLinkedQueueGenerator + * which can found in the jctools-build module. The original source file is BaseMpscLinkedArrayQueue.java. + */ + private final class WeakIterator implements Iterator { + + private long nextIndex; + + private E nextElement; + + private AtomicReferenceArray currentBuffer; + + private int currentBufferLength; + + WeakIterator() { + setBuffer(consumerBuffer); + nextElement = getNext(); + } + + @Override + public boolean hasNext() { + return nextElement != null; + } + + @Override + public E next() { + E e = nextElement; + nextElement = getNext(); + return e; + } + + private void setBuffer(AtomicReferenceArray buffer) { + this.currentBuffer = buffer; + this.currentBufferLength = length(buffer); + this.nextIndex = 0; + } + + private E getNext() { + while (true) { + while (nextIndex < currentBufferLength - 1) { + int offset = calcElementOffset(nextIndex++); + E e = lvElement(currentBuffer, offset); + if (e != null && e != JUMP) { + return e; + } + } + int offset = calcElementOffset(currentBufferLength - 1); + Object nextArray = lvElement(currentBuffer, offset); + if (nextArray == BUFFER_CONSUMED) { + //Consumer may have passed us, just jump to the current consumer buffer + setBuffer(consumerBuffer); + } else if (nextArray != null) { + setBuffer((AtomicReferenceArray) nextArray); + } else { + return null; } - elements.add((E) element); - } - int offset = calcElementOffset((length - 1)); - Object nextArray = lvElement(currentBuffer, offset); - if (nextArray != null) { - currentBuffer = (AtomicReferenceArray) nextArray; - } else { - break; } } - return elements; } private void resize(long oldMask, AtomicReferenceArray oldBuffer, long pIndex, E e) { diff --git a/jctools-core/src/main/java/org/jctools/queues/atomic/MpscAtomicArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/atomic/MpscAtomicArrayQueue.java index b4281c1c..3bdc77f4 100644 --- a/jctools-core/src/main/java/org/jctools/queues/atomic/MpscAtomicArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/atomic/MpscAtomicArrayQueue.java @@ -14,8 +14,7 @@ package org.jctools.queues.atomic; import org.jctools.util.PortableJvmInfo; -import java.util.ArrayList; -import java.util.List; +import java.util.Iterator; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.concurrent.atomic.AtomicLongArray; @@ -535,17 +534,63 @@ public void fill(Supplier s, WaitStrategy w, ExitCondition exit) { } } - public List unorderedSnapshot() { - int length = capacity(); - List elements = new ArrayList(); - for (int i = 0; i < length; i++) { - int offset = calcElementOffset(i); - E element = lvElement(buffer, offset); - if (element != null) { - elements.add(element); + /** + * Get an iterator for this queue. This method is thread safe. + *

+ * The iterator provides a best-effort snapshot of the elements in the queue. + * The returned iterator is not guaranteed to return elements in queue order, + * and races with the consumer thread may cause gaps in the sequence of returned elements. + * Like {link #relaxedPoll}, the iterator may not immediately return newly inserted elements. + * + * @return The iterator. + */ + @Override + public final Iterator iterator() { + final long cIndex = lvConsumerIndex(); + final long pIndex = lvProducerIndex(); + return new WeakIterator(cIndex, pIndex); + } + + /** + * NOTE: This class was automatically generated by org.jctools.queues.atomic.JavaParsingAtomicArrayQueueGenerator + * which can found in the jctools-build module. The original source file is MpscArrayQueue.java. + */ + private final class WeakIterator implements Iterator { + + private final long pIndex; + + private long nextIndex; + + private E nextElement; + + WeakIterator(long cIndex, long pIndex) { + this.nextIndex = cIndex; + this.pIndex = pIndex; + nextElement = getNext(); + } + + @Override + public boolean hasNext() { + return nextElement != null; + } + + @Override + public E next() { + E e = nextElement; + nextElement = getNext(); + return e; + } + + private E getNext() { + while (nextIndex < pIndex) { + int offset = calcElementOffset(nextIndex++); + E e = lvElement(buffer, offset); + if (e != null) { + return e; + } } + return null; } - return elements; } /** diff --git a/jctools-core/src/main/java/org/jctools/queues/package-info.java b/jctools-core/src/main/java/org/jctools/queues/package-info.java index ac892774..6697859f 100644 --- a/jctools-core/src/main/java/org/jctools/queues/package-info.java +++ b/jctools-core/src/main/java/org/jctools/queues/package-info.java @@ -29,7 +29,7 @@ *
* Limited Queue methods support:
* The queues implement a subset of the {@link java.util.Queue} interface which is documented under the - * {@link org.jctools.queues.MessagePassingQueue} interface. In particular {@link java.util.Queue#iterator()} is not + * {@link org.jctools.queues.MessagePassingQueue} interface. In particular {@link java.util.Queue#iterator()} is usually not * supported and dependent methods from {@link java.util.AbstractQueue} are also not supported such as: *

    *
  1. {@link java.util.Queue#remove(Object)} @@ -38,6 +38,7 @@ *
  2. {@link java.util.Queue#contains(Object)} *
  3. {@link java.util.Queue#containsAll(java.util.Collection)} *
+ * A few queues do support a limited form of iteration. This support is documented in the Javadoc of the relevant queues. *

*
* Memory layout controls and False Sharing: