From d2538cf69320b63ffae870686e4af7100304df44 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stig=20Rohde=20D=C3=B8ssing?= Date: Fri, 18 Jan 2019 19:56:25 +0100 Subject: [PATCH] Update Javadoc, set new buffer reference before updating BUFFER_CONSUMED to ensure iterator doesn't jump to old buffer --- .../queues/BaseMpscLinkedArrayQueue.java | 29 ++-- .../org/jctools/queues/MpscArrayQueue.java | 6 +- .../BaseMpscLinkedAtomicArrayQueue.java | 133 +++++++++++------- .../queues/atomic/MpscAtomicArrayQueue.java | 67 +++++++-- .../java/org/jctools/queues/package-info.java | 3 +- 5 files changed, 155 insertions(+), 83 deletions(-) diff --git a/jctools-core/src/main/java/org/jctools/queues/BaseMpscLinkedArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/BaseMpscLinkedArrayQueue.java index de7cd68f..16ca3ff4 100644 --- a/jctools-core/src/main/java/org/jctools/queues/BaseMpscLinkedArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/BaseMpscLinkedArrayQueue.java @@ -30,8 +30,6 @@ import static org.jctools.util.UnsafeRefArrayAccess.lvElement; import static org.jctools.util.UnsafeRefArrayAccess.soElement; -import java.util.ArrayList; -import java.util.List; abstract class BaseMpscLinkedArrayQueuePad1 extends AbstractQueue implements IndexedQueue { @@ -311,7 +309,7 @@ public E poll() if (e == JUMP) { - final E[] nextBuffer = getNextBuffer(buffer, mask); + final E[] nextBuffer = nextBuffer(buffer, mask); return newBufferPoll(nextBuffer, index); } @@ -347,7 +345,7 @@ public E peek() } if (e == JUMP) { - return newBufferPeek(getNextBuffer(buffer, mask), index); + return newBufferPeek(nextBuffer(buffer, mask), index); } return (E) e; } @@ -398,10 +396,12 @@ else if (casProducerIndex(pIndex, pIndex + 1)) protected abstract long availableInQueue(long pIndex, long cIndex); @SuppressWarnings("unchecked") - private E[] getNextBuffer(final E[] buffer, final long mask) + private E[] nextBuffer(final E[] buffer, final long mask) { final long offset = nextArrayOffset(mask); final E[] nextBuffer = (E[]) lvElement(buffer, offset); + consumerBuffer = nextBuffer; + consumerMask = (length(nextBuffer) - 2) << 1; soElement(buffer, offset, BUFFER_CONSUMED); return nextBuffer; } @@ -413,7 +413,7 @@ private long nextArrayOffset(long mask) private E newBufferPoll(E[] nextBuffer, long index) { - final long offset = newBufferAndOffset(nextBuffer, index); + final long offset = modifiedCalcElementOffset(index, consumerMask); final E n = lvElement(nextBuffer, offset);// LoadLoad if (n == null) { @@ -426,7 +426,7 @@ private E newBufferPoll(E[] nextBuffer, long index) private E newBufferPeek(E[] nextBuffer, long index) { - final long offset = newBufferAndOffset(nextBuffer, index); + final long offset = modifiedCalcElementOffset(index, consumerMask); final E n = lvElement(nextBuffer, offset);// LoadLoad if (null == n) { @@ -435,13 +435,6 @@ private E newBufferPeek(E[] nextBuffer, long index) return n; } - private long newBufferAndOffset(E[] nextBuffer, long index) - { - consumerBuffer = nextBuffer; - consumerMask = (length(nextBuffer) - 2) << 1; - return modifiedCalcElementOffset(index, consumerMask); - } - @Override public long currentProducerIndex() { @@ -479,7 +472,7 @@ public E relaxedPoll() } if (e == JUMP) { - final E[] nextBuffer = getNextBuffer(buffer, mask); + final E[] nextBuffer = nextBuffer(buffer, mask); return newBufferPoll(nextBuffer, index); } soElement(buffer, offset, null); @@ -499,7 +492,7 @@ public E relaxedPeek() Object e = lvElement(buffer, offset);// LoadLoad if (e == JUMP) { - return newBufferPeek(getNextBuffer(buffer, mask), index); + return newBufferPeek(nextBuffer(buffer, mask), index); } return (E) e; } @@ -642,10 +635,12 @@ public void drain(Consumer c, WaitStrategy w, ExitCondition exit) /** * Get an iterator for this queue. This method is thread safe. - * + *

