Skip to content

Commit

Permalink
[Branch-2.7] Fix branch-2.7 test. (#11254)
Browse files Browse the repository at this point in the history
  • Loading branch information
congbobo184 committed Jul 8, 2021
1 parent 6ba062c commit d5a844f
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 27 deletions.
Expand Up @@ -46,6 +46,7 @@
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.util.Murmur3_32Hash;
import org.awaitility.Awaitility;
Expand Down Expand Up @@ -669,6 +670,7 @@ public void testRemoveFirstConsumer() throws Exception {

c1.close();

((ConsumerImpl<Integer>) c2).clearIncomingMessagesAndGetMessageNumber();
// Now C2 will get all messages
for (int i = 0; i < 20; i++) {
Message<Integer> msg = c2.receive();
Expand Down
Expand Up @@ -1376,24 +1376,23 @@ public void testMultiReaderIsAbleToSeekWithTimeOnMiddleOfTopic() throws Exceptio
final int halfMessages = numOfMessage / 2;
admin.topics().createPartitionedTopic(topicName, 3);
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();

long l = System.currentTimeMillis();
long halfTime = 0;
for (int i = 0; i < numOfMessage; i++) {
if (i == numOfMessage / 2) {
halfTime = System.currentTimeMillis();
}
producer.send(String.format("msg num %d", i).getBytes());
}

Assert.assertTrue(halfTime != 0);
Reader<byte[]> reader = pulsarClient.newReader().topic(topicName).startMessageId(MessageId.earliest).create();

int plusTime = (halfMessages + 1) * 100;
reader.seek(l + plusTime);

reader.seek(halfTime);
Set<String> messageSet = Sets.newHashSet();
for (int i = halfMessages + 1; i < numOfMessage; i++) {
Message<byte[]> message = reader.readNext();
Message<byte[]> message = reader.readNext(10, TimeUnit.SECONDS);
String receivedMessage = new String(message.getData());
Assert.assertTrue(messageSet.add(receivedMessage), "Received duplicate message " + receivedMessage);
}

reader.close();
producer.close();
}
Expand Down
Expand Up @@ -51,6 +51,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import lombok.Cleanup;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
Expand All @@ -74,6 +75,7 @@
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand All @@ -90,7 +92,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(V1_ProducerConsumerTest.class);
private static final long BATCHING_MAX_PUBLISH_DELAY_THRESHOLD = 1;

@BeforeMethod
@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
super.internalSetup();
Expand Down Expand Up @@ -542,7 +544,8 @@ public Void call() throws Exception {

ConsumerImpl<byte[]> consumerImpl = (ConsumerImpl<byte[]>) consumer;
// The available permits should be 10 and num messages in the queue should be 90
Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads);
Awaitility.await().untilAsserted(() ->
Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads));
Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - numConsumersThreads);

barrier.reset();
Expand All @@ -560,7 +563,8 @@ public Void call() throws Exception {
Thread.sleep(100);

// The available permits should be 20 and num messages in the queue should be 80
Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads * 2);
Awaitility.await().untilAsserted(() ->
Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads * 2));
Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - (numConsumersThreads * 2));

// clear the queue
Expand Down Expand Up @@ -594,7 +598,8 @@ public Void call() throws Exception {
Thread.sleep(2000);

// The available permits should be 10 and num messages in the queue should be 90
Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads);
Awaitility.await().untilAsserted(() ->
Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads));
Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - numConsumersThreads);
consumer.close();

