diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java index d87c2574f2845..575bd19063ed6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/KeySharedSubscriptionTest.java @@ -46,6 +46,7 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.util.Murmur3_32Hash; import org.awaitility.Awaitility; @@ -669,6 +670,7 @@ public void testRemoveFirstConsumer() throws Exception { c1.close(); + ((ConsumerImpl) c2).clearIncomingMessagesAndGetMessageNumber(); // Now C2 will get all messages for (int i = 0; i < 20; i++) { Message msg = c2.receive(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java index 68baf34653582..6c81319f6610c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TopicReaderTest.java @@ -1376,24 +1376,23 @@ public void testMultiReaderIsAbleToSeekWithTimeOnMiddleOfTopic() throws Exceptio final int halfMessages = numOfMessage / 2; admin.topics().createPartitionedTopic(topicName, 3); Producer producer = pulsarClient.newProducer().topic(topicName).create(); - - long l = System.currentTimeMillis(); + long halfTime = 0; for (int i = 0; i < numOfMessage; i++) { + if (i == numOfMessage / 2) { + halfTime = System.currentTimeMillis(); + } producer.send(String.format("msg num %d", i).getBytes()); } - + Assert.assertTrue(halfTime != 0); Reader reader = pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create(); - int plusTime = (halfMessages + 1) * 100; - reader.seek(l + plusTime); - + reader.seek(halfTime); Set messageSet = Sets.newHashSet(); for (int i = halfMessages + 1; i < numOfMessage; i++) { - Message message = reader.readNext(); + Message message = reader.readNext(10, TimeUnit.SECONDS); String receivedMessage = new String(message.getData()); Assert.assertTrue(messageSet.add(receivedMessage), "Received duplicate message " + receivedMessage); } - reader.close(); producer.close(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java index 2baec56e869bc..6a1d5f260a54c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java @@ -51,6 +51,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import lombok.Cleanup; import org.apache.bookkeeper.mledger.impl.EntryCacheImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -74,6 +75,7 @@ import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; +import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -90,7 +92,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase { private static final Logger log = LoggerFactory.getLogger(V1_ProducerConsumerTest.class); private static final long BATCHING_MAX_PUBLISH_DELAY_THRESHOLD = 1; - @BeforeMethod + @BeforeMethod(alwaysRun = true) @Override protected void setup() throws Exception { super.internalSetup(); @@ -542,7 +544,8 @@ public Void call() throws Exception { ConsumerImpl consumerImpl = (ConsumerImpl) consumer; // The available permits should be 10 and num messages in the queue should be 90 - Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads); + Awaitility.await().untilAsserted(() -> + Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads)); Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - numConsumersThreads); barrier.reset(); @@ -560,7 +563,8 @@ public Void call() throws Exception { Thread.sleep(100); // The available permits should be 20 and num messages in the queue should be 80 - Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads * 2); + Awaitility.await().untilAsserted(() -> + Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads * 2)); Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - (numConsumersThreads * 2)); // clear the queue @@ -594,7 +598,8 @@ public Void call() throws Exception { Thread.sleep(2000); // The available permits should be 10 and num messages in the queue should be 90 - Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads); + Awaitility.await().untilAsserted(() -> + Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads)); Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - numConsumersThreads); consumer.close(); @@ -760,6 +765,7 @@ public void testAsyncProducerAndConsumer() throws Exception { log.info(" start receiving messages :"); CountDownLatch latch = new CountDownLatch(totalMsg); // receive messages + @Cleanup("shutdownNow") ExecutorService executor = Executors.newFixedThreadPool(1); receiveAsync(consumer, totalMsg, 0, latch, consumeMsgs, executor); @@ -773,7 +779,6 @@ public void testAsyncProducerAndConsumer() throws Exception { producer.close(); consumer.close(); - executor.shutdownNow(); log.info("-- Exiting {} test --", methodName); } @@ -804,6 +809,7 @@ public void testAsyncProducerAndConsumerWithZeroQueueSize() throws Exception { log.info(" start receiving messages :"); CountDownLatch latch = new CountDownLatch(totalMsg); // receive messages + @Cleanup("shutdownNow") ExecutorService executor = Executors.newFixedThreadPool(1); receiveAsync(consumer, totalMsg, 0, latch, consumeMsgs, executor); @@ -817,7 +823,6 @@ public void testAsyncProducerAndConsumerWithZeroQueueSize() throws Exception { producer.close(); consumer.close(); - executor.shutdownNow(); log.info("-- Exiting {} test --", methodName); } @@ -1131,6 +1136,7 @@ public void testMutlipleSharedConsumerBlockingWithUnAckedMessages() throws Excep .subscriptionType(SubscriptionType.Shared) .subscribe(); + @Cleanup PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection Consumer consumer2 = newPulsarClient.newConsumer() .topic("persistent://my-property/use/my-ns/unacked-topic") @@ -1207,7 +1213,6 @@ public void testMutlipleSharedConsumerBlockingWithUnAckedMessages() throws Excep producer.close(); consumer1.close(); consumer2.close(); - newPulsarClient.close(); log.info("-- Exiting {} test --", methodName); } catch (Exception e) { fail(); @@ -1753,21 +1758,25 @@ public void testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhile public void testPriorityConsumer() throws Exception { log.info("-- Starting {} test --", methodName); + @Cleanup PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection Consumer consumer1 = newPulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2") .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared) .priorityLevel(1).receiverQueueSize(5).subscribe(); + @Cleanup PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection Consumer consumer2 = newPulsarClient1.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2") .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared) .priorityLevel(1).receiverQueueSize(5).subscribe(); + @Cleanup PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection Consumer consumer3 = newPulsarClient2.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2") .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared) .priorityLevel(1).receiverQueueSize(5).subscribe(); + @Cleanup PulsarClient newPulsarClient3 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection Consumer consumer4 = newPulsarClient3.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2") .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared) @@ -1814,10 +1823,6 @@ public void testPriorityConsumer() throws Exception { consumer2.close(); consumer3.close(); consumer4.close(); - newPulsarClient.close(); - newPulsarClient1.close(); - newPulsarClient2.close(); - newPulsarClient3.close(); log.info("-- Exiting {} test --", methodName); } @@ -1845,6 +1850,7 @@ public void testSharedSamePriorityConsumer() throws Exception { .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared) .receiverQueueSize(queueSize).subscribe(); + @Cleanup PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection Consumer c2 = newPulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2") .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared) @@ -1892,16 +1898,19 @@ public void testSharedSamePriorityConsumer() throws Exception { Assert.assertEquals(queueSize * 2, messages.size()); // create new consumers with the same priority + @Cleanup PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection Consumer c3 = newPulsarClient1.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2") .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared) .receiverQueueSize(queueSize).subscribe(); + @Cleanup PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection Consumer c4 = newPulsarClient2.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2") .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared) .receiverQueueSize(queueSize).subscribe(); + @Cleanup PulsarClient newPulsarClient3 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection Consumer c5 = newPulsarClient3.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2") .subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared) @@ -1947,10 +1956,6 @@ public void testSharedSamePriorityConsumer() throws Exception { c3.close(); c4.close(); c5.close(); - newPulsarClient.close(); - newPulsarClient1.close(); - newPulsarClient2.close(); - newPulsarClient3.close(); pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnAckMsgs); log.info("-- Exiting {} test --", methodName); } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java index c7aad50f2a9ed..0f32c41f0a20c 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java @@ -50,13 +50,14 @@ public class ConsumerImplTest { - private ExecutorProvider executorProvider = new ExecutorProvider(1, new DefaultThreadFactory("ConsumerImplTest")); + private ExecutorProvider executorProvider; private ConsumerImpl consumer; private ConsumerConfigurationData consumerConf; private ExecutorService executorService; @BeforeMethod public void setUp() { + executorProvider = new ExecutorProvider(1, new DefaultThreadFactory("ConsumerImplTest")); consumerConf = new ConsumerConfigurationData<>(); PulsarClientImpl client = ClientTestFixtures.createPulsarClientMock(); executorService = Executors.newSingleThreadExecutor(); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java index 228927fde819e..6dbb663aa0787 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyServiceStarterTest.java @@ -18,9 +18,9 @@ */ package org.apache.pulsar.proxy.server; +import io.prometheus.client.CollectorRegistry; import lombok.Cleanup; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; - import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.common.util.ObjectMapperFactory; @@ -35,14 +35,12 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; - import java.net.URI; import java.nio.ByteBuffer; import java.util.Base64; import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Future; - import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -61,6 +59,7 @@ protected void setup() throws Exception { serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress()); serviceStarter.getConfig().setServicePort(Optional.of(11000)); serviceStarter.getConfig().setWebSocketServiceEnabled(true); + CollectorRegistry.defaultRegistry.clear(); serviceStarter.start(); }