Skip to content

Commit

Permalink
Refactor MpscBlocking::offerBelow + bug fix + tests
Browse files Browse the repository at this point in the history
  • Loading branch information
nitsanw committed Aug 6, 2020
1 parent 5dc705b commit 8f268d5
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 29 deletions.
Expand Up @@ -226,6 +226,7 @@ public boolean offerIfBelowThreshold(final E e, int threshold)
{
throw new NullPointerException();
}

final long mask = this.mask;
final long capacity = mask + 1;

Expand Down Expand Up @@ -527,7 +528,7 @@ public int fill(Supplier<E> s, int limit)
final long capacity = mask + 1;
long producerLimit = lvProducerLimit();
long pIndex;
int actualLimit = 0;
int actualLimit;
do
{
pIndex = lvProducerIndex();
Expand Down
Expand Up @@ -310,6 +310,7 @@ public boolean offerIfBelowThreshold(final E e, int threshold)

final long mask = this.producerMask;
final long capacity = mask + 2;
threshold = threshold << 1;
final E[] buffer = this.producerBuffer;
long pIndex;
while (true)
Expand All @@ -329,23 +330,16 @@ public boolean offerIfBelowThreshold(final E e, int threshold)
// Use producer limit to save a read of the more rapidly mutated consumer index.
// Assumption: queue is usually empty or near empty

final long available = (producerLimit - pIndex) >> 1;
// available is also << 1
final long available = producerLimit - pIndex;
// sizeEstimate <= size
final long sizeEstimate = capacity - available;

if (sizeEstimate >= threshold ||
// producerLimit check allows for threshold >= capacity
producerLimit <= pIndex)
{
final long cIndex = lvConsumerIndex();
final long size = (pIndex - cIndex) >> 1;

if (size >= threshold)
{
return false;
}

if (!recalculateProducerLimit(mask, pIndex, producerLimit, cIndex))
if (!recalculateProducerLimit(pIndex, producerLimit, lvConsumerIndex(), capacity, threshold))
{
return false;
}
Expand Down Expand Up @@ -449,26 +443,19 @@ private boolean offerAndWakeup(E[] buffer, long mask, long pIndex, E e)

private boolean recalculateProducerLimit(long mask, long pIndex, long producerLimit)
{
return recalculateProducerLimit(mask, pIndex, producerLimit, lvConsumerIndex());
return recalculateProducerLimit(pIndex, producerLimit, lvConsumerIndex(), mask + 2, mask + 2);
}

private boolean recalculateProducerLimit(long mask, long pIndex, long producerLimit, long cIndex)
private boolean recalculateProducerLimit(long pIndex, long producerLimit, long cIndex, long bufferCapacity, long threshold)
{
final long bufferCapacity = mask + 2;

// try to update the limit with our new found knowledge on cIndex
if (cIndex + bufferCapacity > pIndex)
{
casProducerLimit(producerLimit, cIndex + bufferCapacity);
}
// full and cannot grow
else if (pIndex - cIndex == bufferCapacity)
{
// offer should return false;
return false;
}
else
throw new IllegalStateException();
return true;
// full and cannot grow, or hit threshold
long size = pIndex - cIndex;
return size < threshold && size < bufferCapacity;
}

private void wakeupConsumer()
Expand Down Expand Up @@ -753,8 +740,7 @@ public E relaxedPeek()
final long mask = consumerMask;

final long offset = modifiedCalcCircularRefElementOffset(index, mask);
E e = lvRefElement(buffer, offset);
return e;
return lvRefElement(buffer, offset);
}

@Override
Expand All @@ -768,13 +754,12 @@ public int fill(Supplier<E> s, int limit)
return 0;

final long mask = this.producerMask;
final E[] buffer = this.producerBuffer;

long pIndex;
int claimedSlots;
boolean wakeup = false;
long batchIndex = 0;
final long shiftedBatchSize = 2l * limit;
final long shiftedBatchSize = 2L * limit;

while (true)
{
Expand Down Expand Up @@ -818,10 +803,11 @@ public int fill(Supplier<E> s, int limit)
}
claimedSlots = (int) ((batchIndex - pIndex) / 2);

final E[] buffer = this.producerBuffer;
// first element offset might be a wakeup, so peeled from loop
for (int i = 0; i < claimedSlots; i++)
{
long offset = modifiedCalcCircularRefElementOffset(pIndex + 2l * i, mask);
long offset = modifiedCalcCircularRefElementOffset(pIndex + 2L * i, mask);
soRefElement(buffer, offset, s.get());
}

Expand Down
Expand Up @@ -9,6 +9,7 @@

import org.jctools.util.TestUtil;
import org.jctools.util.TestUtil.Val;
import org.junit.Assert;
import org.junit.Test;

import static java.util.concurrent.TimeUnit.*;
Expand Down Expand Up @@ -324,6 +325,27 @@ public void testOfferIfBelowThresholdSemantics() throws Exception
t1.join();
t2.join();
assertEquals("Unexpected size observed", 0, fail.value);
}

