Skip to content

Commit

Permalink
Exclude producers for geo-replication from publishers field of topic …
Browse files Browse the repository at this point in the history
…stats
  • Loading branch information
massakam committed Apr 22, 2024
1 parent 3a0f908 commit 06d34ea
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
Expand Down Expand Up @@ -745,8 +746,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats

replicators.forEach((region, replicator) -> replicator.updateRates());

nsStats.producerCount += producers.size();
bundleStats.producerCount += producers.size();
final AtomicInteger producerCount = new AtomicInteger();
topicStatsStream.startObject(topic);

topicStatsStream.startList("publishers");
Expand All @@ -759,14 +759,19 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats

if (producer.isRemote()) {
topicStats.remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
}

if (hydratePublishers) {
StreamingStats.writePublisherStats(topicStatsStream, publisherStats);
} else {
// Exclude producers for replication from "publishers" and "producerCount"
producerCount.incrementAndGet();
if (hydratePublishers) {
StreamingStats.writePublisherStats(topicStatsStream, publisherStats);
}
}
});
topicStatsStream.endList();

nsStats.producerCount += producerCount.get();
bundleStats.producerCount += producerCount.get();

// Start replicator stats
topicStatsStream.startObject("replication");
nsStats.replicatorCount += topicStats.remotePublishersStats.size();
Expand Down Expand Up @@ -855,7 +860,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
// Remaining dest stats.
topicStats.averageMsgSize = topicStats.aggMsgRateIn == 0.0 ? 0.0
: (topicStats.aggMsgThroughputIn / topicStats.aggMsgRateIn);
topicStatsStream.writePair("producerCount", producers.size());
topicStatsStream.writePair("producerCount", producerCount.get());
topicStatsStream.writePair("averageMsgSize", topicStats.averageMsgSize);
topicStatsStream.writePair("msgRateIn", topicStats.aggMsgRateIn);
topicStatsStream.writePair("msgRateOut", topicStats.aggMsgRateOut);
Expand Down Expand Up @@ -929,6 +934,7 @@ public CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions
if (producer.isRemote()) {
remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
} else if (!getStatsOptions.isExcludePublishers()) {
// Exclude producers for replication from "publishers"
stats.addPublisher(publisherStats);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -2107,8 +2108,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats

replicators.forEach((region, replicator) -> replicator.updateRates());

nsStats.producerCount += producers.size();
bundleStats.producerCount += producers.size();
final AtomicInteger producerCount = new AtomicInteger();
topicStatsStream.startObject(topic);

// start publisher stats
Expand All @@ -2122,14 +2122,19 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats

if (producer.isRemote()) {
topicStatsHelper.remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
}

// Populate consumer specific stats here
if (hydratePublishers) {
StreamingStats.writePublisherStats(topicStatsStream, publisherStats);
} else {
// Exclude producers for replication from "publishers" and "producerCount"
producerCount.incrementAndGet();
if (hydratePublishers) {
StreamingStats.writePublisherStats(topicStatsStream, publisherStats);
}
}
});
topicStatsStream.endList();

nsStats.producerCount += producerCount.get();
bundleStats.producerCount += producerCount.get();

// if publish-rate increases (eg: 0 to 1K) then pick max publish-rate and if publish-rate decreases then keep
// average rate.
lastUpdatedAvgPublishRateInMsg = topicStatsHelper.aggMsgRateIn > lastUpdatedAvgPublishRateInMsg
Expand Down Expand Up @@ -2297,7 +2302,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
// Remaining dest stats.
topicStatsHelper.averageMsgSize = topicStatsHelper.aggMsgRateIn == 0.0 ? 0.0
: (topicStatsHelper.aggMsgThroughputIn / topicStatsHelper.aggMsgRateIn);
topicStatsStream.writePair("producerCount", producers.size());
topicStatsStream.writePair("producerCount", producerCount.get());
topicStatsStream.writePair("averageMsgSize", topicStatsHelper.averageMsgSize);
topicStatsStream.writePair("msgRateIn", topicStatsHelper.aggMsgRateIn);
topicStatsStream.writePair("msgRateOut", topicStatsHelper.aggMsgRateOut);
Expand Down Expand Up @@ -2385,8 +2390,8 @@ public CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions

