Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue-228: Add unorderedSnapshot method to MpscArrayQueue and BaseMps… #229

Merged
merged 3 commits into from Apr 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -26,9 +26,11 @@
import static org.jctools.queues.LinkedArrayQueueUtil.modifiedCalcElementOffset;
import static org.jctools.util.UnsafeAccess.UNSAFE;
import static org.jctools.util.UnsafeAccess.fieldOffset;
import static org.jctools.util.UnsafeRefArrayAccess.calcElementOffset;
import static org.jctools.util.UnsafeRefArrayAccess.lvElement;
import static org.jctools.util.UnsafeRefArrayAccess.soElement;


abstract class BaseMpscLinkedArrayQueuePad1<E> extends AbstractQueue<E> implements IndexedQueue
{
long p01, p02, p03, p04, p05, p06, p07;
Expand Down Expand Up @@ -135,6 +137,7 @@ public abstract class BaseMpscLinkedArrayQueue<E> extends BaseMpscLinkedArrayQue
{
// No post padding here, subclasses must add
private static final Object JUMP = new Object();
private static final Object BUFFER_CONSUMED = new Object();
private static final int CONTINUE_TO_P_INDEX_CAS = 0;
private static final int RETRY = 1;
private static final int QUEUE_FULL = 2;
Expand All @@ -161,12 +164,6 @@ public BaseMpscLinkedArrayQueue(final int initialCapacity)
soProducerLimit(mask); // we know it's all empty to start with
}

@Override
public final Iterator<E> iterator()
{
throw new UnsupportedOperationException();
}

@Override
public final int size()
{
Expand Down Expand Up @@ -312,7 +309,7 @@ public E poll()

if (e == JUMP)
{
final E[] nextBuffer = getNextBuffer(buffer, mask);
final E[] nextBuffer = nextBuffer(buffer, mask);
return newBufferPoll(nextBuffer, index);
}

Expand Down Expand Up @@ -348,7 +345,7 @@ public E peek()
}
if (e == JUMP)
{
return newBufferPeek(getNextBuffer(buffer, mask), index);
return newBufferPeek(nextBuffer(buffer, mask), index);
}
return (E) e;
}
Expand Down Expand Up @@ -399,11 +396,13 @@ else if (casProducerIndex(pIndex, pIndex + 1))
protected abstract long availableInQueue(long pIndex, long cIndex);

