Skip to content

Commit

Permalink
Update Javadoc, set new buffer reference before updating BUFFER_CONSU…
Browse files Browse the repository at this point in the history
…MED to ensure iterator doesn't jump to old buffer
  • Loading branch information
srdo committed Jan 18, 2019
1 parent 48adfd1 commit d2538cf
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 83 deletions.
Expand Up @@ -30,8 +30,6 @@
import static org.jctools.util.UnsafeRefArrayAccess.lvElement;
import static org.jctools.util.UnsafeRefArrayAccess.soElement;

import java.util.ArrayList;
import java.util.List;

abstract class BaseMpscLinkedArrayQueuePad1<E> extends AbstractQueue<E> implements IndexedQueue
{
Expand Down Expand Up @@ -311,7 +309,7 @@ public E poll()

if (e == JUMP)
{
final E[] nextBuffer = getNextBuffer(buffer, mask);
final E[] nextBuffer = nextBuffer(buffer, mask);
return newBufferPoll(nextBuffer, index);
}

Expand Down Expand Up @@ -347,7 +345,7 @@ public E peek()
}
if (e == JUMP)
{
return newBufferPeek(getNextBuffer(buffer, mask), index);
return newBufferPeek(nextBuffer(buffer, mask), index);
}
return (E) e;
}
Expand Down Expand Up @@ -398,10 +396,12 @@ else if (casProducerIndex(pIndex, pIndex + 1))
protected abstract long availableInQueue(long pIndex, long cIndex);

@SuppressWarnings("unchecked")
private E[] getNextBuffer(final E[] buffer, final long mask)
private E[] nextBuffer(final E[] buffer, final long mask)
{
final long offset = nextArrayOffset(mask);
final E[] nextBuffer = (E[]) lvElement(buffer, offset);
consumerBuffer = nextBuffer;
consumerMask = (length(nextBuffer) - 2) << 1;
soElement(buffer, offset, BUFFER_CONSUMED);
return nextBuffer;
}
Expand All @@ -413,7 +413,7 @@ private long nextArrayOffset(long mask)