@Test
public void testOfferWithThreshold()
{
MpscBlockingConsumerArrayQueue<Integer> queue = new MpscBlockingConsumerArrayQueue<Integer>(16);
int i;
for (i = 0; i < 8; ++i)
{
//Offers succeed because current size is below the HWM.
Assert.assertTrue(queue.offerIfBelowThreshold(i, 8));
}
//Not anymore, our offer got rejected.
Assert.assertFalse(queue.offerIfBelowThreshold(i, 8));
Assert.assertFalse(queue.offerIfBelowThreshold(i, 7));
Assert.assertFalse(queue.offerIfBelowThreshold(i, 1));
Assert.assertFalse(queue.offerIfBelowThreshold(i, 0));

//Also, the threshold is dynamic and different levels can be set for
//different task priorities.
Assert.assertTrue(queue.offerIfBelowThreshold(i, 9));
Assert.assertTrue(queue.offerIfBelowThreshold(i, 16));
}
}
@@ -0,0 +1,66 @@
package org.jctools.queues;

import org.jctools.queues.spec.ConcurrentQueueSpec;
import org.jctools.queues.spec.Ordering;
import org.junit.Ignore;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Queue;

@RunWith(Parameterized.class)
public class QueueSanityTestMpscBlockingConsumerOfferBelowThreshold extends QueueSanityTest
{
public QueueSanityTestMpscBlockingConsumerOfferBelowThreshold(ConcurrentQueueSpec spec, Queue<Integer> queue)
{
super(spec, queue);
}

@Parameterized.Parameters
public static Collection<Object[]> parameters()
{
ArrayList<Object[]> list = new ArrayList<Object[]>();
MpscBlockingConsumerArrayQueueOverride<Integer> q = new MpscBlockingConsumerArrayQueueOverride<Integer>(16);
list.add(makeQueue(0, 1, 8, Ordering.FIFO, q));
q = new MpscBlockingConsumerArrayQueueOverride<Integer>(16);
q.threshold = 12;
list.add(makeQueue(0, 1, 12, Ordering.FIFO, q));
q = new MpscBlockingConsumerArrayQueueOverride<Integer>(16);
q.threshold = 4;
list.add(makeQueue(0, 1, 4, Ordering.FIFO, q));
return list;
}

@Ignore
public void testPowerOf2Capacity()
{
}

@Ignore
public void testIterator()
{
}

/**
* This allows us to test the offersIfBelowThreshold through all the offer utilizing threads. The effect should be
* as if the queue capacity is halved.
*/
static class MpscBlockingConsumerArrayQueueOverride<E> extends MpscBlockingConsumerArrayQueue<E>
{
int threshold;

public MpscBlockingConsumerArrayQueueOverride(int capacity)
{
super(capacity);
threshold = capacity() / 2;
}

@Override
public boolean offer(E e)
{
return super.offerIfBelowThreshold(e, threshold);
}
}
}

0 comments on commit 8f268d5

Please sign in to comment.