@SuppressWarnings("unchecked")
private E[] getNextBuffer(final E[] buffer, final long mask)
private E[] nextBuffer(final E[] buffer, final long mask)
{
final long offset = nextArrayOffset(mask);
final E[] nextBuffer = (E[]) lvElement(buffer, offset);
soElement(buffer, offset, null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will cause GC nepotism

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, wasn't aware of this issue. Will try to come up with a solution

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I replaced this with a marker object that will cause the iterator to jump to the current consumerBuffer.

consumerBuffer = nextBuffer;
consumerMask = (length(nextBuffer) - 2) << 1;
soElement(buffer, offset, BUFFER_CONSUMED);
return nextBuffer;
}

Expand All @@ -414,7 +413,7 @@ private long nextArrayOffset(long mask)

private E newBufferPoll(E[] nextBuffer, long index)
{
final long offset = newBufferAndOffset(nextBuffer, index);
final long offset = modifiedCalcElementOffset(index, consumerMask);
final E n = lvElement(nextBuffer, offset);// LoadLoad
if (n == null)
{
Expand All @@ -427,7 +426,7 @@ private E newBufferPoll(E[] nextBuffer, long index)

private E newBufferPeek(E[] nextBuffer, long index)
{
final long offset = newBufferAndOffset(nextBuffer, index);
final long offset = modifiedCalcElementOffset(index, consumerMask);
final E n = lvElement(nextBuffer, offset);// LoadLoad
if (null == n)
{
Expand All @@ -436,13 +435,6 @@ private E newBufferPeek(E[] nextBuffer, long index)
return n;
}

private long newBufferAndOffset(E[] nextBuffer, long index)
{
consumerBuffer = nextBuffer;
consumerMask = (length(nextBuffer) - 2) << 1;
return modifiedCalcElementOffset(index, consumerMask);
}

@Override
public long currentProducerIndex()
{
Expand Down Expand Up @@ -480,7 +472,7 @@ public E relaxedPoll()
}
if (e == JUMP)
{
final E[] nextBuffer = getNextBuffer(buffer, mask);
final E[] nextBuffer = nextBuffer(buffer, mask);
return newBufferPoll(nextBuffer, index);
}
soElement(buffer, offset, null);
Expand All @@ -500,7 +492,7 @@ public E relaxedPeek()
Object e = lvElement(buffer, offset);// LoadLoad
if (e == JUMP)
{
return newBufferPeek(getNextBuffer(buffer, mask), index);
return newBufferPeek(nextBuffer(buffer, mask), index);
}
return (E) e;
}
Expand Down Expand Up @@ -640,6 +632,74 @@ public void drain(Consumer<E> c, WaitStrategy w, ExitCondition exit)
c.accept(e);
}
}

/**
* Get an iterator for this queue. This method is thread safe.
* <p>
* The iterator provides a best-effort snapshot of the elements in the queue.
* The returned iterator is not guaranteed to return elements in queue order,
* and races with the consumer thread may cause gaps in the sequence of returned elements.
* Like {link #relaxedPoll}, the iterator may not immediately return newly inserted elements.
*
* @return The iterator.
*/
@Override
public Iterator<E> iterator() {
return new WeakIterator();
}

private final class WeakIterator implements Iterator<E> {

private long nextIndex;
private E nextElement;
private E[] currentBuffer;
private int currentBufferLength;

WeakIterator() {
setBuffer(consumerBuffer);
nextElement = getNext();
}

@Override
public boolean hasNext() {
return nextElement != null;
}

@Override
public E next() {
E e = nextElement;
nextElement = getNext();
return e;
}

private void setBuffer(E[] buffer) {
this.currentBuffer = buffer;
this.currentBufferLength = length(buffer);
this.nextIndex = 0;
}

private E getNext() {
while (true) {
while (nextIndex < currentBufferLength - 1) {
long offset = calcElementOffset(nextIndex++);
E e = lvElement(currentBuffer, offset);
if (e != null && e != JUMP) {
return e;
}
}
long offset = calcElementOffset(currentBufferLength - 1);
Object nextArray = lvElement(currentBuffer, offset);
if (nextArray == BUFFER_CONSUMED) {
//Consumer may have passed us, just jump to the current consumer buffer
setBuffer(consumerBuffer);
} else if (nextArray != null) {
setBuffer((E[]) nextArray);
} else {
return null;
}
}
}
}

private void resize(long oldMask, E[] oldBuffer, long pIndex, E e)
{
Expand Down
56 changes: 56 additions & 0 deletions jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java
Expand Up @@ -19,6 +19,8 @@
import static org.jctools.util.UnsafeAccess.fieldOffset;
import static org.jctools.util.UnsafeRefArrayAccess.*;

import java.util.Iterator;

abstract class MpscArrayQueueL1Pad<E> extends ConcurrentCircularArrayQueue<E>
{
long p00, p01, p02, p03, p04, p05, p06, p07;
Expand Down Expand Up @@ -562,4 +564,58 @@ public void fill(Supplier<E> s, WaitStrategy w, ExitCondition exit)
idleCounter = 0;
}
}

/**
* Get an iterator for this queue. This method is thread safe.
* <p>
* The iterator provides a best-effort snapshot of the elements in the queue.
* The returned iterator is not guaranteed to return elements in queue order,
* and races with the consumer thread may cause gaps in the sequence of returned elements.
* Like {link #relaxedPoll}, the iterator may not immediately return newly inserted elements.
*
* @return The iterator.
*/
@Override
public final Iterator<E> iterator() {
final long cIndex = lvConsumerIndex();
final long pIndex = lvProducerIndex();

return new WeakIterator(cIndex, pIndex);
}

private final class WeakIterator implements Iterator<E> {

private final long pIndex;
private long nextIndex;
private E nextElement;

WeakIterator(long cIndex, long pIndex) {
this.nextIndex = cIndex;
this.pIndex = pIndex;
nextElement = getNext();
}

@Override
public boolean hasNext() {
return nextElement != null;
}

@Override
public E next() {
E e = nextElement;
nextElement = getNext();
return e;
}

private E getNext() {
while (nextIndex < pIndex) {
long offset = calcElementOffset(nextIndex++);
E e = lvElement(buffer, offset);
if (e != null) {
return e;
}
}
return null;
}
}
}