Skip to content

Commit

Permalink
Fix up visibility and fix #209
Browse files Browse the repository at this point in the history
  • Loading branch information
nitsanw committed May 21, 2018
1 parent 84dbbba commit 72c55e5
Show file tree
Hide file tree
Showing 21 changed files with 212 additions and 155 deletions.
29 changes: 13 additions & 16 deletions jctools-core/src/main/java/org/jctools/queues/BaseLinkedQueue.java
Expand Up @@ -13,9 +13,6 @@
*/
package org.jctools.queues;

import org.jctools.util.UnsafeAccess;

import java.lang.reflect.Field;
import java.util.AbstractQueue;
import java.util.Iterator;

Expand All @@ -31,28 +28,28 @@ abstract class BaseLinkedQueuePad0<E> extends AbstractQueue<E> implements Messag
// $gen:ordered-fields
abstract class BaseLinkedQueueProducerNodeRef<E> extends BaseLinkedQueuePad0<E>
{
protected final static long P_NODE_OFFSET = fieldOffset(BaseLinkedQueueProducerNodeRef.class, "producerNode");
final static long P_NODE_OFFSET = fieldOffset(BaseLinkedQueueProducerNodeRef.class, "producerNode");

protected LinkedQueueNode<E> producerNode;
private LinkedQueueNode<E> producerNode;

protected final void spProducerNode(LinkedQueueNode<E> newValue)
final void spProducerNode(LinkedQueueNode<E> newValue)
{
producerNode = newValue;
}

@SuppressWarnings("unchecked")
protected final LinkedQueueNode<E> lvProducerNode()
final LinkedQueueNode<E> lvProducerNode()
{
return (LinkedQueueNode<E>) UNSAFE.getObjectVolatile(this, P_NODE_OFFSET);
}

@SuppressWarnings("unchecked")
protected final boolean casProducerNode(LinkedQueueNode<E> expect, LinkedQueueNode<E> newValue)
final boolean casProducerNode(LinkedQueueNode<E> expect, LinkedQueueNode<E> newValue)
{
return UNSAFE.compareAndSwapObject(this, P_NODE_OFFSET, expect, newValue);
}

protected final LinkedQueueNode<E> lpProducerNode()
final LinkedQueueNode<E> lpProducerNode()
{
return producerNode;
}
Expand All @@ -67,22 +64,22 @@ abstract class BaseLinkedQueuePad1<E> extends BaseLinkedQueueProducerNodeRef<E>
//$gen:ordered-fields
abstract class BaseLinkedQueueConsumerNodeRef<E> extends BaseLinkedQueuePad1<E>
{
protected final static long C_NODE_OFFSET = fieldOffset(BaseLinkedQueueConsumerNodeRef.class,"consumerNode");
private final static long C_NODE_OFFSET = fieldOffset(BaseLinkedQueueConsumerNodeRef.class,"consumerNode");

protected LinkedQueueNode<E> consumerNode;
private LinkedQueueNode<E> consumerNode;

protected final void spConsumerNode(LinkedQueueNode<E> newValue)
final void spConsumerNode(LinkedQueueNode<E> newValue)
{
consumerNode = newValue;
}

@SuppressWarnings("unchecked")
protected final LinkedQueueNode<E> lvConsumerNode()
final LinkedQueueNode<E> lvConsumerNode()
{
return (LinkedQueueNode<E>) UNSAFE.getObjectVolatile(this, C_NODE_OFFSET);
}

protected final LinkedQueueNode<E> lpConsumerNode()
final LinkedQueueNode<E> lpConsumerNode()
{
return consumerNode;
}
Expand Down Expand Up @@ -238,7 +235,7 @@ public int drain(Consumer<E> c)
@Override
public int drain(Consumer<E> c, int limit)
{
LinkedQueueNode<E> chaserNode = this.consumerNode;
LinkedQueueNode<E> chaserNode = this.lpConsumerNode();
for (int i = 0; i < limit; i++)
{
final LinkedQueueNode<E> nextNode = chaserNode.lvNext();
Expand All @@ -258,7 +255,7 @@ public int drain(Consumer<E> c, int limit)
@Override
public void drain(Consumer<E> c, WaitStrategy wait, ExitCondition exit)
{
LinkedQueueNode<E> chaserNode = this.consumerNode;
LinkedQueueNode<E> chaserNode = this.lpConsumerNode();
int idleCounter = 0;
while (exit.keepRunning())
{
Expand Down
Expand Up @@ -18,7 +18,6 @@
import org.jctools.util.Pow2;
import org.jctools.util.RangeUtil;

import java.lang.reflect.Field;
import java.util.AbstractQueue;
import java.util.Iterator;

Expand All @@ -41,14 +40,14 @@ abstract class BaseMpscLinkedArrayQueueProducerFields<E> extends BaseMpscLinkedA
{
private final static long P_INDEX_OFFSET = fieldOffset(BaseMpscLinkedArrayQueueProducerFields.class, "producerIndex");

protected long producerIndex;
private volatile long producerIndex;

@Override
public final long lvProducerIndex()
{
return UNSAFE.getLongVolatile(this, P_INDEX_OFFSET);
return producerIndex;
}

final void soProducerIndex(long newValue)
{
UNSAFE.putOrderedLong(this, P_INDEX_OFFSET, newValue);
Expand All @@ -71,14 +70,19 @@ abstract class BaseMpscLinkedArrayQueueConsumerFields<E> extends BaseMpscLinkedA
{
private final static long C_INDEX_OFFSET = fieldOffset(BaseMpscLinkedArrayQueueConsumerFields.class,"consumerIndex");

private volatile long consumerIndex;
protected long consumerMask;
protected E[] consumerBuffer;
protected long consumerIndex;

@Override
public final long lvConsumerIndex()
{
return UNSAFE.getLongVolatile(this, C_INDEX_OFFSET);
return consumerIndex;
}

final long lpConsumerIndex()
{
return UNSAFE.getLong(this, C_INDEX_OFFSET);
}

final void soConsumerIndex(long newValue)
Expand Down Expand Up @@ -282,7 +286,7 @@ public boolean offer(final E e)
public E poll()
{
final E[] buffer = consumerBuffer;
final long index = consumerIndex;
final long index = lpConsumerIndex();
final long mask = consumerMask;

final long offset = modifiedCalcElementOffset(index, mask);
Expand Down Expand Up @@ -327,7 +331,7 @@ public E poll()
public E peek()
{
final E[] buffer = consumerBuffer;
final long index = consumerIndex;
final long index = lpConsumerIndex();
final long mask = consumerMask;

final long offset = modifiedCalcElementOffset(index, mask);
Expand Down Expand Up @@ -465,7 +469,7 @@ public boolean relaxedOffer(E e)
public E relaxedPoll()
{
final E[] buffer = consumerBuffer;
final long index = consumerIndex;
final long index = lpConsumerIndex();
final long mask = consumerMask;

final long offset = modifiedCalcElementOffset(index, mask);
Expand All @@ -489,7 +493,7 @@ public E relaxedPoll()
public E relaxedPeek()
{
final E[] buffer = consumerBuffer;
final long index = consumerIndex;
final long index = lpConsumerIndex();
final long mask = consumerMask;

final long offset = modifiedCalcElementOffset(index, mask);
Expand Down
Expand Up @@ -16,7 +16,6 @@
import org.jctools.queues.IndexedQueueSizeUtil.IndexedQueue;
import org.jctools.util.PortableJvmInfo;

import java.lang.reflect.Field;
import java.util.AbstractQueue;
import java.util.Iterator;

Expand Down Expand Up @@ -46,18 +45,24 @@ abstract class BaseSpscLinkedArrayQueueConsumerField<E> extends BaseSpscLinkedAr
{
private final static long C_INDEX_OFFSET = fieldOffset(BaseSpscLinkedArrayQueueConsumerField.class, "consumerIndex");

protected long consumerIndex;
private volatile long consumerIndex;

final void soConsumerIndex(long newValue)
@Override
public final long lvConsumerIndex()
{
UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, newValue);
return consumerIndex;
}

final long lpConsumerIndex()
{
return UNSAFE.getLong(this, C_INDEX_OFFSET);
}

@Override
public final long lvConsumerIndex()
final void soConsumerIndex(long newValue)
{
return UNSAFE.getLongVolatile(this, C_INDEX_OFFSET);
UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, newValue);
}

}

abstract class BaseSpscLinkedArrayQueueL2Pad<E> extends BaseSpscLinkedArrayQueueConsumerField<E>
Expand All @@ -71,18 +76,24 @@ abstract class BaseSpscLinkedArrayQueueProducerFields<E> extends BaseSpscLinkedA
{
private final static long P_INDEX_OFFSET = fieldOffset(BaseSpscLinkedArrayQueueProducerFields.class,"producerIndex");

protected long producerIndex;
private volatile long producerIndex;

@Override
public final long lvProducerIndex()
{
return producerIndex;
}

final void soProducerIndex(long newValue)
{
UNSAFE.putOrderedLong(this, P_INDEX_OFFSET, newValue);
}

@Override
public final long lvProducerIndex()
final long lpProducerIndex()
{
return UNSAFE.getLongVolatile(this, P_INDEX_OFFSET);
return UNSAFE.getLong(this, P_INDEX_OFFSET);
}

}

abstract class BaseSpscLinkedArrayQueueProducerColdFields<E> extends BaseSpscLinkedArrayQueueProducerFields<E>
Expand Down Expand Up @@ -206,7 +217,7 @@ public int fill(Supplier<E> s, int limit)
{
// local load of field to avoid repeated loads after volatile reads
final E[] buffer = producerBuffer;
final long index = producerIndex;
final long index = lpProducerIndex();
final long mask = producerMask;
final long offset = calcElementOffset(index, mask);
// expected hot path
Expand Down Expand Up @@ -264,7 +275,7 @@ public boolean offer(final E e)
}
// local load of field to avoid repeated loads after volatile reads
final E[] buffer = producerBuffer;
final long index = producerIndex;
final long index = lpProducerIndex();
final long mask = producerMask;
final long offset = calcElementOffset(index, mask);
// expected hot path
Expand Down Expand Up @@ -295,7 +306,7 @@ public E poll()
{
// local load of field to avoid repeated loads after volatile reads
final E[] buffer = consumerBuffer;
final long index = consumerIndex;
final long index = lpConsumerIndex();
final long mask = consumerMask;
final long offset = calcElementOffset(index, mask);
final Object e = lvElement(buffer, offset);// LoadLoad
Expand Down Expand Up @@ -324,7 +335,7 @@ else if (isNextBuffer)
public E peek()
{
final E[] buffer = consumerBuffer;
final long index = consumerIndex;
final long index = lpConsumerIndex();
final long mask = consumerMask;
final long offset = calcElementOffset(index, mask);
final Object e = lvElement(buffer, offset);// LoadLoad
Expand Down
Expand Up @@ -3,6 +3,9 @@
import static org.jctools.util.UnsafeRefArrayAccess.REF_ARRAY_BASE;
import static org.jctools.util.UnsafeRefArrayAccess.REF_ELEMENT_SHIFT;

/**
* This is used for method substitution in the LinkedArray classes code generation.
*/
final class LinkedArrayQueueUtil
{
private LinkedArrayQueueUtil()
Expand Down
Expand Up @@ -50,7 +50,7 @@ public final long lvProducerIndex()
return producerIndex;
}

protected final boolean casProducerIndex(long expect, long newValue)
final boolean casProducerIndex(long expect, long newValue)
{
return UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue);
}
Expand Down Expand Up @@ -85,7 +85,7 @@ public final long lvConsumerIndex()
return consumerIndex;
}

protected final boolean casConsumerIndex(long expect, long newValue)
final boolean casConsumerIndex(long expect, long newValue)
{
return UNSAFE.compareAndSwapLong(this, C_INDEX_OFFSET, expect, newValue);
}
Expand Down
Expand Up @@ -48,7 +48,7 @@ public final long lvProducerIndex()
return producerIndex;
}

protected final boolean casProducerIndex(long expect, long newValue)
final boolean casProducerIndex(long expect, long newValue)
{
return UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue);
}
Expand Down Expand Up @@ -79,12 +79,12 @@ abstract class MpscArrayQueueProducerLimitField<E> extends MpscArrayQueueMidPad<
this.producerLimit = capacity;
}

protected final long lvProducerLimit()
final long lvProducerLimit()
{
return producerLimit;
}

protected final void soProducerLimit(long newValue)
final void soProducerLimit(long newValue)
{
UNSAFE.putOrderedLong(this, P_LIMIT_OFFSET, newValue);
}
Expand All @@ -106,25 +106,25 @@ abstract class MpscArrayQueueConsumerIndexField<E> extends MpscArrayQueueL2Pad<E
{
private final static long C_INDEX_OFFSET = fieldOffset(MpscArrayQueueConsumerIndexField.class, "consumerIndex");

protected long consumerIndex;
private volatile long consumerIndex;

MpscArrayQueueConsumerIndexField(int capacity)
{
super(capacity);
}

protected final long lpConsumerIndex()
@Override
public final long lvConsumerIndex()
{
return consumerIndex;
}

@Override
public final long lvConsumerIndex()
final long lpConsumerIndex()
{
return UNSAFE.getLongVolatile(this, C_INDEX_OFFSET);
return UNSAFE.getLong(this, C_INDEX_OFFSET);
}

protected void soConsumerIndex(long newValue)
final void soConsumerIndex(long newValue)
{
UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, newValue);
}
Expand Down
Expand Up @@ -28,7 +28,7 @@ protected final LinkedQueueNode<E> xchgProducerNode(LinkedQueueNode<E> newVal)
Object oldVal;
do
{
oldVal = producerNode;
oldVal = lvProducerNode();
}
while (!UNSAFE.compareAndSwapObject(this, P_NODE_OFFSET, oldVal, newVal));
return (LinkedQueueNode<E>) oldVal;
Expand Down

0 comments on commit 72c55e5

Please sign in to comment.