diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java index 0a48beea88dc74..363594f051d650 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java @@ -25,6 +25,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import lombok.Getter; +import lombok.val; import org.apache.pulsar.broker.resourcegroup.ResourceGroupService.ResourceGroupOpStatus; import org.apache.pulsar.broker.service.resource.usage.NetworkUsage; import org.apache.pulsar.broker.service.resource.usage.ResourceUsage; @@ -140,6 +141,10 @@ public ResourceGroup(ResourceGroup other) { protected void updateResourceGroup(org.apache.pulsar.common.policies.data.ResourceGroup rgConfig) { this.setResourceGroupConfigParameters(rgConfig); + val pubBmc = new BytesAndMessagesCount(); + pubBmc.messages = rgConfig.getPublishRateInMsgs(); + pubBmc.bytes = rgConfig.getPublishRateInBytes(); + this.resourceGroupPublishLimiter.update(pubBmc); } protected long getResourceGroupNumOfNSRefs() { @@ -272,6 +277,29 @@ protected BytesAndMessagesCount getLocalUsageStats(ResourceGroupMonitoringClass return retval; } + protected BytesAndMessagesCount getLocalUsageStatsCumulative(ResourceGroupMonitoringClass monClass) + throws PulsarAdminException { + this.checkMonitoringClass(monClass); + BytesAndMessagesCount retval = new BytesAndMessagesCount(); + final PerMonitoringClassFields monEntity = this.monitoringClassFields[monClass.ordinal()]; + monEntity.localUsageStatsLock.lock(); + try { + // If the total wasn't accumulated yet (i.e., a report wasn't sent yet), just return the + // partial accumulation in usedLocallySinceLastReport. + if (monEntity.totalUsedLocally.messages == 0) { + retval.bytes = monEntity.usedLocallySinceLastReport.bytes; + retval.messages = monEntity.usedLocallySinceLastReport.messages; + } else { + retval.bytes = monEntity.totalUsedLocally.bytes; + retval.messages = monEntity.totalUsedLocally.messages; + } + } finally { + monEntity.localUsageStatsLock.unlock(); + } + + return retval; + } + protected BytesAndMessagesCount getGlobalUsageStats(ResourceGroupMonitoringClass monClass) throws PulsarAdminException { this.checkMonitoringClass(monClass); @@ -294,13 +322,14 @@ protected BytesAndMessagesCount getGlobalUsageStats(ResourceGroupMonitoringClass protected BytesAndMessagesCount updateLocalQuota(ResourceGroupMonitoringClass monClass, BytesAndMessagesCount newQuota) throws PulsarAdminException { this.checkMonitoringClass(monClass); - BytesAndMessagesCount oldBMCount = new BytesAndMessagesCount(); + BytesAndMessagesCount oldBMCount; final PerMonitoringClassFields monEntity = this.monitoringClassFields[monClass.ordinal()]; monEntity.localUsageStatsLock.lock(); oldBMCount = monEntity.quotaForNextPeriod; try { monEntity.quotaForNextPeriod = newQuota; + this.resourceGroupPublishLimiter.update(newQuota); } finally { monEntity.localUsageStatsLock.unlock(); } @@ -310,6 +339,20 @@ protected BytesAndMessagesCount updateLocalQuota(ResourceGroupMonitoringClass mo return oldBMCount; } + protected BytesAndMessagesCount getRgPublishRateLimiterValues() { + BytesAndMessagesCount retVal; + final PerMonitoringClassFields monEntity = + this.monitoringClassFields[ResourceGroupMonitoringClass.Publish.ordinal()]; + monEntity.localUsageStatsLock.lock(); + try { + retVal = this.resourceGroupPublishLimiter.getResourceGroupPublishValues(); + } finally { + monEntity.localUsageStatsLock.unlock(); + } + + return retVal; + } + // Visibility for unit testing protected static double getRgRemoteUsageByteCount (String rgName, String monClassName, String brokerName) { return rgRemoteUsageReportsBytes.labels(rgName, monClassName, brokerName).get(); @@ -541,6 +584,7 @@ public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) { .help("Number of times local usage was reported (vs. suppressed due to negligible change)") .labelNames(resourceGroupMontoringclassLabels) .register(); + // Publish rate limiter for the resource group @Getter protected ResourceGroupPublishLimiter resourceGroupPublishLimiter; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java index e0df150fd0ccef..5e75799b36fedf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java @@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount; import org.apache.pulsar.broker.service.PublishRateLimiter; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; @@ -74,6 +75,18 @@ public void update(PublishRate maxPublishRate) { // No-op } + public void update(BytesAndMessagesCount maxPublishRate) { + this.publishMaxMessageRate = (int) maxPublishRate.messages; + this.publishMaxByteRate = maxPublishRate.bytes; + } + + public BytesAndMessagesCount getResourceGroupPublishValues() { + BytesAndMessagesCount bmc = new BytesAndMessagesCount(); + bmc.bytes = this.publishMaxByteRate; + bmc.messages = this.publishMaxMessageRate; + return bmc; + } + public void update(ResourceGroup resourceGroup) { replaceLimiters(() -> { if (resourceGroup != null diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java index aaee8b88755f56..ee37bdfd76eeec 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java @@ -352,10 +352,13 @@ public boolean incrementUsage(String tenantName, String nsName, } // Visibility for testing. - protected BytesAndMessagesCount getRGUsage(String rgName, ResourceGroupMonitoringClass monClass) - throws PulsarAdminException { + protected BytesAndMessagesCount getRGUsage(String rgName, ResourceGroupMonitoringClass monClass, + boolean getCumulative) throws PulsarAdminException { final ResourceGroup rg = this.getResourceGroupInternal(rgName); if (rg != null) { + if (getCumulative) { + return rg.getLocalUsageStatsCumulative(monClass); + } return rg.getLocalUsageStats(monClass); } @@ -427,9 +430,9 @@ private void updateStatsWithDiff(String topicName, String tenantString, String n try { boolean statsUpdated = this.incrementUsage(tenantString, nsString, monClass, bmDiff); - log.debug("updateStatsWithDiff: monclass={} statsUpdated={} for tenant={}, namespace={}; " + log.debug("updateStatsWithDiff for topic={}: monclass={} statsUpdated={} for tenant={}, namespace={}; " + "by {} bytes, {} mesgs", - monClass, statsUpdated, tenantString, nsString, + topicName, monClass, statsUpdated, tenantString, nsString, bmDiff.bytes, bmDiff.messages); hm.put(topicName, bmNewCount); } catch (Throwable t) { @@ -438,6 +441,16 @@ private void updateStatsWithDiff(String topicName, String tenantString, String n } } + // Visibility for testing. + protected BytesAndMessagesCount getPublishRateLimiters (String rgName) throws PulsarAdminException { + ResourceGroup rg = this.getResourceGroupInternal(rgName); + if (rg == null) { + throw new PulsarAdminException("Resource group does not exist: " + rgName); + } + + return rg.getRgPublishRateLimiterValues(); + } + // Visibility for testing. protected static double getRgQuotaByteCount (String rgName, String monClassName) { return rgCalculatedQuotaBytes.labels(rgName, monClassName).get(); @@ -512,10 +525,12 @@ protected void aggregateResourceGroupLocalUsages() { continue; } - for (ResourceGroupMonitoringClass monClass : ResourceGroupMonitoringClass.values()) { - this.updateStatsWithDiff(topicName, tenantString, nsString, - topicStats.bytesInCounter, topicStats.msgInCounter, monClass); - } + this.updateStatsWithDiff(topicName, tenantString, nsString, + topicStats.getBytesInCounter(), topicStats.getMsgInCounter(), + ResourceGroupMonitoringClass.Publish); + this.updateStatsWithDiff(topicName, tenantString, nsString, + topicStats.getBytesOutCounter(), topicStats.getMsgOutCounter(), + ResourceGroupMonitoringClass.Dispatch); } double diffTimeSeconds = aggrUsageTimer.observeDuration(); log.debug("aggregateResourceGroupLocalUsages took {} milliseconds", diffTimeSeconds * 1000); @@ -547,7 +562,8 @@ protected void aggregateResourceGroupLocalUsages() { // Periodically calculate the updated quota for all RGs in the background, // from the reports received from other brokers. - private void calculateQuotaForAllResourceGroups() { + // [Visibility for unit testing.] + protected void calculateQuotaForAllResourceGroups() { // Calculate the quota for the next window for this RG, based on the observed usage. final Summary.Timer quotaCalcTimer = rgQuotaCalculationLatency.startTimer(); BytesAndMessagesCount updatedQuota = new BytesAndMessagesCount(); @@ -611,8 +627,8 @@ private void calculateQuotaForAllResourceGroups() { newPeriodInSeconds, timeUnitScale); this.resourceUsagePublishPeriodInSeconds = newPeriodInSeconds; - this.maxIntervalForSuppressingReportsMSecs = - this.resourceUsagePublishPeriodInSeconds * this.MaxUsageReportSuppressRounds; + maxIntervalForSuppressingReportsMSecs = + this.resourceUsagePublishPeriodInSeconds * MaxUsageReportSuppressRounds; } } @@ -630,8 +646,8 @@ private void initialize() { periodInSecs, periodInSecs, this.timeUnitScale); - this.maxIntervalForSuppressingReportsMSecs = - this.resourceUsagePublishPeriodInSeconds * this.MaxUsageReportSuppressRounds; + maxIntervalForSuppressingReportsMSecs = + this.resourceUsagePublishPeriodInSeconds * MaxUsageReportSuppressRounds; } @@ -660,6 +676,8 @@ private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policie protected final ResourceQuotaCalculator quotaCalculator; private ResourceUsageTransportManager resourceUsageTransportManagerMgr; + + // rgConfigListener is used only through its side effects in the ctors, to set up RG/NS loading in config-listeners. private final ResourceGroupConfigListener rgConfigListener; // Given a RG-name, get the resource-group @@ -672,8 +690,8 @@ private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policie private ConcurrentHashMap namespaceToRGsMap = new ConcurrentHashMap<>(); // Maps to maintain the usage per topic, in produce/consume directions. - private ConcurrentHashMap topicProduceStats = new ConcurrentHashMap(); - private ConcurrentHashMap topicConsumeStats = new ConcurrentHashMap(); + private ConcurrentHashMap topicProduceStats = new ConcurrentHashMap<>(); + private ConcurrentHashMap topicConsumeStats = new ConcurrentHashMap<>(); // The task that periodically re-calculates the quota budget for local usage. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImpl.java index 06e4c9008648ba..2a2865dbed2817 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImpl.java @@ -36,9 +36,16 @@ public long computeLocalQuota(long confUsage, long myUsage, long[] allUsages) th totalUsage += usage; } - if (confUsage < 0 || myUsage < 0 || totalUsage < 0) { - String errMesg = String.format("Configured usage (%d), or local usage (%d) or total usage (%d) is negative", - confUsage, myUsage, totalUsage); + if (confUsage < 0) { + // This can happen if the RG is not configured with this particular limit (message or byte count) yet. + // It is safe to return a high value (so we don't limit) for the quota. + log.debug("Configured usage (%d) is not set; returning a high calculated quota", confUsage); + return Long.MAX_VALUE; + } + + if (myUsage < 0 || totalUsage < 0) { + String errMesg = String.format("Local usage (%d) or total usage (%d) is negative", + myUsage, totalUsage); log.error(errMesg); throw new PulsarAdminException(errMesg); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMesgs.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMesgsTest.java similarity index 92% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMesgs.java rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMesgsTest.java index 833b82c0e1c8fd..d1ea3e7919d5c7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMesgs.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMesgsTest.java @@ -57,7 +57,7 @@ // The tenants and namespaces in those topics are associated with a set of resource-groups (RGs). // After sending/receiving all the messages, traffic usage statistics, and Prometheus-metrics // are verified on the RGs. -public class RGUsageMTAggrWaitForAllMesgs extends ProducerConsumerBase { +public class RGUsageMTAggrWaitForAllMesgsTest extends ProducerConsumerBase { @BeforeClass @Override protected void setup() throws Exception { @@ -397,7 +397,9 @@ private void unRegisterTenantsAndNamespaces(String[] topicStrings) throws Except // Produce/consume messages on the given topics, and verify that the resource-group stats are updated. private void testProduceConsumeUsageOnRG(String[] topicStrings) throws Exception { createRGs(); - createTopics(topicStrings); + // creating the topics results in exposing a regression. + // It can be put back after https://github.com/apache/pulsar/issues/11289 is fixed. + // createTopics(topicStrings); registerTenantsAndNamespaces(topicStrings); final int TotalExpectedMessagesToSend = NumTotalMessages; @@ -523,16 +525,18 @@ private void testProduceConsumeUsageOnRG(String[] topicStrings) throws Exception // // Verify the producer side stats (msgInCounter/bytesInCounter) only. // this.verfyRGProdConsStats(topicStrings, sentNumBytes, sentNumMsgs, 0, 0,true, false); +// this.verifyRGMetrics(topicStrings, sentNumBytes, sentNumMsgs, recvdNumBytes, recvdNumMsgs, true, false); // Verify producer and consumer side stats. - this.verfyRGProdConsStats(topicStrings, sentNumBytes, sentNumMsgs, - recvdNumBytes, recvdNumMsgs, true, true); + this.verfyRGProdConsStats(topicStrings, sentNumBytes, sentNumMsgs, recvdNumBytes, recvdNumMsgs, true, true); // Verify the metrics corresponding to the operations in this test. - this.verifyRGMetrics(topicStrings, sentNumBytes, sentNumMsgs, recvdNumBytes, recvdNumMsgs); + this.verifyRGMetrics(topicStrings, sentNumBytes, sentNumMsgs, recvdNumBytes, recvdNumMsgs, true, true); unRegisterTenantsAndNamespaces(topicStrings); - destroyTopics(topicStrings); + // destroyTopics can be called after createTopics() is added back + // (see comment above regarding https://github.com/apache/pulsar/issues/11289). + // destroyTopics(topicStrings); destroyRGs(); } @@ -547,7 +551,7 @@ private void verfyRGProdConsStats(String[] topicStrings, BrokerService bs = pulsar.getBrokerService(); Map topicStatsMap = bs.getTopicStats(); - log.info("verfyProdConsStats: topicStatsMap has {} entries", topicStatsMap.size()); + log.debug("verfyProdConsStats: topicStatsMap has {} entries", topicStatsMap.size()); // Pulsar runtime adds some additional bytes in the exchanges: a 45-byte per-message // metadata of some kind, plus more as the number of messages increases. @@ -575,9 +579,13 @@ private void verfyRGProdConsStats(String[] topicStrings, // Since the following walk is on topics, keep track of the RGs for which we have already gathered stats, // so that we do not double-accumulate stats if multiple topics refer to the same RG. - HashSet RGsWithPublisStatsGathered = new HashSet<>(); + HashSet RGsWithPublishStatsGathered = new HashSet<>(); HashSet RGsWithDispatchStatsGathered = new HashSet<>(); + // Hack to ensure aggregator calculation without waiting for a period of aggregation. + // [aggregateResourceGroupLocalUsages() is idempotent when there's no fresh traffic flowing.] + this.rgservice.aggregateResourceGroupLocalUsages(); + for (Map.Entry entry : topicStatsMap.entrySet()) { String mapTopicName = entry.getKey(); if (Arrays.asList(topicStrings).contains(mapTopicName)) { @@ -596,18 +604,16 @@ private void verfyRGProdConsStats(String[] topicStrings, if (sentNumMsgs > 0 || recvdNumMsgs > 0) { TopicName topic = TopicName.get(mapTopicName); - // Hack to ensure aggregator calculation without waiting for a period of aggregation. - // [aggregateResourceGroupLocalUsages() is idempotent when there's no fresh traffic flowing.] - this.rgservice.aggregateResourceGroupLocalUsages(); - final String tenantRGName = TopicToTenantRGName(topic); - if (!RGsWithPublisStatsGathered.contains(tenantRGName)) { - prodCounts = this.rgservice.getRGUsage(tenantRGName, ResourceGroupMonitoringClass.Publish); + if (!RGsWithPublishStatsGathered.contains(tenantRGName)) { + prodCounts = this.rgservice.getRGUsage(tenantRGName, ResourceGroupMonitoringClass.Publish, + getCumulativeUsageStats); totalTenantRGProdCounts = ResourceGroup.accumulateBMCount(totalTenantRGProdCounts, prodCounts); - RGsWithPublisStatsGathered.add(tenantRGName); + RGsWithPublishStatsGathered.add(tenantRGName); } if (!RGsWithDispatchStatsGathered.contains(tenantRGName)) { - consCounts = this.rgservice.getRGUsage(tenantRGName, ResourceGroupMonitoringClass.Dispatch); + consCounts = this.rgservice.getRGUsage(tenantRGName, ResourceGroupMonitoringClass.Dispatch, + getCumulativeUsageStats); totalTenantRGConsCounts = ResourceGroup.accumulateBMCount(totalTenantRGConsCounts, consCounts); RGsWithDispatchStatsGathered.add(tenantRGName); } @@ -616,13 +622,15 @@ private void verfyRGProdConsStats(String[] topicStrings, // If tenantRGName == nsRGName, the RG-infra will avoid double counting. // We will do the same here, to get the expected stats. if (tenantRGName.compareTo(nsRGName) != 0) { - if (!RGsWithPublisStatsGathered.contains(nsRGName)) { - prodCounts = this.rgservice.getRGUsage(nsRGName, ResourceGroupMonitoringClass.Publish); + if (!RGsWithPublishStatsGathered.contains(nsRGName)) { + prodCounts = this.rgservice.getRGUsage(nsRGName, ResourceGroupMonitoringClass.Publish, + getCumulativeUsageStats); totalNsRGProdCounts = ResourceGroup.accumulateBMCount(totalNsRGProdCounts, prodCounts); - RGsWithPublisStatsGathered.add(nsRGName); + RGsWithPublishStatsGathered.add(nsRGName); } if (!RGsWithDispatchStatsGathered.contains(nsRGName)) { - consCounts = this.rgservice.getRGUsage(nsRGName, ResourceGroupMonitoringClass.Dispatch); + consCounts = this.rgservice.getRGUsage(nsRGName, ResourceGroupMonitoringClass.Dispatch, + getCumulativeUsageStats); totalNsRGConsCounts = ResourceGroup.accumulateBMCount(totalNsRGConsCounts, consCounts); RGsWithDispatchStatsGathered.add(nsRGName); } @@ -663,7 +671,8 @@ private void verfyRGProdConsStats(String[] topicStrings, // Check the metrics for the RGs involved private void verifyRGMetrics(String[] topicStrings, int sentNumBytes, int sentNumMsgs, - int recvdNumBytes, int recvdNumMsgs) throws Exception { + int recvdNumBytes, int recvdNumMsgs, + boolean checkProduce, boolean checkConsume) throws Exception { boolean tenantRGEqualsNsRG = tenantRGEqualsNamespaceRG(topicStrings); final int ExpectedNumBytesSent = sentNumBytes + PER_MESSAGE_METADATA_OHEAD * sentNumMsgs; @@ -685,29 +694,32 @@ private void verifyRGMetrics(String[] topicStrings, // 'ScaleFactor' is a way to incorporate that effect in the verification. final int ScaleFactor = tenantRGEqualsNsRG ? 1 : 2; - ResourceGroupService rgs = this.rgservice; + // Hack to ensure aggregator calculation without waiting for a period of aggregation. + // [aggregateResourceGroupLocalUsages() is idempotent when there's no new traffic flowing.] + this.rgservice.aggregateResourceGroupLocalUsages(); + for (String rgName : RGNames) { for (ResourceGroupMonitoringClass mc : ResourceGroupMonitoringClass.values()) { String mcName = mc.name(); int mcIndex = mc.ordinal(); - double quotaBytes = rgs.getRgQuotaByteCount(rgName, mcName); + double quotaBytes = ResourceGroupService.getRgQuotaByteCount(rgName, mcName); totalQuotaBytes[mcIndex] += quotaBytes; - double quotaMesgs = rgs.getRgQuotaMessageCount(rgName, mcName); + double quotaMesgs = ResourceGroupService.getRgQuotaMessageCount(rgName, mcName); totalQuotaMessages[mcIndex] += quotaMesgs; - double usedBytes = rgs.getRgLocalUsageByteCount(rgName, mcName); + double usedBytes = ResourceGroupService.getRgLocalUsageByteCount(rgName, mcName); totalUsedBytes[mcIndex] += usedBytes; - double usedMesgs = rgs.getRgLocalUsageMessageCount(rgName, mcName); + double usedMesgs = ResourceGroupService.getRgLocalUsageMessageCount(rgName, mcName); totalUsedMessages[mcIndex] += usedMesgs; double usageReportedCount = ResourceGroup.getRgUsageReportedCount(rgName, mcName); totalUsageReportCounts[mcIndex] += usageReportedCount; } - totalTenantRegisters += rgs.getRgTenantRegistersCount(rgName); - totalTenantUnRegisters += rgs.getRgTenantUnRegistersCount(rgName); - totalNamespaceRegisters += rgs.getRgNamespaceRegistersCount(rgName); - totalNamespaceUnRegisters += rgs.getRgNamespaceUnRegistersCount(rgName);; - totalUpdates += rgs.getRgUpdatesCount(rgName); + totalTenantRegisters += ResourceGroupService.getRgTenantRegistersCount(rgName); + totalTenantUnRegisters += ResourceGroupService.getRgTenantUnRegistersCount(rgName); + totalNamespaceRegisters += ResourceGroupService.getRgNamespaceRegistersCount(rgName); + totalNamespaceUnRegisters += ResourceGroupService.getRgNamespaceUnRegistersCount(rgName);; + totalUpdates += ResourceGroupService.getRgUpdatesCount(rgName); } log.info("totalTenantRegisters={}, totalTenantUnRegisters={}, " + "totalNamespaceRegisters={}, totalNamespaceUnRegisters={}", @@ -735,12 +747,12 @@ private void verifyRGMetrics(String[] topicStrings, totalUsedBytes[mcIdx], totalUsedMessages[mcIdx], totalUsageReportCounts[mcIdx]); // On each run, the bytes/messages are monotone incremented in Prometheus metrics. // So, we take the residuals into account when comparing against the expected. - if (mc == ResourceGroupMonitoringClass.Publish) { + if (checkProduce && mc == ResourceGroupMonitoringClass.Publish) { Assert.assertEquals(totalUsedMessages[mcIdx] - residualSentNumMessages, sentNumMsgs * ScaleFactor); Assert.assertTrue(totalUsedBytes[mcIdx] - residualSentNumBytes >= ExpectedNumBytesSent * ScaleFactor); - } else if (mc == ResourceGroupMonitoringClass.Dispatch) { + } else if (checkConsume && mc == ResourceGroupMonitoringClass.Dispatch) { Assert.assertEquals(totalUsedMessages[mcIdx] - residualRecvdNumMessages, recvdNumMsgs * ScaleFactor); Assert.assertTrue(totalUsedBytes[mcIdx] - residualRecvdNumBytes @@ -760,7 +772,7 @@ private void verifyRGMetrics(String[] topicStrings, Assert.assertEquals(totalUpdates, 0); // currently, we don't update the RGs in this UT // Basic check that latency metrics are doing some work. - Summary.Child.Value usageAggrLatency = rgs.getRgUsageAggregationLatency(); + Summary.Child.Value usageAggrLatency = ResourceGroupService.getRgUsageAggregationLatency(); Assert.assertNotEquals(usageAggrLatency.count, 0); Assert.assertNotEquals(usageAggrLatency.sum, 0); double fiftiethPercentileValue = usageAggrLatency.quantiles.get(0.5); @@ -768,7 +780,7 @@ private void verifyRGMetrics(String[] topicStrings, double ninetethPercentileValue = usageAggrLatency.quantiles.get(0.9); Assert.assertNotEquals(ninetethPercentileValue, 0); - Summary.Child.Value quotaCalcLatency = rgs.getRgQuotaCalculationTime(); + Summary.Child.Value quotaCalcLatency = ResourceGroupService.getRgQuotaCalculationTime(); Assert.assertNotEquals(quotaCalcLatency.count, 0); Assert.assertNotEquals(quotaCalcLatency.sum, 0); fiftiethPercentileValue = quotaCalcLatency.quantiles.get(0.5); @@ -777,7 +789,7 @@ private void verifyRGMetrics(String[] topicStrings, Assert.assertNotEquals(ninetethPercentileValue, 0); } - private static final Logger log = LoggerFactory.getLogger(RGUsageMTAggrWaitForAllMesgs.class); + private static final Logger log = LoggerFactory.getLogger(RGUsageMTAggrWaitForAllMesgsTest.class); // Empirically, there appears to be a 45-byte overhead for metadata, imposed by Pulsar runtime. private final int PER_MESSAGE_METADATA_OHEAD = 45; @@ -832,6 +844,9 @@ private void verifyRGMetrics(String[] topicStrings, NonPersistentTopicNamesSameTenantAndNsRGs, NonPersistentTopicNamesDifferentTenantAndNsRGs); + // We don't periodically report to a remote broker in this test. So, we will use cumulative stats. + private final boolean getCumulativeUsageStats = true; + // Keep track of the namespaces that were created, so we don't dup and get exceptions HashSet createdNamespaces = new HashSet<>(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java index b0e0676efcae87..753957709ca471 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupServiceTest.java @@ -19,26 +19,20 @@ package org.apache.pulsar.broker.resourcegroup; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; - import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass; import org.apache.pulsar.broker.resourcegroup.ResourceGroup.PerMonitoringClassFields; import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount; - import org.apache.pulsar.broker.service.resource.usage.NetworkUsage; import org.apache.pulsar.broker.service.resource.usage.ResourceUsage; import org.apache.pulsar.client.admin.PulsarAdminException; - import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; -import org.apache.pulsar.common.policies.data.ClusterDataImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; - import org.testng.Assert; import org.testng.annotations.Test; - import java.util.HashSet; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -159,6 +153,7 @@ public void testResourceGroupOps() throws PulsarAdminException, InterruptedExcep rgConfig.setDispatchRateInBytes(40000); rgConfig.setDispatchRateInMsgs(500); + int initialNumQuotaCalculations = numAnonymousQuotaCalculations; rgs.resourceGroupCreate(rgName, rgConfig); Assert.assertThrows(PulsarAdminException.class, () -> rgs.resourceGroupCreate(rgName, rgConfig)); @@ -222,7 +217,7 @@ public void testResourceGroupOps() throws PulsarAdminException, InterruptedExcep // We know that dummyQuotaCalc::needToReportLocalUsage() makes us report usage once every // maxUsageReportSuppressRounds iterations. So, if we run for maxUsageReportSuppressRounds iterations, // we should see needToReportLocalUsage() return true at least once. - Set myBoolSet = new HashSet(); + Set myBoolSet = new HashSet<>(); myBoolSet.clear(); for (int idx = 0; idx < ResourceGroupService.MaxUsageReportSuppressRounds; idx++) { needToReport = retRG.setUsageInMonitoredEntity(monClass, nwUsage); @@ -236,13 +231,33 @@ public void testResourceGroupOps() throws PulsarAdminException, InterruptedExcep rgs.unRegisterTenant(rgName, tenantName); rgs.unRegisterNameSpace(rgName, namespaceName); + BytesAndMessagesCount publishQuota = rgs.getPublishRateLimiters(rgName); + + // Calculated quota is synthetically set to the number of quota-calculation callbacks. + int numQuotaCalcsDuringTest = numAnonymousQuotaCalculations - initialNumQuotaCalculations; + if (numQuotaCalcsDuringTest == 0) { + // Quota calculations were not done yet during this test; we expect to see the default "initial" setting. + Assert.assertEquals(publishQuota.messages, rgConfig.getPublishRateInMsgs()); + Assert.assertEquals(publishQuota.bytes, rgConfig.getPublishRateInBytes()); + } + + // Calculate the quota synchronously to avoid waiting for a periodic call within ResourceGroupService. + rgs.calculateQuotaForAllResourceGroups(); + publishQuota = rgs.getPublishRateLimiters(rgName); + // The bytes/messages are (synthetically) set from numAnonymousQuotaCalculations in the above round of + // calls, or some later round (since the periodic call to calculateQuotaForAllResourceGroups() would be + // ongoing). So, we expect bytes/messages setting to be more than 0 and at most numAnonymousQuotaCalculations. + Assert.assertTrue(publishQuota.messages > 0 && publishQuota.messages <= numAnonymousQuotaCalculations); + Assert.assertTrue(publishQuota.bytes > 0 && publishQuota.bytes <= numAnonymousQuotaCalculations); + rgs.resourceGroupDelete(rgName); + Assert.assertThrows(PulsarAdminException.class, () -> rgs.getPublishRateLimiters(rgName)); Assert.assertEquals(rgs.getNumResourceGroups(), 0); } private ResourceGroupService rgs; - int numAnonymousQuotaCalculations = 0; + int numAnonymousQuotaCalculations; private static final Logger log = LoggerFactory.getLogger(ResourceGroupServiceTest.class); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java index 9078159237b3b7..345dc4dece00ce 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupUsageAggregationTest.java @@ -204,13 +204,17 @@ private void verfyStats(String topicString, String rgName, if (sentNumMsgs > 0 || recvdNumMsgs > 0) { rgs.aggregateResourceGroupLocalUsages(); // hack to ensure aggregator calculation without waiting - BytesAndMessagesCount prodCounts = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Publish); - BytesAndMessagesCount consCounts = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Dispatch); + BytesAndMessagesCount prodCounts = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Publish, + true); + BytesAndMessagesCount consCounts = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Dispatch, + true); // Re-do the getRGUsage. // The counts should be equal, since there wasn't any intervening traffic on TEST_PRODUCE_CONSUME_TOPIC. - BytesAndMessagesCount prodCounts1 = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Publish); - BytesAndMessagesCount consCounts1 = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Dispatch); + BytesAndMessagesCount prodCounts1 = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Publish, + true); + BytesAndMessagesCount consCounts1 = rgs.getRGUsage(rgName, ResourceGroupMonitoringClass.Dispatch, + true); Assert.assertTrue(prodCounts.bytes == prodCounts1.bytes); Assert.assertTrue(prodCounts.messages == prodCounts1.messages); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImplTest.java index c9daab9109148b..7d3653a2a41387 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceQuotaCalculatorImplTest.java @@ -43,9 +43,10 @@ protected void cleanup() throws Exception { } @Test - public void testRQCalcNegativeConfTest() { + public void testRQCalcNegativeConfTest() throws PulsarAdminException { final long[] allUsage = { 0 }; - Assert.assertThrows(PulsarAdminException.class, () -> this.rqCalc.computeLocalQuota(-1, 0, allUsage)); + long calculatedQuota = this.rqCalc.computeLocalQuota(-1, 0, allUsage); + Assert.assertEquals(calculatedQuota, Long.MAX_VALUE); } @Test