Skip to content

Commit

Permalink
Iterator for MpscArrayQueue and MpscUnboundedArrayQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
srdo committed Jan 18, 2019
1 parent a1146c2 commit 48adfd1
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 57 deletions.
Expand Up @@ -77,22 +77,13 @@ abstract class BaseMpscLinkedArrayQueueConsumerFields<E> extends BaseMpscLinkedA
private volatile long consumerIndex;
protected long consumerMask;
protected E[] consumerBuffer;
private volatile E[] volatileConsumerBuffer;

@Override
public final long lvConsumerIndex()
{
return consumerIndex;
}

final E[] lvVolatileConsumerBuffer() {
return volatileConsumerBuffer;
}

final void svVolatileConsumerBuffer(E[] newValue) {
volatileConsumerBuffer = newValue;
}

final long lpConsumerIndex()
{
return UNSAFE.getLong(this, C_INDEX_OFFSET);
Expand Down Expand Up @@ -148,6 +139,7 @@ public abstract class BaseMpscLinkedArrayQueue<E> extends BaseMpscLinkedArrayQue
{
// No post padding here, subclasses must add
private static final Object JUMP = new Object();
private static final Object BUFFER_CONSUMED = new Object();
private static final int CONTINUE_TO_P_INDEX_CAS = 0;
private static final int RETRY = 1;
private static final int QUEUE_FULL = 2;
Expand All @@ -170,17 +162,10 @@ public BaseMpscLinkedArrayQueue(final int initialCapacity)
producerBuffer = buffer;
producerMask = mask;
consumerBuffer = buffer;
svVolatileConsumerBuffer(buffer);
consumerMask = mask;
soProducerLimit(mask); // we know it's all empty to start with
}

@Override
public final Iterator<E> iterator()
{
throw new UnsupportedOperationException();
}

@Override
public final int size()
{
Expand Down Expand Up @@ -417,6 +402,7 @@ private E[] getNextBuffer(final E[] buffer, final long mask)
{
final long offset = nextArrayOffset(mask);
final E[] nextBuffer = (E[]) lvElement(buffer, offset);
soElement(buffer, offset, BUFFER_CONSUMED);
return nextBuffer;
}

Expand Down Expand Up @@ -452,7 +438,6 @@ private E newBufferPeek(E[] nextBuffer, long index)
private long newBufferAndOffset(E[] nextBuffer, long index)
{
consumerBuffer = nextBuffer;
svVolatileConsumerBuffer(nextBuffer);
consumerMask = (length(nextBuffer) - 2) << 1;
return modifiedCalcElementOffset(index, consumerMask);
}
Expand Down Expand Up @@ -655,28 +640,70 @@ public void drain(Consumer<E> c, WaitStrategy w, ExitCondition exit)
}
}

