Skip to content

Commit

Permalink
Merge pull request #229 from srdo/issue-228
Browse files Browse the repository at this point in the history
Issue-228: Add unorderedSnapshot method to MpscArrayQueue and BaseMps…
  • Loading branch information
nitsanw committed Apr 6, 2019
2 parents 959f3ed + d2538cf commit 58a08a1
Show file tree
Hide file tree
Showing 8 changed files with 478 additions and 41 deletions.
Expand Up @@ -26,9 +26,11 @@
import static org.jctools.queues.LinkedArrayQueueUtil.modifiedCalcElementOffset;
import static org.jctools.util.UnsafeAccess.UNSAFE;
import static org.jctools.util.UnsafeAccess.fieldOffset;
import static org.jctools.util.UnsafeRefArrayAccess.calcElementOffset;
import static org.jctools.util.UnsafeRefArrayAccess.lvElement;
import static org.jctools.util.UnsafeRefArrayAccess.soElement;


abstract class BaseMpscLinkedArrayQueuePad1<E> extends AbstractQueue<E> implements IndexedQueue
{
long p01, p02, p03, p04, p05, p06, p07;
Expand Down Expand Up @@ -135,6 +137,7 @@ public abstract class BaseMpscLinkedArrayQueue<E> extends BaseMpscLinkedArrayQue
{
// No post padding here, subclasses must add
private static final Object JUMP = new Object();
private static final Object BUFFER_CONSUMED = new Object();
private static final int CONTINUE_TO_P_INDEX_CAS = 0;
private static final int RETRY = 1;
private static final int QUEUE_FULL = 2;
Expand All @@ -161,12 +164,6 @@ public BaseMpscLinkedArrayQueue(final int initialCapacity)
soProducerLimit(mask); // we know it's all empty to start with
}

@Override
public final Iterator<E> iterator()
{
throw new UnsupportedOperationException();
}

@Override
public final int size()
{
Expand Down Expand Up @@ -312,7 +309,7 @@ public E poll()

if (e == JUMP)
{
final E[] nextBuffer = getNextBuffer(buffer, mask);
final E[] nextBuffer = nextBuffer(buffer, mask);
return newBufferPoll(nextBuffer, index);
}

Expand Down Expand Up @@ -348,7 +345,7 @@ public E peek()
}
if (e == JUMP)
{
return newBufferPeek(getNextBuffer(buffer, mask), index);
return newBufferPeek(nextBuffer(buffer, mask), index);
}
return (E) e;
}
Expand Down Expand Up @@ -399,11 +396,13 @@ else if (casProducerIndex(pIndex, pIndex + 1))
protected abstract long availableInQueue(long pIndex, long cIndex);

@SuppressWarnings("unchecked")
private E[] getNextBuffer(final E[] buffer, final long mask)
private E[] nextBuffer(final E[] buffer, final long mask)
{
final long offset = nextArrayOffset(mask);
final E[] nextBuffer = (E[]) lvElement(buffer, offset);
soElement(buffer, offset, null);
consumerBuffer = nextBuffer;
consumerMask = (length(nextBuffer) - 2) << 1;
soElement(buffer, offset, BUFFER_CONSUMED);
return nextBuffer;
}

Expand All @@ -414,7 +413,7 @@ private long nextArrayOffset(long mask)

private E newBufferPoll(E[] nextBuffer, long index)
{
final long offset = newBufferAndOffset(nextBuffer, index);
final long offset = modifiedCalcElementOffset(index, consumerMask);
final E n = lvElement(nextBuffer, offset);// LoadLoad
if (n == null)
{
Expand All @@ -427,7 +426,7 @@ private E newBufferPoll(E[] nextBuffer, long index)

private E newBufferPeek(E[] nextBuffer, long index)
{
final long offset = newBufferAndOffset(nextBuffer, index);
final long offset = modifiedCalcElementOffset(index, consumerMask);
final E n = lvElement(nextBuffer, offset);// LoadLoad
if (null == n)
{
Expand All @@ -436,13 +435,6 @@ private E newBufferPeek(E[] nextBuffer, long index)
return n;
}

private long newBufferAndOffset(E[] nextBuffer, long index)
{
consumerBuffer = nextBuffer;
consumerMask = (length(nextBuffer) - 2) << 1;
return modifiedCalcElementOffset(index, consumerMask);
}

@Override
public long currentProducerIndex()
{
Expand Down Expand Up @@ -480,7 +472,7 @@ public E relaxedPoll()
}
if (e == JUMP)
{
final E[] nextBuffer = getNextBuffer(buffer, mask);
final E[] nextBuffer = nextBuffer(buffer, mask);
return newBufferPoll(nextBuffer, index);
}
soElement(buffer, offset, null);
Expand All @@ -500,7 +492,7 @@ public E relaxedPeek()
Object e = lvElement(buffer, offset);// LoadLoad
if (e == JUMP)
{
return newBufferPeek(getNextBuffer(buffer, mask), index);
return newBufferPeek(nextBuffer(buffer, mask), index);
}
return (E) e;
}
Expand Down Expand Up @@ -640,6 +632,74 @@ public void drain(Consumer<E> c, WaitStrategy w, ExitCondition exit)
c.accept(e);
}
}