* The iterator provides a best-effort snapshot of the elements in the queue. * The returned iterator is not guaranteed to return elements in queue order, * and races with the consumer thread may cause gaps in the sequence of returned elements. + * Like {link #relaxedPoll}, the iterator may not immediately return newly inserted elements. + * * @return The iterator. */ @Override diff --git a/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java index 5e806fcd..cfecadd5 100755 --- a/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java @@ -19,9 +19,7 @@ import static org.jctools.util.UnsafeAccess.fieldOffset; import static org.jctools.util.UnsafeRefArrayAccess.*; -import java.util.ArrayList; import java.util.Iterator; -import java.util.List; abstract class MpscArrayQueueL1Pad extends ConcurrentCircularArrayQueue { @@ -569,10 +567,12 @@ public void fill(Supplier s, WaitStrategy w, ExitCondition exit) /** * Get an iterator for this queue. This method is thread safe. - * + *

* The iterator provides a best-effort snapshot of the elements in the queue. * The returned iterator is not guaranteed to return elements in queue order, * and races with the consumer thread may cause gaps in the sequence of returned elements. + * Like {link #relaxedPoll}, the iterator may not immediately return newly inserted elements. + * * @return The iterator. */ @Override diff --git a/jctools-core/src/main/java/org/jctools/queues/atomic/BaseMpscLinkedAtomicArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/atomic/BaseMpscLinkedAtomicArrayQueue.java index 1baf79c5..5d2025ca 100644 --- a/jctools-core/src/main/java/org/jctools/queues/atomic/BaseMpscLinkedAtomicArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/atomic/BaseMpscLinkedAtomicArrayQueue.java @@ -21,8 +21,6 @@ import java.util.Iterator; import static org.jctools.queues.atomic.LinkedAtomicArrayQueueUtil.length; import static org.jctools.queues.atomic.LinkedAtomicArrayQueueUtil.modifiedCalcElementOffset; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.jctools.queues.MessagePassingQueue; @@ -94,21 +92,11 @@ abstract class BaseMpscLinkedAtomicArrayQueueConsumerFields extends BaseMpscL protected AtomicReferenceArray consumerBuffer; - private volatile AtomicReferenceArray volatileConsumerBuffer; - @Override public final long lvConsumerIndex() { return consumerIndex; } - final AtomicReferenceArray lvVolatileConsumerBuffer() { - return volatileConsumerBuffer; - } - - final void svVolatileConsumerBuffer(AtomicReferenceArray newValue) { - volatileConsumerBuffer = newValue; - } - final long lpConsumerIndex() { return consumerIndex; } @@ -171,6 +159,8 @@ public abstract class BaseMpscLinkedAtomicArrayQueue extends BaseMpscLinkedAt // No post padding here, subclasses must add private static final Object JUMP = new Object(); + private static final Object BUFFER_CONSUMED = new Object(); + private static final int CONTINUE_TO_P_INDEX_CAS = 0; private static final int RETRY = 1; @@ -193,17 +183,11 @@ public BaseMpscLinkedAtomicArrayQueue(final int initialCapacity) { producerBuffer = buffer; producerMask = mask; consumerBuffer = buffer; - svVolatileConsumerBuffer(buffer); consumerMask = mask; // we know it's all empty to start with soProducerLimit(mask); } - @Override - public final Iterator iterator() { - throw new UnsupportedOperationException(); - } - @Override public final int size() { // NOTE: because indices are on even numbers we cannot use the size util. @@ -313,7 +297,7 @@ public E poll() { } } if (e == JUMP) { - final AtomicReferenceArray nextBuffer = getNextBuffer(buffer, mask); + final AtomicReferenceArray nextBuffer = nextBuffer(buffer, mask); return newBufferPoll(nextBuffer, index); } // release element null @@ -344,7 +328,7 @@ public E peek() { } while (e == null); } if (e == JUMP) { - return newBufferPeek(getNextBuffer(buffer, mask), index); + return newBufferPeek(nextBuffer(buffer, mask), index); } return (E) e; } @@ -383,9 +367,12 @@ private int offerSlowPath(long mask, long pIndex, long producerLimit) { protected abstract long availableInQueue(long pIndex, long cIndex); @SuppressWarnings("unchecked") - private AtomicReferenceArray getNextBuffer(final AtomicReferenceArray buffer, final long mask) { + private AtomicReferenceArray nextBuffer(final AtomicReferenceArray buffer, final long mask) { final int offset = nextArrayOffset(mask); final AtomicReferenceArray nextBuffer = (AtomicReferenceArray) lvElement(buffer, offset); + consumerBuffer = nextBuffer; + consumerMask = (length(nextBuffer) - 2) << 1; + soElement(buffer, offset, BUFFER_CONSUMED); return nextBuffer; } @@ -394,7 +381,7 @@ private int nextArrayOffset(long mask) { } private E newBufferPoll(AtomicReferenceArray nextBuffer, long index) { - final int offset = newBufferAndOffset(nextBuffer, index); + final int offset = modifiedCalcElementOffset(index, consumerMask); // LoadLoad final E n = lvElement(nextBuffer, offset); if (n == null) { @@ -407,7 +394,7 @@ private E newBufferPoll(AtomicReferenceArray nextBuffer, long index) { } private E newBufferPeek(AtomicReferenceArray nextBuffer, long index) { - final int offset = newBufferAndOffset(nextBuffer, index); + final int offset = modifiedCalcElementOffset(index, consumerMask); // LoadLoad final E n = lvElement(nextBuffer, offset); if (null == n) { @@ -416,13 +403,6 @@ private E newBufferPeek(AtomicReferenceArray nextBuffer, long index) { return n; } - private int newBufferAndOffset(AtomicReferenceArray nextBuffer, long index) { - consumerBuffer = nextBuffer; - svVolatileConsumerBuffer(nextBuffer); - consumerMask = (length(nextBuffer) - 2) << 1; - return modifiedCalcElementOffset(index, consumerMask); - } - @Override public long currentProducerIndex() { return lvProducerIndex() / 2; @@ -454,7 +434,7 @@ public E relaxedPoll() { return null; } if (e == JUMP) { - final AtomicReferenceArray nextBuffer = getNextBuffer(buffer, mask); + final AtomicReferenceArray nextBuffer = nextBuffer(buffer, mask); return newBufferPoll(nextBuffer, index); } soElement(buffer, offset, null); @@ -472,7 +452,7 @@ public E relaxedPeek() { // LoadLoad Object e = lvElement(buffer, offset); if (e == JUMP) { - return newBufferPeek(getNextBuffer(buffer, mask), index); + return newBufferPeek(nextBuffer(buffer, mask), index); } return (E) e; } @@ -584,28 +564,79 @@ public void drain(Consumer c, WaitStrategy w, ExitCondition exit) { } } - public List unorderedSnapshot() { - AtomicReferenceArray currentBuffer = lvVolatileConsumerBuffer(); - List elements = new ArrayList(); - while (true) { - int length = length(currentBuffer); - for (int i = 0; i < length - 1; i++) { - int offset = calcElementOffset(i); - Object element = lvElement(currentBuffer, offset); - if (element == JUMP || element == null) { - continue; + /** + * Get an iterator for this queue. This method is thread safe. + *

+ * The iterator provides a best-effort snapshot of the elements in the queue. + * The returned iterator is not guaranteed to return elements in queue order, + * and races with the consumer thread may cause gaps in the sequence of returned elements. + * Like {link #relaxedPoll}, the iterator may not immediately return newly inserted elements. + * + * @return The iterator. + */ + @Override + public Iterator iterator() { + return new WeakIterator(); + } + + /** + * NOTE: This class was automatically generated by org.jctools.queues.atomic.JavaParsingAtomicLinkedQueueGenerator + * which can found in the jctools-build module. The original source file is BaseMpscLinkedArrayQueue.java. + */ + private final class WeakIterator implements Iterator { + + private long nextIndex; + + private E nextElement; + + private AtomicReferenceArray currentBuffer; + + private int currentBufferLength; + + WeakIterator() { + setBuffer(consumerBuffer); + nextElement = getNext(); + } + + @Override + public boolean hasNext() { + return nextElement != null; + } + + @Override + public E next() { + E e = nextElement; + nextElement = getNext(); + return e; + } + + private void setBuffer(AtomicReferenceArray buffer) { + this.currentBuffer = buffer; + this.currentBufferLength = length(buffer); + this.nextIndex = 0; + } + + private E getNext() { + while (true) { + while (nextIndex < currentBufferLength - 1) { + int offset = calcElementOffset(nextIndex++); + E e = lvElement(currentBuffer, offset); + if (e != null && e != JUMP) { + return e; + } + } + int offset = calcElementOffset(currentBufferLength - 1); + Object nextArray = lvElement(currentBuffer, offset); + if (nextArray == BUFFER_CONSUMED) { + //Consumer may have passed us, just jump to the current consumer buffer + setBuffer(consumerBuffer); + } else if (nextArray != null) { + setBuffer((AtomicReferenceArray) nextArray); + } else { + return null; } - elements.add((E) element); - } - int offset = calcElementOffset((length - 1)); - Object nextArray = lvElement(currentBuffer, offset); - if (nextArray != null) { - currentBuffer = (AtomicReferenceArray) nextArray; - } else { - break; } } - return elements; } private void resize(long oldMask, AtomicReferenceArray oldBuffer, long pIndex, E e) { diff --git a/jctools-core/src/main/java/org/jctools/queues/atomic/MpscAtomicArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/atomic/MpscAtomicArrayQueue.java index b4281c1c..3bdc77f4 100644 --- a/jctools-core/src/main/java/org/jctools/queues/atomic/MpscAtomicArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/atomic/MpscAtomicArrayQueue.java @@ -14,8 +14,7 @@ package org.jctools.queues.atomic; import org.jctools.util.PortableJvmInfo; -import java.util.ArrayList; -import java.util.List; +import java.util.Iterator; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.concurrent.atomic.AtomicLongArray; @@ -535,17 +534,63 @@ public void fill(Supplier s, WaitStrategy w, ExitCondition exit) { } } - public List unorderedSnapshot() { - int length = capacity(); - List elements = new ArrayList(); - for (int i = 0; i < length; i++) { - int offset = calcElementOffset(i); - E element = lvElement(buffer, offset); - if (element != null) { - elements.add(element); + /** + * Get an iterator for this queue. This method is thread safe. + *

+ * The iterator provides a best-effort snapshot of the elements in the queue. + * The returned iterator is not guaranteed to return elements in queue order, + * and races with the consumer thread may cause gaps in the sequence of returned elements. + * Like {link #relaxedPoll}, the iterator may not immediately return newly inserted elements. + * + * @return The iterator. + */ + @Override + public final Iterator iterator() { + final long cIndex = lvConsumerIndex(); + final long pIndex = lvProducerIndex(); + return new WeakIterator(cIndex, pIndex); + } + + /** + * NOTE: This class was automatically generated by org.jctools.queues.atomic.JavaParsingAtomicArrayQueueGenerator + * which can found in the jctools-build module. The original source file is MpscArrayQueue.java. + */ + private final class WeakIterator implements Iterator { + + private final long pIndex; + + private long nextIndex; + + private E nextElement; + + WeakIterator(long cIndex, long pIndex) { + this.nextIndex = cIndex; + this.pIndex = pIndex; + nextElement = getNext(); + } + + @Override + public boolean hasNext() { + return nextElement != null; + } + + @Override + public E next() { + E e = nextElement; + nextElement = getNext(); + return e; + } + + private E getNext() { + while (nextIndex < pIndex) { + int offset = calcElementOffset(nextIndex++); + E e = lvElement(buffer, offset); + if (e != null) { + return e; + } } + return null; } - return elements; } /** diff --git a/jctools-core/src/main/java/org/jctools/queues/package-info.java b/jctools-core/src/main/java/org/jctools/queues/package-info.java index ac892774..6697859f 100644 --- a/jctools-core/src/main/java/org/jctools/queues/package-info.java +++ b/jctools-core/src/main/java/org/jctools/queues/package-info.java @@ -29,7 +29,7 @@ *
* Limited Queue methods support:
* The queues implement a subset of the {@link java.util.Queue} interface which is documented under the - * {@link org.jctools.queues.MessagePassingQueue} interface. In particular {@link java.util.Queue#iterator()} is not + * {@link org.jctools.queues.MessagePassingQueue} interface. In particular {@link java.util.Queue#iterator()} is usually not * supported and dependent methods from {@link java.util.AbstractQueue} are also not supported such as: *

    *
  1. {@link java.util.Queue#remove(Object)} @@ -38,6 +38,7 @@ *
  2. {@link java.util.Queue#contains(Object)} *
  3. {@link java.util.Queue#containsAll(java.util.Collection)} *
+ * A few queues do support a limited form of iteration. This support is documented in the Javadoc of the relevant queues. *

*
* Memory layout controls and False Sharing: