Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove some redundant fence #307

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -351,7 +351,7 @@ public E poll()
return newBufferPoll(nextBuffer, index);
}

soRefElement(buffer, offset, null); // release element null
spRefElement(buffer, offset, null); // release element null
soConsumerIndex(index + 2); // release cIndex
return (E) e;
}
Expand Down Expand Up @@ -436,8 +436,9 @@ else if (casProducerIndex(pIndex, pIndex + 1))
@SuppressWarnings("unchecked")
private E[] nextBuffer(final E[] buffer, final long mask)
{
// this access after loadVolatile JUMP
final long offset = nextArrayOffset(mask);
final E[] nextBuffer = (E[]) lvRefElement(buffer, offset);
final E[] nextBuffer = (E[]) lpRefElement(buffer, offset);
consumerBuffer = nextBuffer;
consumerMask = (length(nextBuffer) - 2) << 1;
soRefElement(buffer, offset, BUFFER_CONSUMED);
Expand All @@ -451,21 +452,23 @@ private static long nextArrayOffset(long mask)

private E newBufferPoll(E[] nextBuffer, long index)
{
// this access after loadVolatile JUMP
final long offset = modifiedCalcCircularRefElementOffset(index, consumerMask);
final E n = lvRefElement(nextBuffer, offset);
final E n = lpRefElement(nextBuffer, offset);
if (n == null)
{
throw new IllegalStateException("new buffer must have at least one element");
}
soRefElement(nextBuffer, offset, null);
spRefElement(nextBuffer, offset, null);
soConsumerIndex(index + 2);
return n;
}

private E newBufferPeek(E[] nextBuffer, long index)
{
// this access after loadVolatile JUMP
final long offset = modifiedCalcCircularRefElementOffset(index, consumerMask);
final E n = lvRefElement(nextBuffer, offset);
final E n = lpRefElement(nextBuffer, offset);
if (null == n)
{
throw new IllegalStateException("new buffer must have at least one element");
Expand Down Expand Up @@ -513,7 +516,7 @@ public E relaxedPoll()
final E[] nextBuffer = nextBuffer(buffer, mask);
return newBufferPoll(nextBuffer, index);
}
soRefElement(buffer, offset, null);
spRefElement(buffer, offset, null);
soConsumerIndex(index + 2);
return (E) e;
}
Expand Down Expand Up @@ -753,7 +756,7 @@ private E getNext()

private void resize(long oldMask, E[] oldBuffer, long pIndex, E e, Supplier<E> s)
{
assert (e != null && s == null) || (e == null || s != null);
assert (e != null && s == null) || (e == null && s != null);
int newBufferLength = getNextBufferSize(oldBuffer);
final E[] newBuffer;
try
Expand All @@ -774,8 +777,9 @@ private void resize(long oldMask, E[] oldBuffer, long pIndex, E e, Supplier<E> s
final long offsetInOld = modifiedCalcCircularRefElementOffset(pIndex, oldMask);
final long offsetInNew = modifiedCalcCircularRefElementOffset(pIndex, newMask);

soRefElement(newBuffer, offsetInNew, e == null ? s.get() : e);// element in new array
soRefElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);// buffer linked
// Plain Mode: unreachable until soProducerIndex
spRefElement(newBuffer, offsetInNew, e == null ? s.get() : e);// element in new array
spRefElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);// buffer linked

// ASSERT code
final long cIndex = lvConsumerIndex();
Expand Down
Expand Up @@ -172,17 +172,18 @@ public long currentConsumerIndex()
return lvConsumerIndex();
}

protected final void soNext(E[] curr, E[] next)
protected final void spNext(E[] curr, E[] next)
{
long offset = nextArrayOffset(curr);
soRefElement(curr, offset, next);
spRefElement(curr, offset, next);
}

@SuppressWarnings("unchecked")
protected final E[] lvNextArrayAndUnlink(E[] curr)
protected final E[] lpNextArrayAndUnlink(E[] curr)
{
// this access after loadVolatile JUMP
final long offset = nextArrayOffset(curr);
final E[] nextBuffer = (E[]) lvRefElement(curr, offset);
final E[] nextBuffer = (E[]) lpRefElement(curr, offset);
// prevent GC nepotism
soRefElement(curr, offset, null);
return nextBuffer;
Expand Down Expand Up @@ -374,9 +375,11 @@ final void linkOldToNew(
final E[] newBuffer, final long offsetInNew,
final E e)
{
soRefElement(newBuffer, offsetInNew, e);
// Plain Mode: unreachable until JUMP is published
spRefElement(newBuffer, offsetInNew, e);
// link to next buffer and add next indicator as element of old buffer
soNext(oldBuffer, newBuffer);
spNext(oldBuffer, newBuffer);

soRefElement(oldBuffer, offset, JUMP);
// index is visible after elements (isEmpty/poll ordering)
soProducerIndex(currIndex + 1);// this ensures atomic write of long on 32bit platforms
Expand All @@ -390,22 +393,24 @@ final void writeToQueue(final E[] buffer, final E e, final long index, final lon

private E newBufferPeek(final E[] buffer, final long index)
{
E[] nextBuffer = lvNextArrayAndUnlink(buffer);
E[] nextBuffer = lpNextArrayAndUnlink(buffer);
consumerBuffer = nextBuffer;
final long mask = length(nextBuffer) - 2;
consumerMask = mask;
// this access after loadVolatile JUMP
final long offset = calcCircularRefElementOffset(index, mask);
return lvRefElement(nextBuffer, offset);
return lpRefElement(nextBuffer, offset);
}

private E newBufferPoll(final E[] buffer, final long index)
{
E[] nextBuffer = lvNextArrayAndUnlink(buffer);
E[] nextBuffer = lpNextArrayAndUnlink(buffer);
consumerBuffer = nextBuffer;
final long mask = length(nextBuffer) - 2;
consumerMask = mask;
// this access after loadVolatile JUMP
final long offset = calcCircularRefElementOffset(index, mask);
final E n = lvRefElement(nextBuffer, offset);
final E n = lpRefElement(nextBuffer, offset);
if (null == n)
{
throw new IllegalStateException("new buffer must have at least one element");
Expand Down
Expand Up @@ -80,6 +80,10 @@ final E lvElement(int index)
return lvRefElement(buffer, calcRefElementOffset(index));
}

final void spElement(int index, E e) {
spRefElement(buffer, calcRefElementOffset(index), e);
}

final E spinForElement(int index, boolean isNull)
{
E[] buffer = this.buffer;
Expand Down
Expand Up @@ -137,7 +137,8 @@ public E poll()
e = cChunk.spinForElement(ciChunkOffset, false);
}
}
cChunk.soElement(ciChunkOffset, null);
// consumer free the chunk happens-before producer reuse the chunk
cChunk.spElement(ciChunkOffset, null);
soConsumerIndex(cIndex + 1);
return e;
}
Expand Down Expand Up @@ -212,7 +213,8 @@ public E relaxedPoll()
}
}

cChunk.soElement(ciChunkOffset, null);
// consumer free the chunk happens-before producer reuse the chunk
cChunk.spElement(ciChunkOffset, null);
soConsumerIndex(cIndex + 1);
return e;
}
Expand Down Expand Up @@ -303,7 +305,8 @@ public int drain(Consumer<E> c, int limit)
return i;
}
}
cChunk.soElement(consumerOffset, null);
// consumer free the chunk happens-before producer reuse the chunk
cChunk.spElement(consumerOffset, null);
final long nextConsumerIndex = cIndex + 1;
soConsumerIndex(nextConsumerIndex);
c.accept(e);
Expand Down
Expand Up @@ -15,7 +15,7 @@ static <E> E lpRefElement(AtomicReferenceArray<E> buffer, int offset)
return buffer.get(offset); // no weaker form available
}

