Skip to content

Commit

Permalink
[PIP-82] [pulsar-broker] updates to resource-group stats: (apache#11331)
Browse files Browse the repository at this point in the history
* use the new (get*) APIs to pull local usage stats from the broker service
  * use both directions (publish/dispaatch)
  * workaround for regression in subscribe() (see apache#11289)
  * handle the case of not-yet-configured quota value on a resource group
  * cumulative vs. 'since last reported' stats
  * adjust publish rate-limit quote at update, and after new quota calculations
  * rename class RGUsageMTAggrWaitForAllMesgs, so it is picked up in CI runs

Co-authored-by: Kaushik Ghosh <kaushikg@splunk.com>
  • Loading branch information
2 people authored and ciaocloud committed Oct 16, 2021
1 parent c97c1ca commit dfe65eb
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 69 deletions.
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import lombok.Getter;
import lombok.val;
import org.apache.pulsar.broker.resourcegroup.ResourceGroupService.ResourceGroupOpStatus;
import org.apache.pulsar.broker.service.resource.usage.NetworkUsage;
import org.apache.pulsar.broker.service.resource.usage.ResourceUsage;
Expand Down Expand Up @@ -140,6 +141,10 @@ public ResourceGroup(ResourceGroup other) {

protected void updateResourceGroup(org.apache.pulsar.common.policies.data.ResourceGroup rgConfig) {
this.setResourceGroupConfigParameters(rgConfig);
val pubBmc = new BytesAndMessagesCount();
pubBmc.messages = rgConfig.getPublishRateInMsgs();
pubBmc.bytes = rgConfig.getPublishRateInBytes();
this.resourceGroupPublishLimiter.update(pubBmc);
}

protected long getResourceGroupNumOfNSRefs() {
Expand Down Expand Up @@ -272,6 +277,29 @@ protected BytesAndMessagesCount getLocalUsageStats(ResourceGroupMonitoringClass
return retval;
}

protected BytesAndMessagesCount getLocalUsageStatsCumulative(ResourceGroupMonitoringClass monClass)
throws PulsarAdminException {
this.checkMonitoringClass(monClass);
BytesAndMessagesCount retval = new BytesAndMessagesCount();
final PerMonitoringClassFields monEntity = this.monitoringClassFields[monClass.ordinal()];
monEntity.localUsageStatsLock.lock();
try {
// If the total wasn't accumulated yet (i.e., a report wasn't sent yet), just return the
// partial accumulation in usedLocallySinceLastReport.
if (monEntity.totalUsedLocally.messages == 0) {
retval.bytes = monEntity.usedLocallySinceLastReport.bytes;
retval.messages = monEntity.usedLocallySinceLastReport.messages;
} else {
retval.bytes = monEntity.totalUsedLocally.bytes;
retval.messages = monEntity.totalUsedLocally.messages;
}
} finally {
monEntity.localUsageStatsLock.unlock();
}

return retval;
}

protected BytesAndMessagesCount getGlobalUsageStats(ResourceGroupMonitoringClass monClass)
throws PulsarAdminException {
this.checkMonitoringClass(monClass);
Expand All @@ -294,13 +322,14 @@ protected BytesAndMessagesCount getGlobalUsageStats(ResourceGroupMonitoringClass
protected BytesAndMessagesCount updateLocalQuota(ResourceGroupMonitoringClass monClass,
BytesAndMessagesCount newQuota) throws PulsarAdminException {
this.checkMonitoringClass(monClass);
BytesAndMessagesCount oldBMCount = new BytesAndMessagesCount();
BytesAndMessagesCount oldBMCount;

final PerMonitoringClassFields monEntity = this.monitoringClassFields[monClass.ordinal()];
monEntity.localUsageStatsLock.lock();
oldBMCount = monEntity.quotaForNextPeriod;
try {
monEntity.quotaForNextPeriod = newQuota;
this.resourceGroupPublishLimiter.update(newQuota);
} finally {
monEntity.localUsageStatsLock.unlock();
}
Expand All @@ -310,6 +339,20 @@ protected BytesAndMessagesCount updateLocalQuota(ResourceGroupMonitoringClass mo
return oldBMCount;
}

protected BytesAndMessagesCount getRgPublishRateLimiterValues() {
BytesAndMessagesCount retVal;
final PerMonitoringClassFields monEntity =
this.monitoringClassFields[ResourceGroupMonitoringClass.Publish.ordinal()];
monEntity.localUsageStatsLock.lock();
try {
retVal = this.resourceGroupPublishLimiter.getResourceGroupPublishValues();
} finally {
monEntity.localUsageStatsLock.unlock();
}

return retVal;
}

// Visibility for unit testing
protected static double getRgRemoteUsageByteCount (String rgName, String monClassName, String brokerName) {
return rgRemoteUsageReportsBytes.labels(rgName, monClassName, brokerName).get();
Expand Down Expand Up @@ -541,6 +584,7 @@ public void acceptResourceUsage(String broker, ResourceUsage resourceUsage) {
.help("Number of times local usage was reported (vs. suppressed due to negligible change)")
.labelNames(resourceGroupMontoringclassLabels)
.register();

// Publish rate limiter for the resource group
@Getter
protected ResourceGroupPublishLimiter resourceGroupPublishLimiter;
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
import org.apache.pulsar.broker.service.PublishRateLimiter;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
Expand Down Expand Up @@ -74,6 +75,18 @@ public void update(PublishRate maxPublishRate) {
// No-op
}

public void update(BytesAndMessagesCount maxPublishRate) {
this.publishMaxMessageRate = (int) maxPublishRate.messages;
this.publishMaxByteRate = maxPublishRate.bytes;
}

public BytesAndMessagesCount getResourceGroupPublishValues() {
BytesAndMessagesCount bmc = new BytesAndMessagesCount();
bmc.bytes = this.publishMaxByteRate;
bmc.messages = this.publishMaxMessageRate;
return bmc;
}

public void update(ResourceGroup resourceGroup) {
replaceLimiters(() -> {
if (resourceGroup != null
Expand Down
Expand Up @@ -352,10 +352,13 @@ public boolean incrementUsage(String tenantName, String nsName,
}

// Visibility for testing.
protected BytesAndMessagesCount getRGUsage(String rgName, ResourceGroupMonitoringClass monClass)
throws PulsarAdminException {
protected BytesAndMessagesCount getRGUsage(String rgName, ResourceGroupMonitoringClass monClass,
boolean getCumulative) throws PulsarAdminException {
final ResourceGroup rg = this.getResourceGroupInternal(rgName);
if (rg != null) {
if (getCumulative) {
return rg.getLocalUsageStatsCumulative(monClass);
}
return rg.getLocalUsageStats(monClass);
}

Expand Down Expand Up @@ -427,9 +430,9 @@ private void updateStatsWithDiff(String topicName, String tenantString, String n

try {
boolean statsUpdated = this.incrementUsage(tenantString, nsString, monClass, bmDiff);
log.debug("updateStatsWithDiff: monclass={} statsUpdated={} for tenant={}, namespace={}; "
log.debug("updateStatsWithDiff for topic={}: monclass={} statsUpdated={} for tenant={}, namespace={}; "
+ "by {} bytes, {} mesgs",
monClass, statsUpdated, tenantString, nsString,
topicName, monClass, statsUpdated, tenantString, nsString,
bmDiff.bytes, bmDiff.messages);
hm.put(topicName, bmNewCount);
} catch (Throwable t) {
Expand All @@ -438,6 +441,16 @@ private void updateStatsWithDiff(String topicName, String tenantString, String n
}
}

// Visibility for testing.
protected BytesAndMessagesCount getPublishRateLimiters (String rgName) throws PulsarAdminException {
ResourceGroup rg = this.getResourceGroupInternal(rgName);
if (rg == null) {
throw new PulsarAdminException("Resource group does not exist: " + rgName);
}

return rg.getRgPublishRateLimiterValues();
}

// Visibility for testing.
protected static double getRgQuotaByteCount (String rgName, String monClassName) {
return rgCalculatedQuotaBytes.labels(rgName, monClassName).get();
Expand Down Expand Up @@ -512,10 +525,12 @@ protected void aggregateResourceGroupLocalUsages() {
continue;
}

for (ResourceGroupMonitoringClass monClass : ResourceGroupMonitoringClass.values()) {
this.updateStatsWithDiff(topicName, tenantString, nsString,
topicStats.bytesInCounter, topicStats.msgInCounter, monClass);
}
this.updateStatsWithDiff(topicName, tenantString, nsString,
topicStats.getBytesInCounter(), topicStats.getMsgInCounter(),
ResourceGroupMonitoringClass.Publish);
this.updateStatsWithDiff(topicName, tenantString, nsString,
topicStats.getBytesOutCounter(), topicStats.getMsgOutCounter(),
ResourceGroupMonitoringClass.Dispatch);
}
double diffTimeSeconds = aggrUsageTimer.observeDuration();
log.debug("aggregateResourceGroupLocalUsages took {} milliseconds", diffTimeSeconds * 1000);
Expand Down Expand Up @@ -547,7 +562,8 @@ protected void aggregateResourceGroupLocalUsages() {

// Periodically calculate the updated quota for all RGs in the background,
// from the reports received from other brokers.
private void calculateQuotaForAllResourceGroups() {
// [Visibility for unit testing.]
protected void calculateQuotaForAllResourceGroups() {
// Calculate the quota for the next window for this RG, based on the observed usage.
final Summary.Timer quotaCalcTimer = rgQuotaCalculationLatency.startTimer();
BytesAndMessagesCount updatedQuota = new BytesAndMessagesCount();
Expand Down Expand Up @@ -611,8 +627,8 @@ private void calculateQuotaForAllResourceGroups() {
newPeriodInSeconds,
timeUnitScale);
this.resourceUsagePublishPeriodInSeconds = newPeriodInSeconds;
this.maxIntervalForSuppressingReportsMSecs =
this.resourceUsagePublishPeriodInSeconds * this.MaxUsageReportSuppressRounds;
maxIntervalForSuppressingReportsMSecs =
this.resourceUsagePublishPeriodInSeconds * MaxUsageReportSuppressRounds;
}
}

Expand All @@ -630,8 +646,8 @@ private void initialize() {
periodInSecs,
periodInSecs,
this.timeUnitScale);
this.maxIntervalForSuppressingReportsMSecs =
this.resourceUsagePublishPeriodInSeconds * this.MaxUsageReportSuppressRounds;
maxIntervalForSuppressingReportsMSecs =
this.resourceUsagePublishPeriodInSeconds * MaxUsageReportSuppressRounds;

}

Expand Down Expand Up @@ -660,6 +676,8 @@ private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policie

protected final ResourceQuotaCalculator quotaCalculator;
private ResourceUsageTransportManager resourceUsageTransportManagerMgr;

// rgConfigListener is used only through its side effects in the ctors, to set up RG/NS loading in config-listeners.
private final ResourceGroupConfigListener rgConfigListener;

// Given a RG-name, get the resource-group
Expand All @@ -672,8 +690,8 @@ private void checkRGCreateParams(String rgName, org.apache.pulsar.common.policie
private ConcurrentHashMap<String, ResourceGroup> namespaceToRGsMap = new ConcurrentHashMap<>();

// Maps to maintain the usage per topic, in produce/consume directions.
private ConcurrentHashMap<String, BytesAndMessagesCount> topicProduceStats = new ConcurrentHashMap();
private ConcurrentHashMap<String, BytesAndMessagesCount> topicConsumeStats = new ConcurrentHashMap();
private ConcurrentHashMap<String, BytesAndMessagesCount> topicProduceStats = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, BytesAndMessagesCount> topicConsumeStats = new ConcurrentHashMap<>();


// The task that periodically re-calculates the quota budget for local usage.
Expand Down
Expand Up @@ -36,9 +36,16 @@ public long computeLocalQuota(long confUsage, long myUsage, long[] allUsages) th
totalUsage += usage;
}

if (confUsage < 0 || myUsage < 0 || totalUsage < 0) {
String errMesg = String.format("Configured usage (%d), or local usage (%d) or total usage (%d) is negative",
confUsage, myUsage, totalUsage);
if (confUsage < 0) {
// This can happen if the RG is not configured with this particular limit (message or byte count) yet.
// It is safe to return a high value (so we don't limit) for the quota.
log.debug("Configured usage (%d) is not set; returning a high calculated quota", confUsage);
return Long.MAX_VALUE;
}

if (myUsage < 0 || totalUsage < 0) {
String errMesg = String.format("Local usage (%d) or total usage (%d) is negative",
myUsage, totalUsage);
log.error(errMesg);
throw new PulsarAdminException(errMesg);
}
Expand Down

0 comments on commit dfe65eb

Please sign in to comment.