Skip to content

Commit

Permalink
Generalize iterator support to all array backed impls
Browse files Browse the repository at this point in the history
  • Loading branch information
nitsanw committed May 26, 2019
1 parent 9250fe3 commit 214133a
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
import java.util.AbstractQueue;
import java.util.Iterator;

import static org.jctools.util.UnsafeRefArrayAccess.lvElement;

abstract class ConcurrentCircularArrayQueueL0Pad<E> extends AbstractQueue<E>
implements MessagePassingQueue<E>, IndexedQueue, QueueProgressIndicators
{
long p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16, p17;
Expand All @@ -34,6 +35,7 @@ abstract class ConcurrentCircularArrayQueueL0Pad<E> extends AbstractQueue<E>
* @author nitsanw
*/
abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircularArrayQueueL0Pad<E>
implements MessagePassingQueue<E>, IndexedQueue, QueueProgressIndicators, SupportsIterator
{
protected final long mask;
protected final E[] buffer;
Expand Down Expand Up @@ -64,12 +66,6 @@ protected final long calcElementOffset(long index)
return calcElementOffset(index, mask);
}

@Override
public Iterator<E> iterator()
{
throw new UnsupportedOperationException();
}

@Override
public final int size()
{
Expand Down Expand Up @@ -115,4 +111,57 @@ public final long currentConsumerIndex()
return lvConsumerIndex();
}

/**
* Get an iterator for this queue. This method is thread safe.
* <p>
* The iterator provides a best-effort snapshot of the elements in the queue.
* The returned iterator is not guaranteed to return elements in queue order,
* and races with the consumer thread may cause gaps in the sequence of returned elements.
* Like {link #relaxedPoll}, the iterator may not immediately return newly inserted elements.
*
* @return The iterator.
*/
@Override
public final Iterator<E> iterator() {
final long cIndex = lvConsumerIndex();
final long pIndex = lvProducerIndex();

return new WeakIterator(cIndex, pIndex);
}

private final class WeakIterator implements Iterator<E> {

private final long pIndex;
private long nextIndex;
private E nextElement;

WeakIterator(long cIndex, long pIndex) {
this.nextIndex = cIndex;
this.pIndex = pIndex;
nextElement = getNext();
}

@Override
public boolean hasNext() {
return nextElement != null;
}

@Override
public E next() {
E e = nextElement;
nextElement = getNext();
return e;
}

private E getNext() {
while (nextIndex < pIndex) {
long offset = calcElementOffset(nextIndex++);
E e = lvElement(buffer, offset);
if (e != null) {
return e;
}
}
return null;
}
}
}
54 changes: 0 additions & 54 deletions jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -564,58 +564,4 @@ public void fill(Supplier<E> s, WaitStrategy w, ExitCondition exit)
idleCounter = 0;
}
}

/**
* Get an iterator for this queue. This method is thread safe.
* <p>
* The iterator provides a best-effort snapshot of the elements in the queue.
* The returned iterator is not guaranteed to return elements in queue order,
* and races with the consumer thread may cause gaps in the sequence of returned elements.
* Like {link #relaxedPoll}, the iterator may not immediately return newly inserted elements.
*
* @return The iterator.
*/
@Override
public final Iterator<E> iterator() {
final long cIndex = lvConsumerIndex();
final long pIndex = lvProducerIndex();

return new WeakIterator(cIndex, pIndex);
}