Expand Down Expand Up @@ -760,6 +765,7 @@ public void testAsyncProducerAndConsumer() throws Exception {
log.info(" start receiving messages :");
CountDownLatch latch = new CountDownLatch(totalMsg);
// receive messages
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newFixedThreadPool(1);
receiveAsync(consumer, totalMsg, 0, latch, consumeMsgs, executor);

Expand All @@ -773,7 +779,6 @@ public void testAsyncProducerAndConsumer() throws Exception {

producer.close();
consumer.close();
executor.shutdownNow();
log.info("-- Exiting {} test --", methodName);
}

Expand Down Expand Up @@ -804,6 +809,7 @@ public void testAsyncProducerAndConsumerWithZeroQueueSize() throws Exception {
log.info(" start receiving messages :");
CountDownLatch latch = new CountDownLatch(totalMsg);
// receive messages
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newFixedThreadPool(1);
receiveAsync(consumer, totalMsg, 0, latch, consumeMsgs, executor);

Expand All @@ -817,7 +823,6 @@ public void testAsyncProducerAndConsumerWithZeroQueueSize() throws Exception {

producer.close();
consumer.close();
executor.shutdownNow();
log.info("-- Exiting {} test --", methodName);
}

Expand Down Expand Up @@ -1131,6 +1136,7 @@ public void testMutlipleSharedConsumerBlockingWithUnAckedMessages() throws Excep
.subscriptionType(SubscriptionType.Shared)
.subscribe();

@Cleanup
PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> consumer2 = newPulsarClient.newConsumer()
.topic("persistent://my-property/use/my-ns/unacked-topic")
Expand Down Expand Up @@ -1207,7 +1213,6 @@ public void testMutlipleSharedConsumerBlockingWithUnAckedMessages() throws Excep
producer.close();
consumer1.close();
consumer2.close();
newPulsarClient.close();
log.info("-- Exiting {} test --", methodName);
} catch (Exception e) {
fail();
Expand Down Expand Up @@ -1753,21 +1758,25 @@ public void testBlockUnackedConsumerRedeliverySpecificMessagesCloseConsumerWhile
public void testPriorityConsumer() throws Exception {
log.info("-- Starting {} test --", methodName);

@Cleanup
PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> consumer1 = newPulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
.priorityLevel(1).receiverQueueSize(5).subscribe();

@Cleanup
PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> consumer2 = newPulsarClient1.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
.priorityLevel(1).receiverQueueSize(5).subscribe();

@Cleanup
PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> consumer3 = newPulsarClient2.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
.priorityLevel(1).receiverQueueSize(5).subscribe();

@Cleanup
PulsarClient newPulsarClient3 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> consumer4 = newPulsarClient3.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
Expand Down Expand Up @@ -1814,10 +1823,6 @@ public void testPriorityConsumer() throws Exception {
consumer2.close();
consumer3.close();
consumer4.close();
newPulsarClient.close();
newPulsarClient1.close();
newPulsarClient2.close();
newPulsarClient3.close();
log.info("-- Exiting {} test --", methodName);
}

Expand Down Expand Up @@ -1845,6 +1850,7 @@ public void testSharedSamePriorityConsumer() throws Exception {
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
.receiverQueueSize(queueSize).subscribe();

@Cleanup
PulsarClient newPulsarClient = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> c2 = newPulsarClient.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
Expand Down Expand Up @@ -1892,16 +1898,19 @@ public void testSharedSamePriorityConsumer() throws Exception {
Assert.assertEquals(queueSize * 2, messages.size());

// create new consumers with the same priority
@Cleanup
PulsarClient newPulsarClient1 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> c3 = newPulsarClient1.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
.receiverQueueSize(queueSize).subscribe();

@Cleanup
PulsarClient newPulsarClient2 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> c4 = newPulsarClient2.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
.receiverQueueSize(queueSize).subscribe();

@Cleanup
PulsarClient newPulsarClient3 = newPulsarClient(lookupUrl.toString(), 0);// Creates new client connection
Consumer<byte[]> c5 = newPulsarClient3.newConsumer().topic("persistent://my-property/use/my-ns/my-topic2")
.subscriptionName("my-subscriber-name").subscriptionType(SubscriptionType.Shared)
Expand Down Expand Up @@ -1947,10 +1956,6 @@ public void testSharedSamePriorityConsumer() throws Exception {
c3.close();
c4.close();
c5.close();
newPulsarClient.close();
newPulsarClient1.close();
newPulsarClient2.close();
newPulsarClient3.close();
pulsar.getConfiguration().setMaxUnackedMessagesPerConsumer(maxUnAckMsgs);
log.info("-- Exiting {} test --", methodName);
}
Expand Down
Expand Up @@ -50,13 +50,14 @@
public class ConsumerImplTest {


private ExecutorProvider executorProvider = new ExecutorProvider(1, new DefaultThreadFactory("ConsumerImplTest"));
private ExecutorProvider executorProvider;
private ConsumerImpl<byte[]> consumer;
private ConsumerConfigurationData consumerConf;
private ExecutorService executorService;

@BeforeMethod
public void setUp() {
executorProvider = new ExecutorProvider(1, new DefaultThreadFactory("ConsumerImplTest"));
consumerConf = new ConsumerConfigurationData<>();
PulsarClientImpl client = ClientTestFixtures.createPulsarClientMock();
executorService = Executors.newSingleThreadExecutor();
Expand Down
Expand Up @@ -18,9 +18,9 @@
*/
package org.apache.pulsar.proxy.server;

import io.prometheus.client.CollectorRegistry;
import lombok.Cleanup;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.util.ObjectMapperFactory;
Expand All @@ -35,14 +35,12 @@
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Future;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

Expand All @@ -61,6 +59,7 @@ protected void setup() throws Exception {
serviceStarter.getConfig().setBrokerWebServiceURL(pulsar.getWebServiceAddress());
serviceStarter.getConfig().setServicePort(Optional.of(11000));
serviceStarter.getConfig().setWebSocketServiceEnabled(true);
CollectorRegistry.defaultRegistry.clear();
serviceStarter.start();
}

Expand Down

0 comments on commit d5a844f

Please sign in to comment.