static <E> void spRefElement(AtomicReferenceArray<E> buffer, int offset, E value)
static void spRefElement(AtomicReferenceArray buffer, int offset, Object value)
{
buffer.lazySet(offset, value); // no weaker form available
}
Expand Down
Expand Up @@ -428,7 +428,7 @@ public E poll() {
return newBufferPoll(nextBuffer, index);
}
// release element null
soRefElement(buffer, offset, null);
spRefElement(buffer, offset, null);
// release cIndex
soConsumerIndex(index + 2);
return (E) e;
Expand Down Expand Up @@ -494,8 +494,9 @@ private int offerSlowPath(long mask, long pIndex, long producerLimit) {

@SuppressWarnings("unchecked")
private AtomicReferenceArray<E> nextBuffer(final AtomicReferenceArray<E> buffer, final long mask) {
// this access after loadVolatile JUMP
final int offset = nextArrayOffset(mask);
final AtomicReferenceArray<E> nextBuffer = (AtomicReferenceArray<E>) lvRefElement(buffer, offset);
final AtomicReferenceArray<E> nextBuffer = (AtomicReferenceArray<E>) lpRefElement(buffer, offset);
consumerBuffer = nextBuffer;
consumerMask = (length(nextBuffer) - 2) << 1;
soRefElement(buffer, offset, BUFFER_CONSUMED);
Expand All @@ -507,19 +508,21 @@ private static int nextArrayOffset(long mask) {
}

private E newBufferPoll(AtomicReferenceArray<E> nextBuffer, long index) {
// this access after loadVolatile JUMP
final int offset = modifiedCalcCircularRefElementOffset(index, consumerMask);
final E n = lvRefElement(nextBuffer, offset);
final E n = lpRefElement(nextBuffer, offset);
if (n == null) {
throw new IllegalStateException("new buffer must have at least one element");
}
soRefElement(nextBuffer, offset, null);
spRefElement(nextBuffer, offset, null);
soConsumerIndex(index + 2);
return n;
}

private E newBufferPeek(AtomicReferenceArray<E> nextBuffer, long index) {
// this access after loadVolatile JUMP
final int offset = modifiedCalcCircularRefElementOffset(index, consumerMask);
final E n = lvRefElement(nextBuffer, offset);
final E n = lpRefElement(nextBuffer, offset);
if (null == n) {
throw new IllegalStateException("new buffer must have at least one element");
}
Expand Down Expand Up @@ -559,7 +562,7 @@ public E relaxedPoll() {
final AtomicReferenceArray<E> nextBuffer = nextBuffer(buffer, mask);
return newBufferPoll(nextBuffer, index);
}
soRefElement(buffer, offset, null);
spRefElement(buffer, offset, null);
soConsumerIndex(index + 2);
return (E) e;
}
Expand Down Expand Up @@ -766,7 +769,7 @@ private E getNext() {
}

private void resize(long oldMask, AtomicReferenceArray<E> oldBuffer, long pIndex, E e, Supplier<E> s) {
assert (e != null && s == null) || (e == null || s != null);
assert (e != null && s == null) || (e == null && s != null);
int newBufferLength = getNextBufferSize(oldBuffer);
final AtomicReferenceArray<E> newBuffer;
try {
Expand All @@ -781,10 +784,11 @@ private void resize(long oldMask, AtomicReferenceArray<E> oldBuffer, long pIndex
producerMask = newMask;
final int offsetInOld = modifiedCalcCircularRefElementOffset(pIndex, oldMask);
final int offsetInNew = modifiedCalcCircularRefElementOffset(pIndex, newMask);
// Plain Mode: unreachable until soProducerIndex
// element in new array
soRefElement(newBuffer, offsetInNew, e == null ? s.get() : e);
spRefElement(newBuffer, offsetInNew, e == null ? s.get() : e);
// buffer linked
soRefElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);
spRefElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);
// ASSERT code
final long cIndex = lvConsumerIndex();
final long availableInQueue = availableInQueue(pIndex, cIndex);
Expand Down
Expand Up @@ -242,15 +242,16 @@ public long currentConsumerIndex() {
return lvConsumerIndex();
}

protected final void soNext(AtomicReferenceArray<E> curr, AtomicReferenceArray<E> next) {
protected final void spNext(AtomicReferenceArray<E> curr, AtomicReferenceArray<E> next) {
int offset = nextArrayOffset(curr);
soRefElement(curr, offset, next);
spRefElement(curr, offset, next);
}

@SuppressWarnings("unchecked")
protected final AtomicReferenceArray<E> lvNextArrayAndUnlink(AtomicReferenceArray<E> curr) {
protected final AtomicReferenceArray<E> lpNextArrayAndUnlink(AtomicReferenceArray<E> curr) {
// this access after loadVolatile JUMP
final int offset = nextArrayOffset(curr);
final AtomicReferenceArray<E> nextBuffer = (AtomicReferenceArray<E>) lvRefElement(curr, offset);
final AtomicReferenceArray<E> nextBuffer = (AtomicReferenceArray<E>) lpRefElement(curr, offset);
// prevent GC nepotism
soRefElement(curr, offset, null);
return nextBuffer;
Expand Down Expand Up @@ -404,9 +405,10 @@ public E peek() {
}

final void linkOldToNew(final long currIndex, final AtomicReferenceArray<E> oldBuffer, final int offset, final AtomicReferenceArray<E> newBuffer, final int offsetInNew, final E e) {
soRefElement(newBuffer, offsetInNew, e);
// Plain Mode: unreachable until JUMP is published
spRefElement(newBuffer, offsetInNew, e);
// link to next buffer and add next indicator as element of old buffer
soNext(oldBuffer, newBuffer);
spNext(oldBuffer, newBuffer);
soRefElement(oldBuffer, offset, JUMP);
// index is visible after elements (isEmpty/poll ordering)
// this ensures atomic write of long on 32bit platforms
Expand All @@ -420,21 +422,23 @@ final void writeToQueue(final AtomicReferenceArray<E> buffer, final E e, final l
}

private E newBufferPeek(final AtomicReferenceArray<E> buffer, final long index) {
AtomicReferenceArray<E> nextBuffer = lvNextArrayAndUnlink(buffer);
AtomicReferenceArray<E> nextBuffer = lpNextArrayAndUnlink(buffer);
consumerBuffer = nextBuffer;
final long mask = length(nextBuffer) - 2;
consumerMask = mask;
// this access after loadVolatile JUMP
final int offset = calcCircularRefElementOffset(index, mask);
return lvRefElement(nextBuffer, offset);
return lpRefElement(nextBuffer, offset);
}

private E newBufferPoll(final AtomicReferenceArray<E> buffer, final long index) {
AtomicReferenceArray<E> nextBuffer = lvNextArrayAndUnlink(buffer);
AtomicReferenceArray<E> nextBuffer = lpNextArrayAndUnlink(buffer);
consumerBuffer = nextBuffer;
final long mask = length(nextBuffer) - 2;
consumerMask = mask;
// this access after loadVolatile JUMP
final int offset = calcCircularRefElementOffset(index, mask);
final E n = lvRefElement(nextBuffer, offset);
final E n = lpRefElement(nextBuffer, offset);
if (null == n) {
throw new IllegalStateException("new buffer must have at least one element");
} else {
Expand Down