From 72c55e5bee5ed05e47c5f997f8d4bc9d736659d1 Mon Sep 17 00:00:00 2001 From: nitsanw Date: Mon, 21 May 2018 13:14:27 +0200 Subject: [PATCH] Fix up visibility and fix #209 --- .../org/jctools/queues/BaseLinkedQueue.java | 29 ++++++------- .../queues/BaseMpscLinkedArrayQueue.java | 24 ++++++----- .../queues/BaseSpscLinkedArrayQueue.java | 41 +++++++++++------- .../jctools/queues/LinkedArrayQueueUtil.java | 3 ++ .../org/jctools/queues/MpmcArrayQueue.java | 4 +- .../org/jctools/queues/MpscArrayQueue.java | 18 ++++---- .../org/jctools/queues/MpscLinkedQueue7.java | 2 +- .../org/jctools/queues/SpmcArrayQueue.java | 32 ++++++++------ .../org/jctools/queues/SpscArrayQueue.java | 43 ++++++++++++------- .../org/jctools/queues/SpscLinkedQueue.java | 4 +- .../queues/atomic/BaseLinkedAtomicQueue.java | 23 +++++----- .../BaseMpscLinkedAtomicArrayQueue.java | 19 ++++---- .../BaseSpscLinkedAtomicArrayQueue.java | 33 ++++++++------ .../queues/atomic/MpmcAtomicArrayQueue.java | 4 +- .../queues/atomic/MpscAtomicArrayQueue.java | 16 +++---- .../queues/atomic/MpscLinkedAtomicQueue.java | 2 +- .../queues/atomic/SpmcAtomicArrayQueue.java | 26 ++++++----- .../queues/atomic/SpscAtomicArrayQueue.java | 32 ++++++++------ .../queues/atomic/SpscLinkedAtomicQueue.java | 4 +- .../jctools/queues/SpscArrayQueueTest.java | 4 +- .../atomic/SpscAtomicArrayQueueTest.java | 4 +- 21 files changed, 212 insertions(+), 155 deletions(-) diff --git a/jctools-core/src/main/java/org/jctools/queues/BaseLinkedQueue.java b/jctools-core/src/main/java/org/jctools/queues/BaseLinkedQueue.java index 95fd93b4..0d86648c 100644 --- a/jctools-core/src/main/java/org/jctools/queues/BaseLinkedQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/BaseLinkedQueue.java @@ -13,9 +13,6 @@ */ package org.jctools.queues; -import org.jctools.util.UnsafeAccess; - -import java.lang.reflect.Field; import java.util.AbstractQueue; import java.util.Iterator; @@ -31,28 +28,28 @@ abstract class BaseLinkedQueuePad0 extends AbstractQueue implements Messag // $gen:ordered-fields abstract class BaseLinkedQueueProducerNodeRef extends BaseLinkedQueuePad0 { - protected final static long P_NODE_OFFSET = fieldOffset(BaseLinkedQueueProducerNodeRef.class, "producerNode"); + final static long P_NODE_OFFSET = fieldOffset(BaseLinkedQueueProducerNodeRef.class, "producerNode"); - protected LinkedQueueNode producerNode; + private LinkedQueueNode producerNode; - protected final void spProducerNode(LinkedQueueNode newValue) + final void spProducerNode(LinkedQueueNode newValue) { producerNode = newValue; } @SuppressWarnings("unchecked") - protected final LinkedQueueNode lvProducerNode() + final LinkedQueueNode lvProducerNode() { return (LinkedQueueNode) UNSAFE.getObjectVolatile(this, P_NODE_OFFSET); } @SuppressWarnings("unchecked") - protected final boolean casProducerNode(LinkedQueueNode expect, LinkedQueueNode newValue) + final boolean casProducerNode(LinkedQueueNode expect, LinkedQueueNode newValue) { return UNSAFE.compareAndSwapObject(this, P_NODE_OFFSET, expect, newValue); } - protected final LinkedQueueNode lpProducerNode() + final LinkedQueueNode lpProducerNode() { return producerNode; } @@ -67,22 +64,22 @@ abstract class BaseLinkedQueuePad1 extends BaseLinkedQueueProducerNodeRef //$gen:ordered-fields abstract class BaseLinkedQueueConsumerNodeRef extends BaseLinkedQueuePad1 { - protected final static long C_NODE_OFFSET = fieldOffset(BaseLinkedQueueConsumerNodeRef.class,"consumerNode"); + private final static long C_NODE_OFFSET = fieldOffset(BaseLinkedQueueConsumerNodeRef.class,"consumerNode"); - protected LinkedQueueNode consumerNode; + private LinkedQueueNode consumerNode; - protected final void spConsumerNode(LinkedQueueNode newValue) + final void spConsumerNode(LinkedQueueNode newValue) { consumerNode = newValue; } @SuppressWarnings("unchecked") - protected final LinkedQueueNode lvConsumerNode() + final LinkedQueueNode lvConsumerNode() { return (LinkedQueueNode) UNSAFE.getObjectVolatile(this, C_NODE_OFFSET); } - protected final LinkedQueueNode lpConsumerNode() + final LinkedQueueNode lpConsumerNode() { return consumerNode; } @@ -238,7 +235,7 @@ public int drain(Consumer c) @Override public int drain(Consumer c, int limit) { - LinkedQueueNode chaserNode = this.consumerNode; + LinkedQueueNode chaserNode = this.lpConsumerNode(); for (int i = 0; i < limit; i++) { final LinkedQueueNode nextNode = chaserNode.lvNext(); @@ -258,7 +255,7 @@ public int drain(Consumer c, int limit) @Override public void drain(Consumer c, WaitStrategy wait, ExitCondition exit) { - LinkedQueueNode chaserNode = this.consumerNode; + LinkedQueueNode chaserNode = this.lpConsumerNode(); int idleCounter = 0; while (exit.keepRunning()) { diff --git a/jctools-core/src/main/java/org/jctools/queues/BaseMpscLinkedArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/BaseMpscLinkedArrayQueue.java index d45acf67..962d61f0 100644 --- a/jctools-core/src/main/java/org/jctools/queues/BaseMpscLinkedArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/BaseMpscLinkedArrayQueue.java @@ -18,7 +18,6 @@ import org.jctools.util.Pow2; import org.jctools.util.RangeUtil; -import java.lang.reflect.Field; import java.util.AbstractQueue; import java.util.Iterator; @@ -41,14 +40,14 @@ abstract class BaseMpscLinkedArrayQueueProducerFields extends BaseMpscLinkedA { private final static long P_INDEX_OFFSET = fieldOffset(BaseMpscLinkedArrayQueueProducerFields.class, "producerIndex"); - protected long producerIndex; + private volatile long producerIndex; @Override public final long lvProducerIndex() { - return UNSAFE.getLongVolatile(this, P_INDEX_OFFSET); + return producerIndex; } - + final void soProducerIndex(long newValue) { UNSAFE.putOrderedLong(this, P_INDEX_OFFSET, newValue); @@ -71,14 +70,19 @@ abstract class BaseMpscLinkedArrayQueueConsumerFields extends BaseMpscLinkedA { private final static long C_INDEX_OFFSET = fieldOffset(BaseMpscLinkedArrayQueueConsumerFields.class,"consumerIndex"); + private volatile long consumerIndex; protected long consumerMask; protected E[] consumerBuffer; - protected long consumerIndex; @Override public final long lvConsumerIndex() { - return UNSAFE.getLongVolatile(this, C_INDEX_OFFSET); + return consumerIndex; + } + + final long lpConsumerIndex() + { + return UNSAFE.getLong(this, C_INDEX_OFFSET); } final void soConsumerIndex(long newValue) @@ -282,7 +286,7 @@ public boolean offer(final E e) public E poll() { final E[] buffer = consumerBuffer; - final long index = consumerIndex; + final long index = lpConsumerIndex(); final long mask = consumerMask; final long offset = modifiedCalcElementOffset(index, mask); @@ -327,7 +331,7 @@ public E poll() public E peek() { final E[] buffer = consumerBuffer; - final long index = consumerIndex; + final long index = lpConsumerIndex(); final long mask = consumerMask; final long offset = modifiedCalcElementOffset(index, mask); @@ -465,7 +469,7 @@ public boolean relaxedOffer(E e) public E relaxedPoll() { final E[] buffer = consumerBuffer; - final long index = consumerIndex; + final long index = lpConsumerIndex(); final long mask = consumerMask; final long offset = modifiedCalcElementOffset(index, mask); @@ -489,7 +493,7 @@ public E relaxedPoll() public E relaxedPeek() { final E[] buffer = consumerBuffer; - final long index = consumerIndex; + final long index = lpConsumerIndex(); final long mask = consumerMask; final long offset = modifiedCalcElementOffset(index, mask); diff --git a/jctools-core/src/main/java/org/jctools/queues/BaseSpscLinkedArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/BaseSpscLinkedArrayQueue.java index 01d514b3..73338eba 100644 --- a/jctools-core/src/main/java/org/jctools/queues/BaseSpscLinkedArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/BaseSpscLinkedArrayQueue.java @@ -16,7 +16,6 @@ import org.jctools.queues.IndexedQueueSizeUtil.IndexedQueue; import org.jctools.util.PortableJvmInfo; -import java.lang.reflect.Field; import java.util.AbstractQueue; import java.util.Iterator; @@ -46,18 +45,24 @@ abstract class BaseSpscLinkedArrayQueueConsumerField extends BaseSpscLinkedAr { private final static long C_INDEX_OFFSET = fieldOffset(BaseSpscLinkedArrayQueueConsumerField.class, "consumerIndex"); - protected long consumerIndex; + private volatile long consumerIndex; - final void soConsumerIndex(long newValue) + @Override + public final long lvConsumerIndex() { - UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, newValue); + return consumerIndex; + } + + final long lpConsumerIndex() + { + return UNSAFE.getLong(this, C_INDEX_OFFSET); } - @Override - public final long lvConsumerIndex() + final void soConsumerIndex(long newValue) { - return UNSAFE.getLongVolatile(this, C_INDEX_OFFSET); + UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, newValue); } + } abstract class BaseSpscLinkedArrayQueueL2Pad extends BaseSpscLinkedArrayQueueConsumerField @@ -71,18 +76,24 @@ abstract class BaseSpscLinkedArrayQueueProducerFields extends BaseSpscLinkedA { private final static long P_INDEX_OFFSET = fieldOffset(BaseSpscLinkedArrayQueueProducerFields.class,"producerIndex"); - protected long producerIndex; + private volatile long producerIndex; + + @Override + public final long lvProducerIndex() + { + return producerIndex; + } final void soProducerIndex(long newValue) { UNSAFE.putOrderedLong(this, P_INDEX_OFFSET, newValue); } - @Override - public final long lvProducerIndex() + final long lpProducerIndex() { - return UNSAFE.getLongVolatile(this, P_INDEX_OFFSET); + return UNSAFE.getLong(this, P_INDEX_OFFSET); } + } abstract class BaseSpscLinkedArrayQueueProducerColdFields extends BaseSpscLinkedArrayQueueProducerFields @@ -206,7 +217,7 @@ public int fill(Supplier s, int limit) { // local load of field to avoid repeated loads after volatile reads final E[] buffer = producerBuffer; - final long index = producerIndex; + final long index = lpProducerIndex(); final long mask = producerMask; final long offset = calcElementOffset(index, mask); // expected hot path @@ -264,7 +275,7 @@ public boolean offer(final E e) } // local load of field to avoid repeated loads after volatile reads final E[] buffer = producerBuffer; - final long index = producerIndex; + final long index = lpProducerIndex(); final long mask = producerMask; final long offset = calcElementOffset(index, mask); // expected hot path @@ -295,7 +306,7 @@ public E poll() { // local load of field to avoid repeated loads after volatile reads final E[] buffer = consumerBuffer; - final long index = consumerIndex; + final long index = lpConsumerIndex(); final long mask = consumerMask; final long offset = calcElementOffset(index, mask); final Object e = lvElement(buffer, offset);// LoadLoad @@ -324,7 +335,7 @@ else if (isNextBuffer) public E peek() { final E[] buffer = consumerBuffer; - final long index = consumerIndex; + final long index = lpConsumerIndex(); final long mask = consumerMask; final long offset = calcElementOffset(index, mask); final Object e = lvElement(buffer, offset);// LoadLoad diff --git a/jctools-core/src/main/java/org/jctools/queues/LinkedArrayQueueUtil.java b/jctools-core/src/main/java/org/jctools/queues/LinkedArrayQueueUtil.java index 490a758a..876dc0f4 100644 --- a/jctools-core/src/main/java/org/jctools/queues/LinkedArrayQueueUtil.java +++ b/jctools-core/src/main/java/org/jctools/queues/LinkedArrayQueueUtil.java @@ -3,6 +3,9 @@ import static org.jctools.util.UnsafeRefArrayAccess.REF_ARRAY_BASE; import static org.jctools.util.UnsafeRefArrayAccess.REF_ELEMENT_SHIFT; +/** + * This is used for method substitution in the LinkedArray classes code generation. + */ final class LinkedArrayQueueUtil { private LinkedArrayQueueUtil() diff --git a/jctools-core/src/main/java/org/jctools/queues/MpmcArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/MpmcArrayQueue.java index 9cab9013..31508a40 100755 --- a/jctools-core/src/main/java/org/jctools/queues/MpmcArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/MpmcArrayQueue.java @@ -50,7 +50,7 @@ public final long lvProducerIndex() return producerIndex; } - protected final boolean casProducerIndex(long expect, long newValue) + final boolean casProducerIndex(long expect, long newValue) { return UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue); } @@ -85,7 +85,7 @@ public final long lvConsumerIndex() return consumerIndex; } - protected final boolean casConsumerIndex(long expect, long newValue) + final boolean casConsumerIndex(long expect, long newValue) { return UNSAFE.compareAndSwapLong(this, C_INDEX_OFFSET, expect, newValue); } diff --git a/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java index 768305da..6c62d2f7 100755 --- a/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/MpscArrayQueue.java @@ -48,7 +48,7 @@ public final long lvProducerIndex() return producerIndex; } - protected final boolean casProducerIndex(long expect, long newValue) + final boolean casProducerIndex(long expect, long newValue) { return UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue); } @@ -79,12 +79,12 @@ abstract class MpscArrayQueueProducerLimitField extends MpscArrayQueueMidPad< this.producerLimit = capacity; } - protected final long lvProducerLimit() + final long lvProducerLimit() { return producerLimit; } - protected final void soProducerLimit(long newValue) + final void soProducerLimit(long newValue) { UNSAFE.putOrderedLong(this, P_LIMIT_OFFSET, newValue); } @@ -106,25 +106,25 @@ abstract class MpscArrayQueueConsumerIndexField extends MpscArrayQueueL2Pad xchgProducerNode(LinkedQueueNode newVal) Object oldVal; do { - oldVal = producerNode; + oldVal = lvProducerNode(); } while (!UNSAFE.compareAndSwapObject(this, P_NODE_OFFSET, oldVal, newVal)); return (LinkedQueueNode) oldVal; diff --git a/jctools-core/src/main/java/org/jctools/queues/SpmcArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/SpmcArrayQueue.java index 99ea1f83..1ecd035b 100644 --- a/jctools-core/src/main/java/org/jctools/queues/SpmcArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/SpmcArrayQueue.java @@ -35,22 +35,29 @@ abstract class SpmcArrayQueueProducerIndexField extends SpmcArrayQueueL1Pad extends SpmcArrayQueueProducerIndexField @@ -76,12 +83,13 @@ abstract class SpmcArrayQueueConsumerIndexField extends SpmcArrayQueueL2Pad s, final int limit) { final E[] buffer = this.buffer; final long mask = this.mask; - long producerIndex = this.producerIndex; + long producerIndex = this.lpProducerIndex(); for (int i = 0; i < limit; i++) { @@ -380,7 +388,7 @@ public void fill(final Supplier s, final WaitStrategy w, final ExitCondition { final E[] buffer = this.buffer; final long mask = this.mask; - long producerIndex = this.producerIndex; + long producerIndex = this.lpProducerIndex(); int counter = 0; while (e.keepRunning()) { diff --git a/jctools-core/src/main/java/org/jctools/queues/SpscArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/SpscArrayQueue.java index 88bb629f..aa3565ce 100644 --- a/jctools-core/src/main/java/org/jctools/queues/SpscArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/SpscArrayQueue.java @@ -21,7 +21,7 @@ abstract class SpscArrayQueueColdField extends ConcurrentCircularArrayQueue { public static final int MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.spsc.max.lookahead.step", 4096); - protected final int lookAheadStep; + final int lookAheadStep; SpscArrayQueueColdField(int capacity) { @@ -44,9 +44,9 @@ abstract class SpscArrayQueueL1Pad extends SpscArrayQueueColdField // $gen:ordered-fields abstract class SpscArrayQueueProducerIndexFields extends SpscArrayQueueL1Pad { - protected final static long P_INDEX_OFFSET = fieldOffset(SpscArrayQueueProducerIndexFields.class, "producerIndex"); + private final static long P_INDEX_OFFSET = fieldOffset(SpscArrayQueueProducerIndexFields.class, "producerIndex"); - protected long producerIndex; + private volatile long producerIndex; protected long producerLimit; SpscArrayQueueProducerIndexFields(int capacity) @@ -57,10 +57,15 @@ abstract class SpscArrayQueueProducerIndexFields extends SpscArrayQueueL1Pad< @Override public final long lvProducerIndex() { - return UNSAFE.getLongVolatile(this, P_INDEX_OFFSET); + return producerIndex; + } + + final long lpProducerIndex() + { + return UNSAFE.getLong(this, P_INDEX_OFFSET); } - protected final void soProducerIndex(final long newValue) + final void soProducerIndex(final long newValue) { UNSAFE.putOrderedLong(this, P_INDEX_OFFSET, newValue); } @@ -81,8 +86,9 @@ abstract class SpscArrayQueueL2Pad extends SpscArrayQueueProducerIndexFields< //$gen:ordered-fields abstract class SpscArrayQueueConsumerIndexField extends SpscArrayQueueL2Pad { - protected long consumerIndex; - protected final static long C_INDEX_OFFSET = fieldOffset(SpscArrayQueueConsumerIndexField.class, "consumerIndex"); + private final static long C_INDEX_OFFSET = fieldOffset(SpscArrayQueueConsumerIndexField.class, "consumerIndex"); + + private volatile long consumerIndex; SpscArrayQueueConsumerIndexField(int capacity) { @@ -93,8 +99,13 @@ public final long lvConsumerIndex() { return UNSAFE.getLongVolatile(this, C_INDEX_OFFSET); } - - protected final void soConsumerIndex(final long newValue) + + final long lpConsumerIndex() + { + return UNSAFE.getLong(this, C_INDEX_OFFSET); + } + + final void soConsumerIndex(final long newValue) { UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, newValue); } @@ -150,7 +161,7 @@ public boolean offer(final E e) // local load of field to avoid repeated loads after volatile reads final E[] buffer = this.buffer; final long mask = this.mask; - final long producerIndex = this.producerIndex; + final long producerIndex = this.lpProducerIndex(); if (producerIndex >= producerLimit && !offerSlowPath(buffer, mask, producerIndex)) @@ -190,7 +201,7 @@ private boolean offerSlowPath(final E[] buffer, final long mask, final long prod @Override public E poll() { - final long consumerIndex = this.consumerIndex; + final long consumerIndex = this.lpConsumerIndex(); final long offset = calcElementOffset(consumerIndex); // local load of field to avoid repeated loads after volatile reads final E[] buffer = this.buffer; @@ -212,7 +223,7 @@ public E poll() @Override public E peek() { - return lvElement(buffer, calcElementOffset(consumerIndex)); + return lvElement(buffer, calcElementOffset(lpConsumerIndex())); } @Override @@ -250,7 +261,7 @@ public int drain(final Consumer c, final int limit) { final E[] buffer = this.buffer; final long mask = this.mask; - final long consumerIndex = this.consumerIndex; + final long consumerIndex = this.lpConsumerIndex(); for (int i = 0; i < limit; i++) { @@ -274,7 +285,7 @@ public int fill(final Supplier s, final int limit) final E[] buffer = this.buffer; final long mask = this.mask; final int lookAheadStep = this.lookAheadStep; - final long producerIndex = this.producerIndex; + final long producerIndex = this.lpProducerIndex(); for (int i = 0; i < limit; i++) { @@ -311,7 +322,7 @@ public void drain(final Consumer c, final WaitStrategy w, final ExitCondition { final E[] buffer = this.buffer; final long mask = this.mask; - long consumerIndex = this.consumerIndex; + long consumerIndex = this.lpConsumerIndex(); int counter = 0; while (exit.keepRunning()) @@ -340,7 +351,7 @@ public void fill(final Supplier s, final WaitStrategy w, final ExitCondition final E[] buffer = this.buffer; final long mask = this.mask; final int lookAheadStep = this.lookAheadStep; - long producerIndex = this.producerIndex; + long producerIndex = this.lpProducerIndex(); int counter = 0; while (e.keepRunning()) { diff --git a/jctools-core/src/main/java/org/jctools/queues/SpscLinkedQueue.java b/jctools-core/src/main/java/org/jctools/queues/SpscLinkedQueue.java index 805771d6..441d12ba 100644 --- a/jctools-core/src/main/java/org/jctools/queues/SpscLinkedQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/SpscLinkedQueue.java @@ -130,7 +130,7 @@ public int fill(Supplier s, int limit) @Override public void fill(Supplier s, WaitStrategy wait, ExitCondition exit) { - LinkedQueueNode chaserNode = producerNode; + LinkedQueueNode chaserNode = lpProducerNode(); while (exit.keepRunning()) { for (int i = 0; i < 4096; i++) @@ -138,7 +138,7 @@ public void fill(Supplier s, WaitStrategy wait, ExitCondition exit) final LinkedQueueNode nextNode = newNode(s.get()); chaserNode.soNext(nextNode); chaserNode = nextNode; - this.producerNode = chaserNode; + this.spProducerNode(chaserNode); } } } diff --git a/jctools-core/src/main/java/org/jctools/queues/atomic/BaseLinkedAtomicQueue.java b/jctools-core/src/main/java/org/jctools/queues/atomic/BaseLinkedAtomicQueue.java index 8e22a707..b41d505f 100644 --- a/jctools-core/src/main/java/org/jctools/queues/atomic/BaseLinkedAtomicQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/atomic/BaseLinkedAtomicQueue.java @@ -13,7 +13,6 @@ */ package org.jctools.queues.atomic; -import java.lang.reflect.Field; import java.util.AbstractQueue; import java.util.Iterator; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -46,23 +45,23 @@ abstract class BaseLinkedAtomicQueueProducerNodeRef extends BaseLinkedAtomicQ private static final AtomicReferenceFieldUpdater P_NODE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(BaseLinkedAtomicQueueProducerNodeRef.class, LinkedQueueAtomicNode.class, "producerNode"); - protected volatile LinkedQueueAtomicNode producerNode; + private volatile LinkedQueueAtomicNode producerNode; - protected final void spProducerNode(LinkedQueueAtomicNode newValue) { + final void spProducerNode(LinkedQueueAtomicNode newValue) { P_NODE_UPDATER.lazySet(this, newValue); } @SuppressWarnings("unchecked") - protected final LinkedQueueAtomicNode lvProducerNode() { + final LinkedQueueAtomicNode lvProducerNode() { return producerNode; } @SuppressWarnings("unchecked") - protected final boolean casProducerNode(LinkedQueueAtomicNode expect, LinkedQueueAtomicNode newValue) { + final boolean casProducerNode(LinkedQueueAtomicNode expect, LinkedQueueAtomicNode newValue) { return P_NODE_UPDATER.compareAndSet(this, expect, newValue); } - protected final LinkedQueueAtomicNode lpProducerNode() { + final LinkedQueueAtomicNode lpProducerNode() { return producerNode; } @@ -90,18 +89,18 @@ abstract class BaseLinkedAtomicQueueConsumerNodeRef extends BaseLinkedAtomicQ private static final AtomicReferenceFieldUpdater C_NODE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(BaseLinkedAtomicQueueConsumerNodeRef.class, LinkedQueueAtomicNode.class, "consumerNode"); - protected volatile LinkedQueueAtomicNode consumerNode; + private volatile LinkedQueueAtomicNode consumerNode; - protected final void spConsumerNode(LinkedQueueAtomicNode newValue) { + final void spConsumerNode(LinkedQueueAtomicNode newValue) { C_NODE_UPDATER.lazySet(this, newValue); } @SuppressWarnings("unchecked") - protected final LinkedQueueAtomicNode lvConsumerNode() { + final LinkedQueueAtomicNode lvConsumerNode() { return consumerNode; } - protected final LinkedQueueAtomicNode lpConsumerNode() { + final LinkedQueueAtomicNode lpConsumerNode() { return consumerNode; } } @@ -246,7 +245,7 @@ public int drain(Consumer c) { @Override public int drain(Consumer c, int limit) { - LinkedQueueAtomicNode chaserNode = this.consumerNode; + LinkedQueueAtomicNode chaserNode = this.lpConsumerNode(); for (int i = 0; i < limit; i++) { final LinkedQueueAtomicNode nextNode = chaserNode.lvNext(); if (nextNode == null) { @@ -262,7 +261,7 @@ public int drain(Consumer c, int limit) { @Override public void drain(Consumer c, WaitStrategy wait, ExitCondition exit) { - LinkedQueueAtomicNode chaserNode = this.consumerNode; + LinkedQueueAtomicNode chaserNode = this.lpConsumerNode(); int idleCounter = 0; while (exit.keepRunning()) { for (int i = 0; i < 4096; i++) { diff --git a/jctools-core/src/main/java/org/jctools/queues/atomic/BaseMpscLinkedAtomicArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/atomic/BaseMpscLinkedAtomicArrayQueue.java index cfc49a2c..7edf95e8 100644 --- a/jctools-core/src/main/java/org/jctools/queues/atomic/BaseMpscLinkedAtomicArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/atomic/BaseMpscLinkedAtomicArrayQueue.java @@ -17,7 +17,6 @@ import org.jctools.util.PortableJvmInfo; import org.jctools.util.Pow2; import org.jctools.util.RangeUtil; -import java.lang.reflect.Field; import java.util.AbstractQueue; import java.util.Iterator; import static org.jctools.queues.atomic.LinkedAtomicArrayQueueUtil.length; @@ -52,7 +51,7 @@ abstract class BaseMpscLinkedAtomicArrayQueueProducerFields extends BaseMpscL private static final AtomicLongFieldUpdater P_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(BaseMpscLinkedAtomicArrayQueueProducerFields.class, "producerIndex"); - protected volatile long producerIndex; + private volatile long producerIndex; @Override public final long lvProducerIndex() { @@ -87,17 +86,21 @@ abstract class BaseMpscLinkedAtomicArrayQueueConsumerFields extends BaseMpscL private static final AtomicLongFieldUpdater C_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(BaseMpscLinkedAtomicArrayQueueConsumerFields.class, "consumerIndex"); + private volatile long consumerIndex; + protected long consumerMask; protected AtomicReferenceArray consumerBuffer; - protected volatile long consumerIndex; - @Override public final long lvConsumerIndex() { return consumerIndex; } + final long lpConsumerIndex() { + return consumerIndex; + } + final void soConsumerIndex(long newValue) { C_INDEX_UPDATER.lazySet(this, newValue); } @@ -281,7 +284,7 @@ public boolean offer(final E e) { @Override public E poll() { final AtomicReferenceArray buffer = consumerBuffer; - final long index = consumerIndex; + final long index = lpConsumerIndex(); final long mask = consumerMask; final int offset = modifiedCalcElementOffset(index, mask); // LoadLoad @@ -316,7 +319,7 @@ public E poll() { @Override public E peek() { final AtomicReferenceArray buffer = consumerBuffer; - final long index = consumerIndex; + final long index = lpConsumerIndex(); final long mask = consumerMask; final int offset = modifiedCalcElementOffset(index, mask); // LoadLoad @@ -429,7 +432,7 @@ public boolean relaxedOffer(E e) { @Override public E relaxedPoll() { final AtomicReferenceArray buffer = consumerBuffer; - final long index = consumerIndex; + final long index = lpConsumerIndex(); final long mask = consumerMask; final int offset = modifiedCalcElementOffset(index, mask); // LoadLoad @@ -450,7 +453,7 @@ public E relaxedPoll() { @Override public E relaxedPeek() { final AtomicReferenceArray buffer = consumerBuffer; - final long index = consumerIndex; + final long index = lpConsumerIndex(); final long mask = consumerMask; final int offset = modifiedCalcElementOffset(index, mask); // LoadLoad diff --git a/jctools-core/src/main/java/org/jctools/queues/atomic/BaseSpscLinkedAtomicArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/atomic/BaseSpscLinkedAtomicArrayQueue.java index b3889418..f3d672a7 100644 --- a/jctools-core/src/main/java/org/jctools/queues/atomic/BaseSpscLinkedAtomicArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/atomic/BaseSpscLinkedAtomicArrayQueue.java @@ -15,7 +15,6 @@ import org.jctools.queues.IndexedQueueSizeUtil.IndexedQueue; import org.jctools.util.PortableJvmInfo; -import java.lang.reflect.Field; import java.util.AbstractQueue; import java.util.Iterator; import static org.jctools.queues.atomic.LinkedAtomicArrayQueueUtil.length; @@ -61,16 +60,20 @@ abstract class BaseSpscLinkedAtomicArrayQueueConsumerField extends BaseSpscLi private static final AtomicLongFieldUpdater C_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(BaseSpscLinkedAtomicArrayQueueConsumerField.class, "consumerIndex"); - protected volatile long consumerIndex; - - final void soConsumerIndex(long newValue) { - C_INDEX_UPDATER.lazySet(this, newValue); - } + private volatile long consumerIndex; @Override public final long lvConsumerIndex() { return consumerIndex; } + + final long lpConsumerIndex() { + return consumerIndex; + } + + final void soConsumerIndex(long newValue) { + C_INDEX_UPDATER.lazySet(this, newValue); + } } /** @@ -92,14 +95,18 @@ abstract class BaseSpscLinkedAtomicArrayQueueProducerFields extends BaseSpscL private static final AtomicLongFieldUpdater P_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(BaseSpscLinkedAtomicArrayQueueProducerFields.class, "producerIndex"); - protected volatile long producerIndex; + private volatile long producerIndex; + + @Override + public final long lvProducerIndex() { + return producerIndex; + } final void soProducerIndex(long newValue) { P_INDEX_UPDATER.lazySet(this, newValue); } - @Override - public final long lvProducerIndex() { + final long lpProducerIndex() { return producerIndex; } } @@ -215,7 +222,7 @@ public int fill(Supplier s, int limit) { for (int i = 0; i < limit; i++) { // local load of field to avoid repeated loads after volatile reads final AtomicReferenceArray buffer = producerBuffer; - final long index = producerIndex; + final long index = lpProducerIndex(); final long mask = producerMask; final int offset = calcElementOffset(index, mask); // expected hot path @@ -261,7 +268,7 @@ public boolean offer(final E e) { } // local load of field to avoid repeated loads after volatile reads final AtomicReferenceArray buffer = producerBuffer; - final long index = producerIndex; + final long index = lpProducerIndex(); final long mask = producerMask; final int offset = calcElementOffset(index, mask); // expected hot path @@ -284,7 +291,7 @@ public boolean offer(final E e) { public E poll() { // local load of field to avoid repeated loads after volatile reads final AtomicReferenceArray buffer = consumerBuffer; - final long index = consumerIndex; + final long index = lpConsumerIndex(); final long mask = consumerMask; final int offset = calcElementOffset(index, mask); // LoadLoad @@ -310,7 +317,7 @@ public E poll() { @Override public E peek() { final AtomicReferenceArray buffer = consumerBuffer; - final long index = consumerIndex; + final long index = lpConsumerIndex(); final long mask = consumerMask; final int offset = calcElementOffset(index, mask); // LoadLoad diff --git a/jctools-core/src/main/java/org/jctools/queues/atomic/MpmcAtomicArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/atomic/MpmcAtomicArrayQueue.java index d19a372c..9bbe1ae0 100644 --- a/jctools-core/src/main/java/org/jctools/queues/atomic/MpmcAtomicArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/atomic/MpmcAtomicArrayQueue.java @@ -53,7 +53,7 @@ public final long lvProducerIndex() { return producerIndex; } - protected final boolean casProducerIndex(long expect, long newValue) { + final boolean casProducerIndex(long expect, long newValue) { return P_INDEX_UPDATER.compareAndSet(this, expect, newValue); } } @@ -92,7 +92,7 @@ public final long lvConsumerIndex() { return consumerIndex; } - protected final boolean casConsumerIndex(long expect, long newValue) { + final boolean casConsumerIndex(long expect, long newValue) { return C_INDEX_UPDATER.compareAndSet(this, expect, newValue); } } diff --git a/jctools-core/src/main/java/org/jctools/queues/atomic/MpscAtomicArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/atomic/MpscAtomicArrayQueue.java index 626cb19a..0bb5cc32 100644 --- a/jctools-core/src/main/java/org/jctools/queues/atomic/MpscAtomicArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/atomic/MpscAtomicArrayQueue.java @@ -52,7 +52,7 @@ public final long lvProducerIndex() { return producerIndex; } - protected final boolean casProducerIndex(long expect, long newValue) { + final boolean casProducerIndex(long expect, long newValue) { return P_INDEX_UPDATER.compareAndSet(this, expect, newValue); } } @@ -88,11 +88,11 @@ abstract class MpscAtomicArrayQueueProducerLimitField extends MpscAtomicArray this.producerLimit = capacity; } - protected final long lvProducerLimit() { + final long lvProducerLimit() { return producerLimit; } - protected final void soProducerLimit(long newValue) { + final void soProducerLimit(long newValue) { P_LIMIT_UPDATER.lazySet(this, newValue); } } @@ -120,22 +120,22 @@ abstract class MpscAtomicArrayQueueConsumerIndexField extends MpscAtomicArray private static final AtomicLongFieldUpdater C_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(MpscAtomicArrayQueueConsumerIndexField.class, "consumerIndex"); - protected volatile long consumerIndex; + private volatile long consumerIndex; MpscAtomicArrayQueueConsumerIndexField(int capacity) { super(capacity); } - protected final long lpConsumerIndex() { + @Override + public final long lvConsumerIndex() { return consumerIndex; } - @Override - public final long lvConsumerIndex() { + final long lpConsumerIndex() { return consumerIndex; } - protected void soConsumerIndex(long newValue) { + final void soConsumerIndex(long newValue) { C_INDEX_UPDATER.lazySet(this, newValue); } } diff --git a/jctools-core/src/main/java/org/jctools/queues/atomic/MpscLinkedAtomicQueue.java b/jctools-core/src/main/java/org/jctools/queues/atomic/MpscLinkedAtomicQueue.java index 4a8e9d93..3fdf98cb 100644 --- a/jctools-core/src/main/java/org/jctools/queues/atomic/MpscLinkedAtomicQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/atomic/MpscLinkedAtomicQueue.java @@ -42,7 +42,7 @@ * @param the type of elements in this queue * @author nitsanw */ -public class MpscLinkedAtomicQueue extends BaseLinkedAtomicQueue { +public final class MpscLinkedAtomicQueue extends BaseLinkedAtomicQueue { public MpscLinkedAtomicQueue() { LinkedQueueAtomicNode node = newNode(); diff --git a/jctools-core/src/main/java/org/jctools/queues/atomic/SpmcAtomicArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/atomic/SpmcAtomicArrayQueue.java index 174391bb..d88be8a9 100644 --- a/jctools-core/src/main/java/org/jctools/queues/atomic/SpmcAtomicArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/atomic/SpmcAtomicArrayQueue.java @@ -41,18 +41,23 @@ abstract class SpmcAtomicArrayQueueProducerIndexField extends SpmcAtomicArray private static final AtomicLongFieldUpdater P_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(SpmcAtomicArrayQueueProducerIndexField.class, "producerIndex"); - protected volatile long producerIndex; + private volatile long producerIndex; + SpmcAtomicArrayQueueProducerIndexField(int capacity) { + super(capacity); + } + + @Override public final long lvProducerIndex() { return producerIndex; } - protected final void soProducerIndex(long newValue) { - P_INDEX_UPDATER.lazySet(this, newValue); + final long lpProducerIndex() { + return producerIndex; } - SpmcAtomicArrayQueueProducerIndexField(int capacity) { - super(capacity); + final void soProducerIndex(long newValue) { + P_INDEX_UPDATER.lazySet(this, newValue); } } @@ -85,11 +90,12 @@ abstract class SpmcAtomicArrayQueueConsumerIndexField extends SpmcAtomicArray super(capacity); } + @Override public final long lvConsumerIndex() { return consumerIndex; } - protected final boolean casConsumerIndex(long expect, long newValue) { + final boolean casConsumerIndex(long expect, long newValue) { return C_INDEX_UPDATER.compareAndSet(this, expect, newValue); } } @@ -173,7 +179,7 @@ public boolean offer(final E e) { } else { // spin wait for slot to clear, buggers wait freedom while (null != lvElement(buffer, offset)) { - ; + // BURN } } } @@ -240,7 +246,7 @@ public boolean relaxedOffer(E e) { } final AtomicReferenceArray buffer = this.buffer; final int mask = this.mask; - final long producerIndex = lvProducerIndex(); + final long producerIndex = lpProducerIndex(); final int offset = calcElementOffset(producerIndex, mask); if (null != lvElement(buffer, offset)) { return false; @@ -317,7 +323,7 @@ public int drain(final Consumer c, final int limit) { public int fill(final Supplier s, final int limit) { final AtomicReferenceArray buffer = this.buffer; final int mask = this.mask; - long producerIndex = this.producerIndex; + long producerIndex = this.lpProducerIndex(); for (int i = 0; i < limit; i++) { final int offset = calcElementOffset(producerIndex, mask); if (null != lvElement(buffer, offset)) { @@ -348,7 +354,7 @@ public void drain(final Consumer c, final WaitStrategy w, final ExitCondition public void fill(final Supplier s, final WaitStrategy w, final ExitCondition e) { final AtomicReferenceArray buffer = this.buffer; final int mask = this.mask; - long producerIndex = this.producerIndex; + long producerIndex = this.lpProducerIndex(); int counter = 0; while (e.keepRunning()) { for (int i = 0; i < 4096; i++) { diff --git a/jctools-core/src/main/java/org/jctools/queues/atomic/SpscAtomicArrayQueue.java b/jctools-core/src/main/java/org/jctools/queues/atomic/SpscAtomicArrayQueue.java index 3c0ef2ef..ea9a0989 100644 --- a/jctools-core/src/main/java/org/jctools/queues/atomic/SpscAtomicArrayQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/atomic/SpscAtomicArrayQueue.java @@ -25,7 +25,7 @@ abstract class SpscAtomicArrayQueueColdField extends AtomicReferenceArrayQueu public static final int MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.spsc.max.lookahead.step", 4096); - protected final int lookAheadStep; + final int lookAheadStep; SpscAtomicArrayQueueColdField(int capacity) { super(capacity); @@ -56,7 +56,7 @@ abstract class SpscAtomicArrayQueueProducerIndexFields extends SpscAtomicArra private static final AtomicLongFieldUpdater P_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(SpscAtomicArrayQueueProducerIndexFields.class, "producerIndex"); - protected volatile long producerIndex; + private volatile long producerIndex; protected long producerLimit; @@ -69,7 +69,11 @@ public final long lvProducerIndex() { return producerIndex; } - protected final void soProducerIndex(final long newValue) { + final long lpProducerIndex() { + return producerIndex; + } + + final void soProducerIndex(final long newValue) { P_INDEX_UPDATER.lazySet(this, newValue); } } @@ -97,7 +101,7 @@ abstract class SpscAtomicArrayQueueConsumerIndexField extends SpscAtomicArray private static final AtomicLongFieldUpdater C_INDEX_UPDATER = AtomicLongFieldUpdater.newUpdater(SpscAtomicArrayQueueConsumerIndexField.class, "consumerIndex"); - protected volatile long consumerIndex; + private volatile long consumerIndex; SpscAtomicArrayQueueConsumerIndexField(int capacity) { super(capacity); @@ -107,7 +111,11 @@ public final long lvConsumerIndex() { return consumerIndex; } - protected final void soConsumerIndex(final long newValue) { + final long lpConsumerIndex() { + return consumerIndex; + } + + final void soConsumerIndex(final long newValue) { C_INDEX_UPDATER.lazySet(this, newValue); } } @@ -164,7 +172,7 @@ public boolean offer(final E e) { // local load of field to avoid repeated loads after volatile reads final AtomicReferenceArray buffer = this.buffer; final int mask = this.mask; - final long producerIndex = this.producerIndex; + final long producerIndex = this.lpProducerIndex(); if (producerIndex >= producerLimit && !offerSlowPath(buffer, mask, producerIndex)) { return false; } @@ -197,7 +205,7 @@ private boolean offerSlowPath(final AtomicReferenceArray buffer, final int ma */ @Override public E poll() { - final long consumerIndex = this.consumerIndex; + final long consumerIndex = this.lpConsumerIndex(); final int offset = calcElementOffset(consumerIndex); // local load of field to avoid repeated loads after volatile reads final AtomicReferenceArray buffer = this.buffer; @@ -220,7 +228,7 @@ public E poll() { */ @Override public E peek() { - return lvElement(buffer, calcElementOffset(consumerIndex)); + return lvElement(buffer, calcElementOffset(lpConsumerIndex())); } @Override @@ -252,7 +260,7 @@ public int fill(final Supplier s) { public int drain(final Consumer c, final int limit) { final AtomicReferenceArray buffer = this.buffer; final int mask = this.mask; - final long consumerIndex = this.consumerIndex; + final long consumerIndex = this.lpConsumerIndex(); for (int i = 0; i < limit; i++) { final long index = consumerIndex + i; final int offset = calcElementOffset(index, mask); @@ -275,7 +283,7 @@ public int fill(final Supplier s, final int limit) { final AtomicReferenceArray buffer = this.buffer; final int mask = this.mask; final int lookAheadStep = this.lookAheadStep; - final long producerIndex = this.producerIndex; + final long producerIndex = this.lpProducerIndex(); for (int i = 0; i < limit; i++) { final long index = producerIndex + i; final int lookAheadElementOffset = calcElementOffset(index + lookAheadStep, mask); @@ -308,7 +316,7 @@ public int fill(final Supplier s, final int limit) { public void drain(final Consumer c, final WaitStrategy w, final ExitCondition exit) { final AtomicReferenceArray buffer = this.buffer; final int mask = this.mask; - long consumerIndex = this.consumerIndex; + long consumerIndex = this.lpConsumerIndex(); int counter = 0; while (exit.keepRunning()) { for (int i = 0; i < 4096; i++) { @@ -335,7 +343,7 @@ public void fill(final Supplier s, final WaitStrategy w, final ExitCondition final AtomicReferenceArray buffer = this.buffer; final int mask = this.mask; final int lookAheadStep = this.lookAheadStep; - long producerIndex = this.producerIndex; + long producerIndex = this.lpProducerIndex(); int counter = 0; while (e.keepRunning()) { final int lookAheadElementOffset = calcElementOffset(producerIndex + lookAheadStep, mask); diff --git a/jctools-core/src/main/java/org/jctools/queues/atomic/SpscLinkedAtomicQueue.java b/jctools-core/src/main/java/org/jctools/queues/atomic/SpscLinkedAtomicQueue.java index cc7aac87..293512a3 100644 --- a/jctools-core/src/main/java/org/jctools/queues/atomic/SpscLinkedAtomicQueue.java +++ b/jctools-core/src/main/java/org/jctools/queues/atomic/SpscLinkedAtomicQueue.java @@ -133,13 +133,13 @@ public int fill(Supplier s, int limit) { @Override public void fill(Supplier s, WaitStrategy wait, ExitCondition exit) { - LinkedQueueAtomicNode chaserNode = producerNode; + LinkedQueueAtomicNode chaserNode = lpProducerNode(); while (exit.keepRunning()) { for (int i = 0; i < 4096; i++) { final LinkedQueueAtomicNode nextNode = newNode(s.get()); chaserNode.soNext(nextNode); chaserNode = nextNode; - this.producerNode = chaserNode; + this.spProducerNode(chaserNode); } } } diff --git a/jctools-core/src/test/java/org/jctools/queues/SpscArrayQueueTest.java b/jctools-core/src/test/java/org/jctools/queues/SpscArrayQueueTest.java index ae041340..6b8965c2 100644 --- a/jctools-core/src/test/java/org/jctools/queues/SpscArrayQueueTest.java +++ b/jctools-core/src/test/java/org/jctools/queues/SpscArrayQueueTest.java @@ -15,8 +15,8 @@ public void shouldWorkAfterWrap() // Arrange final SpscArrayQueue q = new SpscArrayQueue(1024); // starting point for empty queue at max long, next offer will wrap the producerIndex - q.consumerIndex = Long.MAX_VALUE; - q.producerIndex = Long.MAX_VALUE; + q.soConsumerIndex(Long.MAX_VALUE); + q.soProducerIndex(Long.MAX_VALUE); q.producerLimit = Long.MAX_VALUE; // valid starting point assertThat(q, emptyAndZeroSize()); diff --git a/jctools-core/src/test/java/org/jctools/queues/atomic/SpscAtomicArrayQueueTest.java b/jctools-core/src/test/java/org/jctools/queues/atomic/SpscAtomicArrayQueueTest.java index 98e5265d..600d7351 100644 --- a/jctools-core/src/test/java/org/jctools/queues/atomic/SpscAtomicArrayQueueTest.java +++ b/jctools-core/src/test/java/org/jctools/queues/atomic/SpscAtomicArrayQueueTest.java @@ -28,8 +28,8 @@ public void shouldWorkAfterWrap() // Arrange final SpscAtomicArrayQueue q = new SpscAtomicArrayQueue(1024); // starting point for empty queue at max long, next offer will wrap the producerIndex - q.consumerIndex = Long.MAX_VALUE; - q.producerIndex = Long.MAX_VALUE; + q.soConsumerIndex(Long.MAX_VALUE); + q.soProducerIndex(Long.MAX_VALUE); q.producerLimit = Long.MAX_VALUE; // valid starting point assertThat(q, emptyAndZeroSize());