From 342d63ea92e6c506ce3af9b7578f08962c45756c Mon Sep 17 00:00:00 2001 From: Masahiro Sakamoto Date: Mon, 22 Apr 2024 22:41:43 +0900 Subject: [PATCH] Exclude producers for geo-replication from publishers field of topic stats --- .../nonpersistent/NonPersistentTopic.java | 20 +++++---- .../service/persistent/PersistentTopic.java | 25 ++++++----- .../broker/service/OneWayReplicatorTest.java | 41 +++++++++++++++++-- 3 files changed, 66 insertions(+), 20 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 586fcd76151e44..962d93fb48f43c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; +import org.apache.commons.lang3.mutable.MutableInt; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.namespace.NamespaceService; @@ -745,8 +746,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats replicators.forEach((region, replicator) -> replicator.updateRates()); - nsStats.producerCount += producers.size(); - bundleStats.producerCount += producers.size(); + final MutableInt producerCount = new MutableInt(); topicStatsStream.startObject(topic); topicStatsStream.startList("publishers"); @@ -759,14 +759,19 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats if (producer.isRemote()) { topicStats.remotePublishersStats.put(producer.getRemoteCluster(), publisherStats); - } - - if (hydratePublishers) { - StreamingStats.writePublisherStats(topicStatsStream, publisherStats); + } else { + // Exclude producers for replication from "publishers" and "producerCount" + producerCount.increment(); + if (hydratePublishers) { + StreamingStats.writePublisherStats(topicStatsStream, publisherStats); + } } }); topicStatsStream.endList(); + nsStats.producerCount += producerCount.intValue(); + bundleStats.producerCount += producerCount.intValue(); + // Start replicator stats topicStatsStream.startObject("replication"); nsStats.replicatorCount += topicStats.remotePublishersStats.size(); @@ -855,7 +860,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats // Remaining dest stats. topicStats.averageMsgSize = topicStats.aggMsgRateIn == 0.0 ? 0.0 : (topicStats.aggMsgThroughputIn / topicStats.aggMsgRateIn); - topicStatsStream.writePair("producerCount", producers.size()); + topicStatsStream.writePair("producerCount", producerCount.intValue()); topicStatsStream.writePair("averageMsgSize", topicStats.averageMsgSize); topicStatsStream.writePair("msgRateIn", topicStats.aggMsgRateIn); topicStatsStream.writePair("msgRateOut", topicStats.aggMsgRateOut); @@ -929,6 +934,7 @@ public CompletableFuture asyncGetStats(GetStatsOptions if (producer.isRemote()) { remotePublishersStats.put(producer.getRemoteCluster(), publisherStats); } else if (!getStatsOptions.isExcludePublishers()) { + // Exclude producers for replication from "publishers" stats.addPublisher(publisherStats); } }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 9d6855962ced66..15cce44a9cb332 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -85,6 +85,7 @@ import org.apache.bookkeeper.net.BookieId; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.mutable.MutableInt; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory; import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory; @@ -2107,8 +2108,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats replicators.forEach((region, replicator) -> replicator.updateRates()); - nsStats.producerCount += producers.size(); - bundleStats.producerCount += producers.size(); + final MutableInt producerCount = new MutableInt(); topicStatsStream.startObject(topic); // start publisher stats @@ -2122,14 +2122,19 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats if (producer.isRemote()) { topicStatsHelper.remotePublishersStats.put(producer.getRemoteCluster(), publisherStats); - } - - // Populate consumer specific stats here - if (hydratePublishers) { - StreamingStats.writePublisherStats(topicStatsStream, publisherStats); + } else { + // Exclude producers for replication from "publishers" and "producerCount" + producerCount.increment(); + if (hydratePublishers) { + StreamingStats.writePublisherStats(topicStatsStream, publisherStats); + } } }); topicStatsStream.endList(); + + nsStats.producerCount += producerCount.intValue(); + bundleStats.producerCount += producerCount.intValue(); + // if publish-rate increases (eg: 0 to 1K) then pick max publish-rate and if publish-rate decreases then keep // average rate. lastUpdatedAvgPublishRateInMsg = topicStatsHelper.aggMsgRateIn > lastUpdatedAvgPublishRateInMsg @@ -2297,7 +2302,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats // Remaining dest stats. topicStatsHelper.averageMsgSize = topicStatsHelper.aggMsgRateIn == 0.0 ? 0.0 : (topicStatsHelper.aggMsgThroughputIn / topicStatsHelper.aggMsgRateIn); - topicStatsStream.writePair("producerCount", producers.size()); + topicStatsStream.writePair("producerCount", producerCount.intValue()); topicStatsStream.writePair("averageMsgSize", topicStatsHelper.averageMsgSize); topicStatsStream.writePair("msgRateIn", topicStatsHelper.aggMsgRateIn); topicStatsStream.writePair("msgRateOut", topicStatsHelper.aggMsgRateOut); @@ -2385,8 +2390,8 @@ public CompletableFuture asyncGetStats(GetStatsOptions if (producer.isRemote()) { remotePublishersStats.put(producer.getRemoteCluster(), publisherStats); - } - if (!getStatsOptions.isExcludePublishers()){ + } else if (!getStatsOptions.isExcludePublishers()) { + // Exclude producers for replication from "publishers" stats.addPublisher(publisherStats); } }); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java index 35073575f34ed1..18477f7c2775c3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java @@ -26,11 +26,14 @@ import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import io.netty.util.concurrent.FastThreadLocalThread; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.time.Duration; import java.util.Arrays; +import java.util.Iterator; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -133,17 +136,49 @@ public void testReplicatorProducerStatInTopic() throws Exception { // Verify replicator works. Producer producer1 = client1.newProducer().topic(topicName).create(); + Producer producer2 = client2.newProducer().topic(topicName).create(); // Do not publish messages Consumer consumer2 = client2.newConsumer().topic(topicName).subscriptionName(subscribeName).subscribe(); producer1.newMessage().value(msgValue).send(); pulsar1.getBrokerService().checkReplicationPolicies(); assertEquals(consumer2.receive(10, TimeUnit.SECONDS).getValue(), msgValue); - // Verify there has one item in the attribute "publishers" or "replications" + // Verify that the "publishers" field does not include the producer for replication TopicStats topicStats2 = admin2.topics().getStats(topicName); - assertTrue(topicStats2.getPublishers().size() + topicStats2.getReplication().size() > 0); + assertEquals(topicStats2.getPublishers().size(), 1); + assertFalse(topicStats2.getPublishers().get(0).getProducerName().startsWith(config1.getReplicatorPrefix())); + + // Update broker stats immediately (usually updated every minute) + pulsar2.getBrokerService().updateRates(); + String brokerStats2 = admin2.brokerStats().getTopics(); + + boolean found = false; + ObjectMapper mapper = new ObjectMapper(); + JsonNode rootNode = mapper.readTree(brokerStats2); + if (rootNode.hasNonNull(replicatedNamespace)) { + Iterator bundleNodes = rootNode.get(replicatedNamespace).elements(); + while (bundleNodes.hasNext()) { + JsonNode bundleNode = bundleNodes.next(); + if (bundleNode.hasNonNull("persistent") && bundleNode.get("persistent").hasNonNull(topicName)) { + found = true; + JsonNode topicNode = bundleNode.get("persistent").get(topicName); + // Verify that the "publishers" field does not include the producer for replication + assertEquals(topicNode.get("publishers").size(), 1); + assertEquals(topicNode.get("producerCount").intValue(), 1); + Iterator publisherNodes = topicNode.get("publishers").elements(); + while (publisherNodes.hasNext()) { + JsonNode publisherNode = publisherNodes.next(); + assertFalse(publisherNode.get("producerName").textValue() + .startsWith(config1.getReplicatorPrefix())); + } + break; + } + } + } + assertTrue(found); // cleanup. - consumer2.close(); + consumer2.unsubscribe(); + producer2.close(); producer1.close(); cleanupTopics(() -> { admin1.topics().delete(topicName);