public List<E> unorderedSnapshot() {
E[] currentBuffer = lvVolatileConsumerBuffer();
List<E> elements = new ArrayList<E>();
while (true) {
int length = length(currentBuffer);
for (int i = 0; i < length - 1; i++) {
long offset = calcElementOffset(i);
Object element = lvElement(currentBuffer, offset);
if (element == JUMP || element == null) {
continue;
/**
* Get an iterator for this queue. This method is thread safe.
*
* The iterator provides a best-effort snapshot of the elements in the queue.
* The returned iterator is not guaranteed to return elements in queue order,
* and races with the consumer thread may cause gaps in the sequence of returned elements.
* @return The iterator.
*/
@Override
public Iterator<E> iterator() {
return new WeakIterator();
}

private final class WeakIterator implements Iterator<E> {

private long nextIndex;
private E nextElement;
private E[] currentBuffer;
private int currentBufferLength;

WeakIterator() {
setBuffer(consumerBuffer);
nextElement = getNext();
}

@Override
public boolean hasNext() {
return nextElement != null;
}

@Override
public E next() {
E e = nextElement;
nextElement = getNext();
return e;
}

private void setBuffer(E[] buffer) {
this.currentBuffer = buffer;
this.currentBufferLength = length(buffer);
this.nextIndex = 0;
}

private E getNext() {
while (true) {
while (nextIndex < currentBufferLength - 1) {
long offset = calcElementOffset(nextIndex++);
E e = lvElement(currentBuffer, offset);
if (e != null && e != JUMP) {
return e;
}
}
long offset = calcElementOffset(currentBufferLength - 1);
Object nextArray = lvElement(currentBuffer, offset);
if (nextArray == BUFFER_CONSUMED) {
//Consumer may have passed us, just jump to the current consumer buffer
setBuffer(consumerBuffer);
} else if (nextArray != null) {
setBuffer((E[]) nextArray);
} else {
return null;
}
elements.add((E) element);
}
long offset = calcElementOffset((length - 1));
Object nextArray = lvElement(currentBuffer, offset);
if (nextArray != null) {
currentBuffer = (E[]) nextArray;
} else {
break;
}
}
return elements;
}

private void resize(long oldMask, E[] oldBuffer, long pIndex, E e)
Expand Down
58 changes: 49 additions & 9 deletions jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java
Expand Up @@ -20,6 +20,7 @@
import static org.jctools.util.UnsafeRefArrayAccess.*;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

abstract class MpscArrayQueueL1Pad<E> extends ConcurrentCircularArrayQueue<E>
Expand Down Expand Up @@ -566,16 +567,55 @@ public void fill(Supplier<E> s, WaitStrategy w, ExitCondition exit)
}
}

public List<E> unorderedSnapshot() {
int length = capacity();
List<E> elements = new ArrayList<E>();
for (int i = 0; i < length; i++) {
long offset = calcElementOffset(i);
E element = lvElement(buffer, offset);
if (element != null) {
elements.add(element);
/**
* Get an iterator for this queue. This method is thread safe.
*
* The iterator provides a best-effort snapshot of the elements in the queue.
* The returned iterator is not guaranteed to return elements in queue order,
* and races with the consumer thread may cause gaps in the sequence of returned elements.
* @return The iterator.
*/
@Override
public final Iterator<E> iterator() {
final long cIndex = lvConsumerIndex();
final long pIndex = lvProducerIndex();

return new WeakIterator(cIndex, pIndex);
}

private final class WeakIterator implements Iterator<E> {

private final long pIndex;
private long nextIndex;
private E nextElement;

WeakIterator(long cIndex, long pIndex) {
this.nextIndex = cIndex;
this.pIndex = pIndex;
nextElement = getNext();
}

@Override
public boolean hasNext() {
return nextElement != null;
}

@Override
public E next() {
E e = nextElement;
nextElement = getNext();
return e;
}

private E getNext() {
while (nextIndex < pIndex) {
long offset = calcElementOffset(nextIndex++);
E e = lvElement(buffer, offset);
if (e != null) {
return e;
}
}
return null;
}
return elements;
}
}
Expand Up @@ -19,11 +19,14 @@

package org.jctools.queues;

import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.junit.Before;
import org.junit.Test;

Expand All @@ -37,16 +40,40 @@ public void setUp() throws Exception {
}

@Test
public void testSnapshot() {
public void testIterator() {
queue.offer(0);
assertThat(queue.unorderedSnapshot(), contains(0));
assertThat(iteratorToList(), contains(0));
for (int i = 1; i < queue.capacity(); i++) {
queue.offer(i);
}
assertThat(queue.unorderedSnapshot(), containsInAnyOrder(0, 1, 2, 3));
assertThat(iteratorToList(), containsInAnyOrder(0, 1, 2, 3));
queue.poll();
queue.offer(4);
assertThat(queue.unorderedSnapshot(), containsInAnyOrder(1, 2, 3, 4));
queue.poll();
assertThat(iteratorToList(), containsInAnyOrder(2, 3, 4));
}

@Test
public void testIteratorHasNextConcurrentModification() {
//There may be gaps in the elements returned by the iterator,
//but hasNext needs to be reliable even if the elements are consumed between hasNext() and next().
queue.offer(0);
queue.offer(1);
Iterator<Integer> iter = queue.iterator();
assertThat(iter.hasNext(), is(true));
queue.poll();
queue.poll();
assertThat(queue.isEmpty(), is(true));
assertThat(iter.hasNext(), is(true));
assertThat(iter.next(), is(0));
assertThat(iter.hasNext(), is(false));
}

private List<Integer> iteratorToList() {
List<Integer> list = new ArrayList<>();
Iterator<Integer> iter = queue.iterator();
iter.forEachRemaining(list::add);
return list;
}

}
Expand Up @@ -22,8 +22,12 @@
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.junit.Before;
import org.junit.Test;

Expand All @@ -34,26 +38,71 @@ public class MpscUnboundedArrayQueueSnapshotTest {

@Before
public void setUp() throws Exception {
this.queue = new MpscUnboundedArrayQueue<>(CHUNK_SIZE);
this.queue = new MpscUnboundedArrayQueue<>(CHUNK_SIZE + 1); //Account for extra slot for JUMP
}

@Test
public void testSnapshot() {
public void testIterator() {
queue.offer(0);
assertThat(queue.unorderedSnapshot(), contains(0));
assertThat(iteratorToList(), contains(0));
for (int i = 1; i < CHUNK_SIZE; i++) {
queue.offer(i);
}
assertThat(queue.unorderedSnapshot(), containsInAnyOrder(0, 1, 2, 3));
assertThat(iteratorToList(), containsInAnyOrder(0, 1, 2, 3));
queue.offer(4);
queue.offer(5);
assertThat(queue.unorderedSnapshot(), containsInAnyOrder(0, 1, 2, 3, 4, 5));
assertThat(iteratorToList(), containsInAnyOrder(0, 1, 2, 3, 4, 5));
queue.poll();
assertThat(queue.unorderedSnapshot(), containsInAnyOrder(1, 2, 3, 4, 5));
assertThat(iteratorToList(), containsInAnyOrder(1, 2, 3, 4, 5));
for (int i = 1; i < CHUNK_SIZE; i++) {
queue.poll();
}
assertThat(queue.unorderedSnapshot(), containsInAnyOrder(4, 5));
assertThat(iteratorToList(), containsInAnyOrder(4, 5));
}

@Test
public void testIteratorOutpacedByConsumer() {
int slotsToForceMultipleBuffers = CHUNK_SIZE + 1;
for (int i = 0; i < slotsToForceMultipleBuffers; i++) {
queue.offer(i);
}
Iterator<Integer> iter = queue.iterator();
List<Integer> entries = new ArrayList<>();
entries.add(iter.next());
for (int i = 0; i < CHUNK_SIZE; i++) {
queue.poll();
}
//Now that the consumer has discarded the first buffer, the iterator needs to follow it to the new buffer.
iter.forEachRemaining(entries::add);
assertThat(entries, containsInAnyOrder(0, 1, 4));
}

@Test
public void testIteratorHasNextConcurrentModification() {
/*
* There may be gaps in the elements returned by the iterator, but hasNext needs to be reliable even if the elements are consumed
* between hasNext() and next(), and even if the consumer buffer changes.
*/
int slotsToForceMultipleBuffers = CHUNK_SIZE + 1;
for (int i = 0; i < slotsToForceMultipleBuffers; i++) {
queue.offer(i);
}
Iterator<Integer> iter = queue.iterator();
assertThat(iter.hasNext(), is(true));
for (int i = 0; i < slotsToForceMultipleBuffers; i++) {
queue.poll();
}
assertThat(queue.isEmpty(), is(true));
assertThat(iter.hasNext(), is(true));
assertThat(iter.next(), is(0));
assertThat(iter.hasNext(), is(false));
}

private List<Integer> iteratorToList() {
List<Integer> list = new ArrayList<>();
Iterator<Integer> iter = queue.iterator();
iter.forEachRemaining(list::add);
return list;
}

}

0 comments on commit 48adfd1

Please sign in to comment.