private final class WeakIterator implements Iterator<E> {

private final long pIndex;
private long nextIndex;
private E nextElement;

WeakIterator(long cIndex, long pIndex) {
this.nextIndex = cIndex;
this.pIndex = pIndex;
nextElement = getNext();
}

@Override
public boolean hasNext() {
return nextElement != null;
}

@Override
public E next() {
E e = nextElement;
nextElement = getNext();
return e;
}

private E getNext() {
while (nextIndex < pIndex) {
long offset = calcElementOffset(nextIndex++);
E e = lvElement(buffer, offset);
if (e != null) {
return e;
}
}
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.jctools.queues;

import org.jctools.util.InternalAPI;

/**
* Tagging interface to help testing
*/
@InternalAPI
public interface SupportsIterator
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@
import org.jctools.queues.IndexedQueueSizeUtil.IndexedQueue;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.QueueProgressIndicators;
import org.jctools.queues.SupportsIterator;
import org.jctools.util.Pow2;

import java.util.AbstractQueue;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicReferenceArray;


abstract class AtomicReferenceArrayQueue<E> extends AbstractQueue<E> implements IndexedQueue, QueueProgressIndicators, MessagePassingQueue<E>
abstract class AtomicReferenceArrayQueue<E> extends AbstractQueue<E> implements IndexedQueue, QueueProgressIndicators, MessagePassingQueue<E>, SupportsIterator
{
protected final AtomicReferenceArray<E> buffer;
protected final int mask;
Expand All @@ -35,13 +36,7 @@ public AtomicReferenceArrayQueue(int capacity)
this.mask = actualCapacity - 1;
this.buffer = new AtomicReferenceArray<E>(actualCapacity);
}

@Override
public Iterator<E> iterator()
{
throw new UnsupportedOperationException();
}


@Override
public String toString()
{
Expand Down Expand Up @@ -145,4 +140,58 @@ public final long currentConsumerIndex()
{
return lvConsumerIndex();
}

/**
* Get an iterator for this queue. This method is thread safe.
* <p>
* The iterator provides a best-effort snapshot of the elements in the queue.
* The returned iterator is not guaranteed to return elements in queue order,
* and races with the consumer thread may cause gaps in the sequence of returned elements.
* Like {link #relaxedPoll}, the iterator may not immediately return newly inserted elements.
*
* @return The iterator.
*/
@Override
public final Iterator<E> iterator() {
final long cIndex = lvConsumerIndex();
final long pIndex = lvProducerIndex();

return new WeakIterator(cIndex, pIndex);
}

private final class WeakIterator implements Iterator<E> {

private final long pIndex;
private long nextIndex;
private E nextElement;

WeakIterator(long cIndex, long pIndex) {
this.nextIndex = cIndex;
this.pIndex = pIndex;
nextElement = getNext();
}

@Override
public boolean hasNext() {
return nextElement != null;
}

@Override
public E next() {
E e = nextElement;
nextElement = getNext();
return e;
}

private E getNext() {
while (nextIndex < pIndex) {
int offset = calcElementOffset(nextIndex++);
E e = lvElement(buffer, offset);
if (e != null) {
return e;
}
}
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -534,65 +534,6 @@ public void fill(Supplier<E> s, WaitStrategy w, ExitCondition exit) {
}
}

/**
* Get an iterator for this queue. This method is thread safe.
* <p>
* The iterator provides a best-effort snapshot of the elements in the queue.
* The returned iterator is not guaranteed to return elements in queue order,
* and races with the consumer thread may cause gaps in the sequence of returned elements.
* Like {link #relaxedPoll}, the iterator may not immediately return newly inserted elements.
*
* @return The iterator.
*/
@Override
public final Iterator<E> iterator() {
final long cIndex = lvConsumerIndex();
final long pIndex = lvProducerIndex();
return new WeakIterator(cIndex, pIndex);
}

/**
* NOTE: This class was automatically generated by org.jctools.queues.atomic.JavaParsingAtomicArrayQueueGenerator
* which can found in the jctools-build module. The original source file is MpscArrayQueue.java.
*/
private final class WeakIterator implements Iterator<E> {

private final long pIndex;

private long nextIndex;

private E nextElement;

WeakIterator(long cIndex, long pIndex) {
this.nextIndex = cIndex;
this.pIndex = pIndex;
nextElement = getNext();
}

@Override
public boolean hasNext() {
return nextElement != null;
}

@Override
public E next() {
E e = nextElement;
nextElement = getNext();
return e;
}

private E getNext() {
while (nextIndex < pIndex) {
int offset = calcElementOffset(nextIndex++);
E e = lvElement(buffer, offset);
if (e != null) {
return e;
}
}
return null;
}
}

/**
* @deprecated This was renamed to failFastOffer please migrate
*/
Expand Down
57 changes: 57 additions & 0 deletions jctools-core/src/test/java/org/jctools/queues/QueueSanityTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.jctools.queues;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -13,6 +16,7 @@
import org.jctools.util.Pow2;

import static org.hamcrest.Matchers.*;
import static org.jctools.queues.MessagePassingQueue.UNBOUNDED_CAPACITY;
import static org.jctools.queues.matchers.Matchers.emptyAndZeroSize;
import static org.junit.Assert.*;
import static org.junit.Assume.assumeThat;
Expand Down Expand Up @@ -402,5 +406,58 @@ static final class Val
{
public int value;
}

@Test
public void testIterator() {
assumeThat(queue, instanceOf(SupportsIterator.class));

int capacity = ((MessagePassingQueue)queue).capacity();
int insertLimit = (capacity == UNBOUNDED_CAPACITY) ? 128 : capacity;

for (int i = 0; i < insertLimit; i++) {
queue.offer(i);
}

Iterator<Integer> iterator = queue.iterator();
for (int i = 0; i < insertLimit; i++) {
assertEquals(Integer.valueOf(i), iterator.next());
}
assertTrue((capacity == UNBOUNDED_CAPACITY) || !iterator.hasNext());

queue.poll(); // drop 0
queue.offer(insertLimit); // add capacity
iterator = queue.iterator();
for (int i = 1; i <= insertLimit; i++) {
assertEquals(Integer.valueOf(i), iterator.next());
}
assertTrue((capacity == UNBOUNDED_CAPACITY) || !iterator.hasNext());
}

@Test
public void testIteratorHasNextConcurrentModification() {
assumeThat(queue, instanceOf(SupportsIterator.class));
int capacity = ((MessagePassingQueue)queue).capacity();
if (capacity != UNBOUNDED_CAPACITY)
assumeThat(capacity, greaterThanOrEqualTo(2));
//There may be gaps in the elements returned by the iterator,
//but hasNext needs to be reliable even if the elements are consumed between hasNext() and next().
queue.offer(0);
queue.offer(1);
Iterator<Integer> iter = queue.iterator();
assertThat(iter.hasNext(), is(true));
queue.poll();
queue.poll();
assertThat(queue.isEmpty(), is(true));
assertThat(iter.hasNext(), is(true));
assertThat(iter.next(), is(0));
assertThat(iter.hasNext(), is(false));
}

private List<Integer> iteratorToList() {
List<Integer> list = new ArrayList<>();
Iterator<Integer> iter = queue.iterator();
iter.forEachRemaining(list::add);
return list;
}

}

0 comments on commit 214133a

Please sign in to comment.