Skip to content

Commit

Permalink
Implement fill methods for compound queue, expand testing, fix #197
Browse files Browse the repository at this point in the history
  • Loading branch information
nitsanw committed Sep 25, 2017
1 parent f57d1e4 commit 7e58c9f
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 24 deletions.
Expand Up @@ -13,6 +13,7 @@
*/
package org.jctools.queues;

import org.jctools.util.PortableJvmInfo;
import org.jctools.util.RangeUtil;

import java.util.AbstractQueue;
Expand Down Expand Up @@ -274,7 +275,19 @@ public int drain(Consumer<E> c)
@Override
public int fill(Supplier<E> s)
{
throw new UnsupportedOperationException();
long result = 0;// result is a long because we want to have a safepoint check at regular intervals
final int capacity = capacity();
do
{
final int filled = fill(s, PortableJvmInfo.RECOMENDED_OFFER_BATCH);
if (filled == 0)
{
return (int) result;
}
result += filled;
}
while (result <= capacity);
return (int) result;
}

@Override
Expand All @@ -295,7 +308,28 @@ public int drain(Consumer<E> c, int limit)
@Override
public int fill(Supplier<E> s, int limit)
{
throw new UnsupportedOperationException();
final int parallelQueuesMask = this.parallelQueuesMask;
int start = (int) (Thread.currentThread().getId() & parallelQueuesMask);
final MpscArrayQueue<E>[] queues = this.queues;
int filled = queues[start].fill(s, limit);
if (filled == limit)
{
return limit;
}
else
{
// we already offered to first queue, try the rest
for (int i = start + 1; i < start + parallelQueuesMask + 1; i++)
{
filled += queues[i & parallelQueuesMask].fill(s, limit - filled);
if (filled == limit)
{
return limit;
}
}
// this is a relaxed offer, we can fail for any reason we like
return filled;
}
}

@Override
Expand All @@ -319,18 +353,14 @@ public void drain(
}

@Override
public void fill(
Supplier<E> s,
WaitStrategy wait,
ExitCondition exit)
public void fill(Supplier<E> s, WaitStrategy w, ExitCondition exit)
{
int idleCounter = 0;
while (exit.keepRunning())
{
E e = s.get();
while (!relaxedOffer(e))
if (fill(s, PortableJvmInfo.RECOMENDED_OFFER_BATCH) == 0)
{
idleCounter = wait.idle(idleCounter);
idleCounter = w.idle(idleCounter);
continue;
}
idleCounter = 0;
Expand Down
34 changes: 20 additions & 14 deletions jctools-core/src/test/java/org/jctools/queues/MpqSanityTest.java
Expand Up @@ -106,54 +106,61 @@ public void sanity()
}
}

int sum;
@Test
public void sanityDrainBatch()
{
assumeThat(spec.ordering, not(Ordering.NONE));

assertEquals(0, queue.drain(e ->
{
}, SIZE));
assertTrue(queue.isEmpty());
assertTrue(queue.size() == 0);
count = 0;
sum = 0;
int i = queue.fill(() ->
{
return count++;
final int val = count++;
sum += val;
return val;
}, SIZE);
final int size = i;
assertEquals(size, queue.size());
if (spec.ordering == Ordering.FIFO)
{
// expect FIFO
p = queue.relaxedPeek();
count = 0;
int drainCount = 0;
i = 0;
do
{
i += drainCount = queue.drain(e ->
{
// batch consumption can cause size to differ from following expectation
// this is because elements are 'claimed' in a batch and their consumption lags
if (spec.consumers == 1)
{
assertEquals(p, e); // peek will return the post claim peek
assertEquals(size - (count + 1), queue.size()); // size will return the post claim size
}
assertEquals(count++, e.intValue());
p = queue.relaxedPeek();
});
}
while (drainCount != 0);
p = null;
assertEquals(size, i);

assertTrue(queue.isEmpty());
assertTrue(queue.size() == 0);
}
else
{
int drainCount = 0;
i = 0;
do
{
i += drainCount = queue.drain(e ->
{
sum -= e.intValue();
});
}
while (drainCount != 0);
assertEquals(size, i);

assertTrue(queue.isEmpty());
assertTrue(queue.size() == 0);
assertEquals(0, sum);
}
}

Expand Down Expand Up @@ -181,7 +188,6 @@ public void testSizeIsTheNumberOfOffers()
public void supplyMessageUntilFull()
{
assumeThat(spec.isBounded(), is(Boolean.TRUE));
assumeThat(spec.ordering, not(Ordering.NONE));
final Val instances = new Val();
instances.value = 0;
final MessagePassingQueue.Supplier<Integer> messageFactory = () -> instances.value++;
Expand Down
Expand Up @@ -12,7 +12,6 @@
import static org.jctools.util.PortableJvmInfo.CPUs;

@RunWith(Parameterized.class)
@Ignore
public class MpqSanityTestMpscCompound extends MpqSanityTest
{
public MpqSanityTestMpscCompound(ConcurrentQueueSpec spec, MessagePassingQueue<Integer> queue)
Expand Down

0 comments on commit 7e58c9f

Please sign in to comment.