Skip to content

Commit

Permalink
fix flaky in step2 and step3 (#12954)
Browse files Browse the repository at this point in the history
(cherry picked from commit 94736a4)
  • Loading branch information
Jason918 authored and codelipenghui committed Jan 29, 2022
1 parent df232a0 commit 16cf802
Showing 1 changed file with 12 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.util.Iterator;
import java.util.List;
Expand All @@ -41,17 +45,15 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;


import lombok.Cleanup;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -61,12 +63,6 @@
import org.testng.annotations.Test;
import org.testng.collections.Lists;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;

@Test(groups = "flaky")
public class DispatcherBlockConsumerTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(DispatcherBlockConsumerTest.class);
Expand Down Expand Up @@ -688,6 +684,7 @@ public void testBlockBrokerDispatching() {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

try {
final int waitMills = 500;
final int maxUnAckPerBroker = 200;
final double unAckMsgPercentagePerDispatcher = 10;
int maxUnAckPerDispatcher = (int) ((maxUnAckPerBroker * unAckMsgPercentagePerDispatcher) / 100); // 200 *
Expand Down Expand Up @@ -745,7 +742,7 @@ public void testBlockBrokerDispatching() {
Message<byte[]> msg = null;
Set<MessageId> messages1 = Sets.newHashSet();
for (int j = 0; j < totalProducedMsgs; j++) {
msg = consumer1Sub1.receive(100, TimeUnit.MILLISECONDS);
msg = consumer1Sub1.receive(waitMills, TimeUnit.MILLISECONDS);
if (msg != null) {
messages1.add(msg.getMessageId());
} else {
Expand All @@ -754,7 +751,7 @@ public void testBlockBrokerDispatching() {
// once consumer receives maxUnAckPerBroker-msgs then sleep to give a chance to scheduler to block the
// subscription
if (j == maxUnAckPerBroker) {
Thread.sleep(200);
Thread.sleep(waitMills);
}
}
// client must receive number of messages = maxUnAckPerbroker rather all produced messages
Expand All @@ -767,7 +764,7 @@ public void testBlockBrokerDispatching() {
.subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
int consumer2Msgs = 0;
for (int j = 0; j < totalProducedMsgs; j++) {
msg = consumer2Sub1.receive(100, TimeUnit.MILLISECONDS);
msg = consumer2Sub1.receive(waitMills, TimeUnit.MILLISECONDS);
if (msg != null) {
consumer2Msgs++;
} else {
Expand All @@ -792,7 +789,7 @@ public void testBlockBrokerDispatching() {
.subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
Set<MessageId> messages2 = Sets.newHashSet();
for (int j = 0; j < totalProducedMsgs; j++) {
msg = consumerSub2.receive(100, TimeUnit.MILLISECONDS);
msg = consumerSub2.receive(waitMills, TimeUnit.MILLISECONDS);
if (msg != null) {
messages2.add(msg.getMessageId());
} else {
Expand All @@ -809,7 +806,7 @@ public void testBlockBrokerDispatching() {
.subscriptionType(SubscriptionType.Shared).acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
int consumedMsgsSub3 = 0;
for (int j = 0; j < totalProducedMsgs; j++) {
msg = consumer1Sub3.receive(100, TimeUnit.MILLISECONDS);
msg = consumer1Sub3.receive();
if (msg != null) {
consumedMsgsSub3++;
consumer1Sub3.acknowledge(msg);
Expand Down

0 comments on commit 16cf802

Please sign in to comment.