diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/BlockingArrayQueueTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/BlockingArrayQueueTest.java index cd6149e2d65e..eac290032191 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/BlockingArrayQueueTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/BlockingArrayQueueTest.java @@ -15,20 +15,20 @@ import java.time.Duration; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.ListIterator; import java.util.Objects; -import java.util.Random; +import java.util.Set; import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.hamcrest.Matchers; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.condition.DisabledIfSystemProperty; import static org.eclipse.jetty.util.BlockingArrayQueueTest.Await.await; import static org.hamcrest.MatcherAssert.assertThat; @@ -212,21 +212,20 @@ public void run() } @Test - @DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review public void testConcurrentAccess() throws Exception { - final int THREADS = 50; + final int THREADS = 32; final int LOOPS = 1000; - final BlockingArrayQueue queue = new BlockingArrayQueue<>(1 + THREADS * LOOPS); + BlockingArrayQueue queue = new BlockingArrayQueue<>(1 + THREADS * LOOPS); - final ConcurrentLinkedQueue produced = new ConcurrentLinkedQueue<>(); - final ConcurrentLinkedQueue consumed = new ConcurrentLinkedQueue<>(); + Set produced = ConcurrentHashMap.newKeySet(); + Set consumed = ConcurrentHashMap.newKeySet(); - final AtomicBoolean running = new AtomicBoolean(true); + AtomicBoolean consumersRunning = new AtomicBoolean(true); // start consumers - final CyclicBarrier barrier0 = new CyclicBarrier(THREADS + 1); + CyclicBarrier consumersBarrier = new CyclicBarrier(THREADS + 1); for (int i = 0; i < THREADS; i++) { new Thread() @@ -234,20 +233,18 @@ public void testConcurrentAccess() throws Exception @Override public void run() { - final Random random = new Random(); - setPriority(getPriority() - 1); try { - while (running.get()) + while (consumersRunning.get()) { - int r = 1 + random.nextInt(10); + int r = 1 + ThreadLocalRandom.current().nextInt(10); if (r % 2 == 0) { Integer msg = queue.poll(); if (msg == null) { - Thread.sleep(1 + random.nextInt(10)); + Thread.sleep(ThreadLocalRandom.current().nextInt(2)); continue; } consumed.add(msg); @@ -268,7 +265,7 @@ public void run() { try { - barrier0.await(); + consumersBarrier.await(); } catch (Exception e) { @@ -280,7 +277,7 @@ public void run() } // start producers - final CyclicBarrier barrier1 = new CyclicBarrier(THREADS + 1); + CyclicBarrier producersBarrier = new CyclicBarrier(THREADS + 1); for (int i = 0; i < THREADS; i++) { final int id = i; @@ -289,16 +286,15 @@ public void run() @Override public void run() { - final Random random = new Random(); try { for (int j = 0; j < LOOPS; j++) { - Integer msg = random.nextInt(); + Integer msg = ThreadLocalRandom.current().nextInt(); produced.add(msg); if (!queue.offer(msg)) throw new Exception(id + " FULL! " + queue.size()); - Thread.sleep(1 + random.nextInt(10)); + Thread.sleep(ThreadLocalRandom.current().nextInt(2)); } } catch (Exception e) @@ -309,7 +305,7 @@ public void run() { try { - barrier1.await(); + producersBarrier.await(); } catch (Exception e) { @@ -320,22 +316,22 @@ public void run() }.start(); } - barrier1.await(); - int size = queue.size(); - int last = size - 1; - while (size > 0 && size != last) + producersBarrier.await(); + + AtomicInteger last = new AtomicInteger(queue.size() - 1); + await().atMost(5, TimeUnit.SECONDS).until(() -> { - last = size; - Thread.sleep(500); - size = queue.size(); - } - running.set(false); - barrier0.await(); + int size = queue.size(); + if (size == 0 && last.get() == size) + return true; + last.set(size); + return false; + }); - HashSet prodSet = new HashSet<>(produced); - HashSet consSet = new HashSet<>(consumed); + consumersRunning.set(false); + consumersBarrier.await(); - assertEquals(prodSet, consSet); + assertEquals(produced, consumed); } @Test