if (producer.isRemote()) {
remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
}
if (!getStatsOptions.isExcludePublishers()){
} else if (!getStatsOptions.isExcludePublishers()) {
// Exclude producers for replication from "publishers"
stats.addPublisher(publisherStats);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.lang.reflect.Field;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -88,20 +92,49 @@ public void testReplicatorProducerStatInTopic() throws Exception {
Producer<byte[]> producer1 = client1.newProducer().topic(topicName).create();
Consumer<byte[]> consumer2 = client2.newConsumer().topic(topicName).subscriptionName(subscribeName).subscribe();
producer1.newMessage().value(msgValue).send();
pulsar1.getBrokerService().checkReplicationPolicies();
pulsar1.getBrokerService().getTopicIfExists(topicName).get().get().checkReplication().get();
assertEquals(consumer2.receive(10, TimeUnit.SECONDS).getValue(), msgValue);
pulsar2.getBrokerService().getTopicIfExists(topicName).get().get().checkReplication().get();

// Verify there has one item in the attribute "publishers" or "replications"
TopicStats topicStats2 = admin2.topics().getStats(topicName);
assertTrue(topicStats2.getPublishers().size() + topicStats2.getReplication().size() > 0);
// Verify that the "publishers" field does not include the producer for replication
assertEquals(topicStats2.getPublishers().size(), 0);
// Verify that the "replication" field includes replication from cluster1 to cluster2
assertTrue(topicStats2.getReplication().containsKey(cluster1));
assertNotNull(topicStats2.getReplication().get(cluster1).getInboundConnection());
assertNotNull(topicStats2.getReplication().get(cluster1).getInboundConnectedSince());

// Update broker stats immediately (usually updated every minute)
pulsar2.getBrokerService().updateRates();
String brokerStats2 = admin2.brokerStats().getTopics();

boolean found = false;
ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = mapper.readTree(brokerStats2);
if (rootNode.hasNonNull(defaultNamespace)) {
Iterator<JsonNode> bundleNodes = rootNode.get(defaultNamespace).elements();
while (bundleNodes.hasNext()) {
JsonNode bundleNode = bundleNodes.next();
if (bundleNode.hasNonNull("persistent") && bundleNode.get("persistent").hasNonNull(topicName)) {
found = true;
JsonNode topicNode = bundleNode.get("persistent").get(topicName);
// Verify that the "publishers" field does not include the producer for replication
assertEquals(topicNode.get("publishers").size(), 0);
assertEquals(topicNode.get("producerCount").intValue(), 0);
// Verify that the "replication" field includes replication from cluster1 to cluster2
JsonNode replicationNode = topicNode.get("replication");
assertTrue(replicationNode.get(cluster1).hasNonNull("inboundConnection"));
assertTrue(replicationNode.get(cluster1).hasNonNull("inboundConnectedSince"));
break;
}
}
}
assertTrue(found);

// cleanup.
consumer2.close();
producer1.close();
cleanupTopics(() -> {
admin1.topics().delete(topicName);
admin2.topics().delete(topicName);
});
cleanupTopics(topicName);
}

@Test
Expand All @@ -118,10 +151,7 @@ public void testCreateRemoteConsumerFirst() throws Exception {
// cleanup.
producer1.close();
consumer2.close();
cleanupTopics(() -> {
admin1.topics().delete(topicName);
admin2.topics().delete(topicName);
});
cleanupTopics(topicName);
}

@Test
Expand All @@ -148,9 +178,6 @@ public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception
Assert.assertFalse(replicator.isConnected());
});
// cleanup.
cleanupTopics(() -> {
admin1.topics().delete(topicName);
admin2.topics().delete(topicName);
});
cleanupTopics(topicName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ public abstract class OneWayReplicatorTestBase extends TestRetrySupport {

protected final String defaultTenant = "public";
protected final String defaultNamespace = defaultTenant + "/default";
protected ZookeeperServerTest brokerConfigZk;

protected final String cluster1 = "r1";
protected URL url1;
protected URL urlTls1;
protected ServiceConfiguration config1 = new ServiceConfiguration();
protected ZookeeperServerTest brokerConfigZk1;
protected LocalBookkeeperEnsemble bkEnsemble1;
protected PulsarService pulsar1;
protected BrokerService ns1;
Expand All @@ -55,7 +55,6 @@ public abstract class OneWayReplicatorTestBase extends TestRetrySupport {
protected URL urlTls2;
protected final String cluster2 = "r2";
protected ServiceConfiguration config2 = new ServiceConfiguration();
protected ZookeeperServerTest brokerConfigZk2;
protected LocalBookkeeperEnsemble bkEnsemble2;
protected PulsarService pulsar2;
protected BrokerService ns2;
Expand All @@ -64,10 +63,8 @@ public abstract class OneWayReplicatorTestBase extends TestRetrySupport {

protected void startZKAndBK() throws Exception {
// Start ZK.
brokerConfigZk1 = new ZookeeperServerTest(0);
brokerConfigZk1.start();
brokerConfigZk2 = new ZookeeperServerTest(0);
brokerConfigZk2.start();
brokerConfigZk = new ZookeeperServerTest(0);
brokerConfigZk.start();

// Start BK.
bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> 0);
Expand All @@ -78,7 +75,7 @@ protected void startZKAndBK() throws Exception {

protected void startBrokers() throws Exception {
// Start brokers.
setConfigDefaults(config1, cluster1, bkEnsemble1, brokerConfigZk1);
setConfigDefaults(config1, cluster1, bkEnsemble1, brokerConfigZk);
pulsar1 = new PulsarService(config1);
pulsar1.start();
ns1 = pulsar1.getBrokerService();
Expand All @@ -89,7 +86,7 @@ protected void startBrokers() throws Exception {
client1 = PulsarClient.builder().serviceUrl(url1.toString()).build();

// Start region 2
setConfigDefaults(config2, cluster2, bkEnsemble2, brokerConfigZk2);
setConfigDefaults(config2, cluster2, bkEnsemble2, brokerConfigZk);
pulsar2 = new PulsarService(config2);
pulsar2.start();
ns2 = pulsar2.getBrokerService();
Expand All @@ -108,20 +105,6 @@ protected void createDefaultTenantsAndClustersAndNamespace() throws Exception {
.brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls())
.brokerClientTlsEnabled(false)
.build());
admin1.clusters().createCluster(cluster2, ClusterData.builder()
.serviceUrl(url2.toString())
.serviceUrlTls(urlTls2.toString())
.brokerServiceUrl(pulsar2.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar2.getBrokerServiceUrlTls())
.brokerClientTlsEnabled(false)
.build());
admin2.clusters().createCluster(cluster1, ClusterData.builder()
.serviceUrl(url1.toString())
.serviceUrlTls(urlTls1.toString())
.brokerServiceUrl(pulsar1.getBrokerServiceUrl())
.brokerServiceUrlTls(pulsar1.getBrokerServiceUrlTls())
.brokerClientTlsEnabled(false)
.build());
admin2.clusters().createCluster(cluster2, ClusterData.builder()
.serviceUrl(url2.toString())
.serviceUrlTls(urlTls2.toString())
Expand All @@ -132,24 +115,19 @@ protected void createDefaultTenantsAndClustersAndNamespace() throws Exception {

admin1.tenants().createTenant(defaultTenant, new TenantInfoImpl(Collections.emptySet(),
Sets.newHashSet(cluster1, cluster2)));
admin2.tenants().createTenant(defaultTenant, new TenantInfoImpl(Collections.emptySet(),
Sets.newHashSet(cluster1, cluster2)));

admin1.namespaces().createNamespace(defaultNamespace, Sets.newHashSet(cluster1, cluster2));
admin2.namespaces().createNamespace(defaultNamespace);
}

protected void cleanupTopics(CleanupTopicAction cleanupTopicAction) throws Exception {
protected void cleanupTopics(String... topicNames) throws Exception {
admin1.namespaces().setNamespaceReplicationClusters(defaultNamespace, Collections.singleton(cluster1));
admin1.namespaces().unload(defaultNamespace);
cleanupTopicAction.run();
for (String topicName : topicNames) {
admin1.topics().delete(topicName);
}
admin1.namespaces().setNamespaceReplicationClusters(defaultNamespace, Sets.newHashSet(cluster1, cluster2));
}

protected interface CleanupTopicAction {
void run() throws Exception;
}

@Override
protected void setup() throws Exception {
incrementSetupNumber();
Expand Down Expand Up @@ -207,13 +185,12 @@ protected void cleanup() throws Exception {
// Stop ZK and BK.
bkEnsemble1.stop();
bkEnsemble2.stop();
brokerConfigZk1.stop();
brokerConfigZk2.stop();
brokerConfigZk.stop();

// Reset configs.
config1 = new ServiceConfiguration();
setConfigDefaults(config1, cluster1, bkEnsemble1, brokerConfigZk1);
setConfigDefaults(config1, cluster1, bkEnsemble1, brokerConfigZk);
config2 = new ServiceConfiguration();
setConfigDefaults(config2, cluster2, bkEnsemble2, brokerConfigZk2);
setConfigDefaults(config2, cluster2, bkEnsemble2, brokerConfigZk);
}
}

0 comments on commit 06d34ea

Please sign in to comment.