Skip to content

Commit

Permalink
#6541 improve testConcurrentAccess perf
Browse files Browse the repository at this point in the history
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
  • Loading branch information
lorban committed Jul 29, 2021
1 parent 9e047ab commit d8a890f
Showing 1 changed file with 31 additions and 35 deletions.
Expand Up @@ -15,20 +15,20 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;

import static org.eclipse.jetty.util.BlockingArrayQueueTest.Await.await;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -212,42 +212,39 @@ public void run()
}

@Test
@DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review
public void testConcurrentAccess() throws Exception
{
final int THREADS = 50;
final int THREADS = 32;
final int LOOPS = 1000;

final BlockingArrayQueue<Integer> queue = new BlockingArrayQueue<>(1 + THREADS * LOOPS);
BlockingArrayQueue<Integer> queue = new BlockingArrayQueue<>(1 + THREADS * LOOPS);

final ConcurrentLinkedQueue<Integer> produced = new ConcurrentLinkedQueue<>();
final ConcurrentLinkedQueue<Integer> consumed = new ConcurrentLinkedQueue<>();
Set<Integer> produced = ConcurrentHashMap.newKeySet();
Set<Integer> consumed = ConcurrentHashMap.newKeySet();

final AtomicBoolean running = new AtomicBoolean(true);
AtomicBoolean consumersRunning = new AtomicBoolean(true);

// start consumers
final CyclicBarrier barrier0 = new CyclicBarrier(THREADS + 1);
CyclicBarrier consumersBarrier = new CyclicBarrier(THREADS + 1);
for (int i = 0; i < THREADS; i++)
{
new Thread()
{
@Override
public void run()
{
final Random random = new Random();

setPriority(getPriority() - 1);
try
{
while (running.get())
while (consumersRunning.get())
{
int r = 1 + random.nextInt(10);
int r = 1 + ThreadLocalRandom.current().nextInt(10);
if (r % 2 == 0)
{
Integer msg = queue.poll();
if (msg == null)
{
Thread.sleep(1 + random.nextInt(10));
Thread.sleep(ThreadLocalRandom.current().nextInt(2));
continue;
}
consumed.add(msg);
Expand All @@ -268,7 +265,7 @@ public void run()
{
try
{
barrier0.await();
consumersBarrier.await();
}
catch (Exception e)
{
Expand All @@ -280,7 +277,7 @@ public void run()
}

// start producers
final CyclicBarrier barrier1 = new CyclicBarrier(THREADS + 1);
CyclicBarrier producersBarrier = new CyclicBarrier(THREADS + 1);
for (int i = 0; i < THREADS; i++)
{
final int id = i;
Expand All @@ -289,16 +286,15 @@ public void run()
@Override
public void run()
{
final Random random = new Random();
try
{
for (int j = 0; j < LOOPS; j++)
{
Integer msg = random.nextInt();
Integer msg = ThreadLocalRandom.current().nextInt();
produced.add(msg);
if (!queue.offer(msg))
throw new Exception(id + " FULL! " + queue.size());
Thread.sleep(1 + random.nextInt(10));
Thread.sleep(ThreadLocalRandom.current().nextInt(2));
}
}
catch (Exception e)
Expand All @@ -309,7 +305,7 @@ public void run()
{
try
{
barrier1.await();
producersBarrier.await();
}
catch (Exception e)
{
Expand All @@ -320,22 +316,22 @@ public void run()
}.start();
}

barrier1.await();
int size = queue.size();
int last = size - 1;
while (size > 0 && size != last)
producersBarrier.await();

AtomicInteger last = new AtomicInteger(queue.size() - 1);
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
last = size;
Thread.sleep(500);
size = queue.size();
}
running.set(false);
barrier0.await();
int size = queue.size();
if (size == 0 && last.get() == size)
return true;
last.set(size);
return false;
});

HashSet<Integer> prodSet = new HashSet<>(produced);
HashSet<Integer> consSet = new HashSet<>(consumed);
consumersRunning.set(false);
consumersBarrier.await();

assertEquals(prodSet, consSet);
assertEquals(produced, consumed);
}

@Test
Expand Down

0 comments on commit d8a890f

Please sign in to comment.