Skip to content

Commit

Permalink
[fix] [broker] Fix metrics pulsar_topic_load_failed_count is 0 when l…
Browse files Browse the repository at this point in the history
…oad non-persistent topic fails and fix the flaky test testBrokerStatsTopicLoadFailed (#22580)

(cherry picked from commit 340d60d)
  • Loading branch information
poorbarcode committed May 2, 2024
1 parent f584be1 commit 704630b
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1192,8 +1192,8 @@ private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic
if (log.isDebugEnabled()) {
log.debug("Broker is unable to load non-persistent topic {}", topic);
}
return FutureUtil.failedFuture(
new NotAllowedException("Broker is not unable to load non-persistent topic"));
topicFuture.completeExceptionally(new NotAllowedException("Broker is not unable to load non-persistent topic"));
return topicFuture;
}
final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.StringReader;
import java.lang.reflect.Array;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
Expand All @@ -67,6 +68,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
Expand Down Expand Up @@ -102,6 +104,10 @@
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.awaitility.Awaitility;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.MockZooKeeper;
import org.glassfish.jersey.client.JerseyClient;
import org.glassfish.jersey.client.JerseyClientBuilder;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -1461,4 +1467,117 @@ public void testDuplicateAcknowledgement() throws Exception {
assertEquals(admin.topics().getStats(topicName).getSubscriptions()
.get("sub-1").getUnackedMessages(), 0);
}

@Test
public void testMetricsPersistentTopicLoadFails() throws Exception {
final String namespace = "prop/" + UUID.randomUUID().toString().replaceAll("-", "");
String topic = "persistent://" + namespace + "/topic1_" + UUID.randomUUID();
admin.namespaces().createNamespace(namespace);
admin.topics().createNonPartitionedTopic(topic);
admin.topics().unload(topic);

// Inject an error that makes the topic load fails.
AtomicBoolean failMarker = new AtomicBoolean(true);
mockZooKeeper.failConditional(KeeperException.Code.NODEEXISTS, (op, path) -> {
if (failMarker.get() && op.equals(MockZooKeeper.Op.SET) &&
path.endsWith(TopicName.get(topic).getPersistenceNamingEncoding())) {
return true;
}
return false;
});

// Do test
Thread.sleep(1000 * 3);
CompletableFuture<Producer<byte[]>> producer = pulsarClient.newProducer().topic(topic).createAsync();
JerseyClient httpClient = JerseyClientBuilder.createClient();
Awaitility.await().until(() -> {
String response = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
.request().get(String.class);
BufferedReader reader = new BufferedReader(new StringReader(response));
String line;
String metricsLine = null;
while ((line = reader.readLine()) != null) {
if (StringUtils.isBlank(line)) {
continue;
}
if (line.startsWith("#")) {
continue;
}
if (line.contains("topic_load_failed")) {
metricsLine = line;
break;
}
}
log.info("topic_load_failed: {}", metricsLine);
if (metricsLine == null) {
return false;
}
reader.close();
String[] parts = metricsLine.split(" ");
Double value = Double.valueOf(parts[parts.length - 1]);
return value >= 1D;
});

// Remove the injection.
failMarker.set(false);
// cleanup.
httpClient.close();
producer.join().close();
admin.topics().delete(topic);
admin.namespaces().deleteNamespace(namespace);
}

@Test
public void testMetricsNonPersistentTopicLoadFails() throws Exception {
final String namespace = "prop/" + UUID.randomUUID().toString().replaceAll("-", "");
String topic = "non-persistent://" + namespace + "/topic1_" + UUID.randomUUID();
admin.namespaces().createNamespace(namespace);

// Inject an error that makes the topic load fails.
pulsar.getConfiguration().setEnableNonPersistentTopics(false);

// Do test.
CompletableFuture<Producer<byte[]>> producer = pulsarClient.newProducer().topic(topic).createAsync();
JerseyClient httpClient = JerseyClientBuilder.createClient();
Awaitility.await().until(() -> {
String response = httpClient.target(pulsar.getWebServiceAddress()).path("/metrics/")
.request().get(String.class);
BufferedReader reader = new BufferedReader(new StringReader(response));
String line;
String metricsLine = null;
while ((line = reader.readLine()) != null) {
if (StringUtils.isBlank(line)) {
continue;
}
if (line.startsWith("#")) {
continue;
}
if (line.contains("topic_load_failed")) {
metricsLine = line;
break;
}
}
log.info("topic_load_failed: {}", metricsLine);
if (metricsLine == null) {
return false;
}
reader.close();
String[] parts = metricsLine.split(" ");
Double value = Double.valueOf(parts[parts.length - 1]);
return value >= 1D;
});

// Remove the injection.
pulsar.getConfiguration().setEnableNonPersistentTopics(true);

// cleanup.
httpClient.close();
try {
producer.join().close();
} catch (Exception ex) {
// The producer creation failed, so skip to close it.
}
admin.topics().delete(topic);
admin.namespaces().deleteNamespace(namespace);
}
}

0 comments on commit 704630b

Please sign in to comment.