private E newBufferPoll(E[] nextBuffer, long index)
{
final long offset = newBufferAndOffset(nextBuffer, index);
final long offset = modifiedCalcElementOffset(index, consumerMask);
final E n = lvElement(nextBuffer, offset);// LoadLoad
if (n == null)
{
Expand All @@ -426,7 +426,7 @@ private E newBufferPoll(E[] nextBuffer, long index)

private E newBufferPeek(E[] nextBuffer, long index)
{
final long offset = newBufferAndOffset(nextBuffer, index);
final long offset = modifiedCalcElementOffset(index, consumerMask);
final E n = lvElement(nextBuffer, offset);// LoadLoad
if (null == n)
{
Expand All @@ -435,13 +435,6 @@ private E newBufferPeek(E[] nextBuffer, long index)
return n;
}

private long newBufferAndOffset(E[] nextBuffer, long index)
{
consumerBuffer = nextBuffer;
consumerMask = (length(nextBuffer) - 2) << 1;
return modifiedCalcElementOffset(index, consumerMask);
}

@Override
public long currentProducerIndex()
{
Expand Down Expand Up @@ -479,7 +472,7 @@ public E relaxedPoll()
}
if (e == JUMP)
{
final E[] nextBuffer = getNextBuffer(buffer, mask);
final E[] nextBuffer = nextBuffer(buffer, mask);
return newBufferPoll(nextBuffer, index);
}
soElement(buffer, offset, null);
Expand All @@ -499,7 +492,7 @@ public E relaxedPeek()
Object e = lvElement(buffer, offset);// LoadLoad
if (e == JUMP)
{
return newBufferPeek(getNextBuffer(buffer, mask), index);
return newBufferPeek(nextBuffer(buffer, mask), index);
}
return (E) e;
}
Expand Down Expand Up @@ -642,10 +635,12 @@ public void drain(Consumer<E> c, WaitStrategy w, ExitCondition exit)

/**
* Get an iterator for this queue. This method is thread safe.
*
* <p>
* The iterator provides a best-effort snapshot of the elements in the queue.
* The returned iterator is not guaranteed to return elements in queue order,
* and races with the consumer thread may cause gaps in the sequence of returned elements.
* Like {link #relaxedPoll}, the iterator may not immediately return newly inserted elements.
*
* @return The iterator.
*/
@Override
Expand Down
Expand Up @@ -19,9 +19,7 @@
import static org.jctools.util.UnsafeAccess.fieldOffset;
import static org.jctools.util.UnsafeRefArrayAccess.*;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

abstract class MpscArrayQueueL1Pad<E> extends ConcurrentCircularArrayQueue<E>
{
Expand Down Expand Up @@ -569,10 +567,12 @@ public void fill(Supplier<E> s, WaitStrategy w, ExitCondition exit)

/**
* Get an iterator for this queue. This method is thread safe.
*
* <p>
* The iterator provides a best-effort snapshot of the elements in the queue.
* The returned iterator is not guaranteed to return elements in queue order,
* and races with the consumer thread may cause gaps in the sequence of returned elements.
* Like {link #relaxedPoll}, the iterator may not immediately return newly inserted elements.
*
* @return The iterator.
*/
@Override
Expand Down
Expand Up @@ -21,8 +21,6 @@
import java.util.Iterator;
import static org.jctools.queues.atomic.LinkedAtomicArrayQueueUtil.length;
import static org.jctools.queues.atomic.LinkedAtomicArrayQueueUtil.modifiedCalcElementOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.jctools.queues.MessagePassingQueue;
Expand Down Expand Up @@ -94,21 +92,11 @@ abstract class BaseMpscLinkedAtomicArrayQueueConsumerFields<E> extends BaseMpscL

protected AtomicReferenceArray<E> consumerBuffer;

private volatile AtomicReferenceArray<E> volatileConsumerBuffer;

@Override
public final long lvConsumerIndex() {
return consumerIndex;
}

final AtomicReferenceArray<E> lvVolatileConsumerBuffer() {
return volatileConsumerBuffer;
}

final void svVolatileConsumerBuffer(AtomicReferenceArray<E> newValue) {
volatileConsumerBuffer = newValue;
}

final long lpConsumerIndex() {
return consumerIndex;
}
Expand Down Expand Up @@ -171,6 +159,8 @@ public abstract class BaseMpscLinkedAtomicArrayQueue<E> extends BaseMpscLinkedAt
// No post padding here, subclasses must add
private static final Object JUMP = new Object();

private static final Object BUFFER_CONSUMED = new Object();

private static final int CONTINUE_TO_P_INDEX_CAS = 0;

private static final int RETRY = 1;
Expand All @@ -193,17 +183,11 @@ public BaseMpscLinkedAtomicArrayQueue(final int initialCapacity) {
producerBuffer = buffer;
producerMask = mask;
consumerBuffer = buffer;
svVolatileConsumerBuffer(buffer);
consumerMask = mask;
// we know it's all empty to start with
soProducerLimit(mask);
}

@Override
public final Iterator<E> iterator() {
throw new UnsupportedOperationException();
}

@Override
public final int size() {
// NOTE: because indices are on even numbers we cannot use the size util.
Expand Down Expand Up @@ -313,7 +297,7 @@ public E poll() {
}
}
if (e == JUMP) {
final AtomicReferenceArray<E> nextBuffer = getNextBuffer(buffer, mask);
final AtomicReferenceArray<E> nextBuffer = nextBuffer(buffer, mask);
return newBufferPoll(nextBuffer, index);
}
// release element null
Expand Down Expand Up @@ -344,7 +328,7 @@ public E peek() {
} while (e == null);
}
if (e == JUMP) {
return newBufferPeek(getNextBuffer(buffer, mask), index);
return newBufferPeek(nextBuffer(buffer, mask), index);
}
return (E) e;
}
Expand Down Expand Up @@ -383,9 +367,12 @@ private int offerSlowPath(long mask, long pIndex, long producerLimit) {
protected abstract long availableInQueue(long pIndex, long cIndex);

@SuppressWarnings("unchecked")
private AtomicReferenceArray<E> getNextBuffer(final AtomicReferenceArray<E> buffer, final long mask) {
private AtomicReferenceArray<E> nextBuffer(final AtomicReferenceArray<E> buffer, final long mask) {
final int offset = nextArrayOffset(mask);
final AtomicReferenceArray<E> nextBuffer = (AtomicReferenceArray<E>) lvElement(buffer, offset);
consumerBuffer = nextBuffer;
consumerMask = (length(nextBuffer) - 2) << 1;
soElement(buffer, offset, BUFFER_CONSUMED);
return nextBuffer;
}

Expand All @@ -394,7 +381,7 @@ private int nextArrayOffset(long mask) {
}

private E newBufferPoll(AtomicReferenceArray<E> nextBuffer, long index) {
final int offset = newBufferAndOffset(nextBuffer, index);
final int offset = modifiedCalcElementOffset(index, consumerMask);
// LoadLoad
final E n = lvElement(nextBuffer, offset);
if (n == null) {
Expand All @@ -407,7 +394,7 @@ private E newBufferPoll(AtomicReferenceArray<E> nextBuffer, long index) {
}

private E newBufferPeek(AtomicReferenceArray<E> nextBuffer, long index) {
final int offset = newBufferAndOffset(nextBuffer, index);
final int offset = modifiedCalcElementOffset(index, consumerMask);
// LoadLoad
final E n = lvElement(nextBuffer, offset);
if (null == n) {
Expand All @@ -416,13 +403,6 @@ private E newBufferPeek(AtomicReferenceArray<E> nextBuffer, long index) {
return n;
}

private int newBufferAndOffset(AtomicReferenceArray<E> nextBuffer, long index) {
consumerBuffer = nextBuffer;
svVolatileConsumerBuffer(nextBuffer);
consumerMask = (length(nextBuffer) - 2) << 1;
return modifiedCalcElementOffset(index, consumerMask);
}

@Override
public long currentProducerIndex() {
return lvProducerIndex() / 2;
Expand Down Expand Up @@ -454,7 +434,7 @@ public E relaxedPoll() {
return null;
}
if (e == JUMP) {
final AtomicReferenceArray<E> nextBuffer = getNextBuffer(buffer, mask);
final AtomicReferenceArray<E> nextBuffer = nextBuffer(buffer, mask);
return newBufferPoll(nextBuffer, index);
}
soElement(buffer, offset, null);
Expand All @@ -472,7 +452,7 @@ public E relaxedPeek() {
// LoadLoad
Object e = lvElement(buffer, offset);
if (e == JUMP) {
return newBufferPeek(getNextBuffer(buffer, mask), index);
return newBufferPeek(nextBuffer(buffer, mask), index);
}
return (E) e;
}
Expand Down Expand Up @@ -584,28 +564,79 @@ public void drain(Consumer<E> c, WaitStrategy w, ExitCondition exit) {
}
}

public List<E> unorderedSnapshot() {
AtomicReferenceArray<E> currentBuffer = lvVolatileConsumerBuffer();
List<E> elements = new ArrayList<E>();
while (true) {
int length = length(currentBuffer);
for (int i = 0; i < length - 1; i++) {
int offset = calcElementOffset(i);
Object element = lvElement(currentBuffer, offset);
if (element == JUMP || element == null) {
continue;
/**
* Get an iterator for this queue. This method is thread safe.
* <p>
* The iterator provides a best-effort snapshot of the elements in the queue.
* The returned iterator is not guaranteed to return elements in queue order,
* and races with the consumer thread may cause gaps in the sequence of returned elements.
* Like {link #relaxedPoll}, the iterator may not immediately return newly inserted elements.
*
* @return The iterator.
*/
@Override
public Iterator<E> iterator() {
return new WeakIterator();
}

/**
* NOTE: This class was automatically generated by org.jctools.queues.atomic.JavaParsingAtomicLinkedQueueGenerator
* which can found in the jctools-build module. The original source file is BaseMpscLinkedArrayQueue.java.
*/
private final class WeakIterator implements Iterator<E> {

private long nextIndex;

private E nextElement;

private AtomicReferenceArray<E> currentBuffer;

private int currentBufferLength;

WeakIterator() {
setBuffer(consumerBuffer);
nextElement = getNext();
}

@Override
public boolean hasNext() {
return nextElement != null;
}

@Override
public E next() {
E e = nextElement;
nextElement = getNext();
return e;
}

private void setBuffer(AtomicReferenceArray<E> buffer) {
this.currentBuffer = buffer;
this.currentBufferLength = length(buffer);
this.nextIndex = 0;
}

private E getNext() {
while (true) {
while (nextIndex < currentBufferLength - 1) {
int offset = calcElementOffset(nextIndex++);
E e = lvElement(currentBuffer, offset);
if (e != null && e != JUMP) {
return e;
}
}
int offset = calcElementOffset(currentBufferLength - 1);
Object nextArray = lvElement(currentBuffer, offset);
if (nextArray == BUFFER_CONSUMED) {
//Consumer may have passed us, just jump to the current consumer buffer
setBuffer(consumerBuffer);
} else if (nextArray != null) {
setBuffer((AtomicReferenceArray<E>) nextArray);
} else {
return null;
}
elements.add((E) element);
}
int offset = calcElementOffset((length - 1));
Object nextArray = lvElement(currentBuffer, offset);
if (nextArray != null) {
currentBuffer = (AtomicReferenceArray<E>) nextArray;
} else {
break;
}
}
return elements;
}

private void resize(long oldMask, AtomicReferenceArray<E> oldBuffer, long pIndex, E e) {
Expand Down

0 comments on commit d2538cf

Please sign in to comment.