/**
* Get an iterator for this queue. This method is thread safe.
* <p>
* The iterator provides a best-effort snapshot of the elements in the queue.
* The returned iterator is not guaranteed to return elements in queue order,
* and races with the consumer thread may cause gaps in the sequence of returned elements.
* Like {link #relaxedPoll}, the iterator may not immediately return newly inserted elements.
*
* @return The iterator.
*/
@Override
public Iterator<E> iterator() {
return new WeakIterator();
}

private final class WeakIterator implements Iterator<E> {

private long nextIndex;
private E nextElement;
private E[] currentBuffer;
private int currentBufferLength;

WeakIterator() {
setBuffer(consumerBuffer);
nextElement = getNext();
}

@Override
public boolean hasNext() {
return nextElement != null;
}

@Override
public E next() {
E e = nextElement;
nextElement = getNext();
return e;
}

private void setBuffer(E[] buffer) {
this.currentBuffer = buffer;
this.currentBufferLength = length(buffer);
this.nextIndex = 0;
}

private E getNext() {
while (true) {
while (nextIndex < currentBufferLength - 1) {
long offset = calcElementOffset(nextIndex++);
E e = lvElement(currentBuffer, offset);
if (e != null && e != JUMP) {
return e;
}
}
long offset = calcElementOffset(currentBufferLength - 1);
Object nextArray = lvElement(currentBuffer, offset);
if (nextArray == BUFFER_CONSUMED) {
//Consumer may have passed us, just jump to the current consumer buffer
setBuffer(consumerBuffer);
} else if (nextArray != null) {
setBuffer((E[]) nextArray);
} else {
return null;
}
}
}
}

private void resize(long oldMask, E[] oldBuffer, long pIndex, E e, Supplier<E> s)
{
Expand Down
56 changes: 56 additions & 0 deletions jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java
Expand Up @@ -19,6 +19,8 @@
import static org.jctools.util.UnsafeAccess.fieldOffset;
import static org.jctools.util.UnsafeRefArrayAccess.*;

import java.util.Iterator;

abstract class MpscArrayQueueL1Pad<E> extends ConcurrentCircularArrayQueue<E>
{
long p00, p01, p02, p03, p04, p05, p06, p07;
Expand Down Expand Up @@ -562,4 +564,58 @@ public void fill(Supplier<E> s, WaitStrategy w, ExitCondition exit)
idleCounter = 0;
}
}

/**
* Get an iterator for this queue. This method is thread safe.
* <p>
* The iterator provides a best-effort snapshot of the elements in the queue.
* The returned iterator is not guaranteed to return elements in queue order,
* and races with the consumer thread may cause gaps in the sequence of returned elements.
* Like {link #relaxedPoll}, the iterator may not immediately return newly inserted elements.
*
* @return The iterator.
*/
@Override
public final Iterator<E> iterator() {
final long cIndex = lvConsumerIndex();
final long pIndex = lvProducerIndex();

return new WeakIterator(cIndex, pIndex);
}

private final class WeakIterator implements Iterator<E> {

private final long pIndex;
private long nextIndex;
private E nextElement;

WeakIterator(long cIndex, long pIndex) {
this.nextIndex = cIndex;
this.pIndex = pIndex;
nextElement = getNext();
}

@Override
public boolean hasNext() {
return nextElement != null;
}

@Override
public E next() {
E e = nextElement;
nextElement = getNext();
return e;
}

private E getNext() {
while (nextIndex < pIndex) {
long offset = calcElementOffset(nextIndex++);
E e = lvElement(buffer, offset);
if (e != null) {
return e;
}
}
return null;
}
}
}

0 comments on commit 58a08a1

Please sign in to comment.