Skip to content

Commit

Permalink
[fix][broker] usedLocallySinceLastReport should always be reset
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
  • Loading branch information
nodece committed May 8, 2024
1 parent c30765e commit 7e747ff
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -458,14 +458,13 @@ protected boolean setUsageInMonitoredEntity(ResourceGroupMonitoringClass monClas

bytesUsed = monEntity.usedLocallySinceLastReport.bytes;
messagesUsed = monEntity.usedLocallySinceLastReport.messages;

monEntity.usedLocallySinceLastReport.bytes = monEntity.usedLocallySinceLastReport.messages = 0;
if (sendReport) {
p.setBytesPerPeriod(bytesUsed);
p.setMessagesPerPeriod(messagesUsed);
monEntity.lastReportedValues.bytes = bytesUsed;
monEntity.lastReportedValues.messages = messagesUsed;
monEntity.numSuppressedUsageReports = 0;
monEntity.usedLocallySinceLastReport.bytes = monEntity.usedLocallySinceLastReport.messages = 0;
monEntity.totalUsedLocally.bytes += bytesUsed;
monEntity.totalUsedLocally.messages += messagesUsed;
monEntity.lastResourceUsageFillTimeMSecsSinceEpoch = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.resourcegroup;

import java.util.UUID;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass;
import org.apache.pulsar.broker.resourcegroup.ResourceGroup.PerMonitoringClassFields;
Expand Down Expand Up @@ -257,6 +258,67 @@ public void testResourceGroupOps() throws PulsarAdminException, InterruptedExcep
Assert.assertEquals(rgs.getNumResourceGroups(), 0);
}

@Test
public void testResourceGroupResetUsedLocallySinceLastReport() throws PulsarAdminException {
org.apache.pulsar.common.policies.data.ResourceGroup rgConfig =
new org.apache.pulsar.common.policies.data.ResourceGroup();
final String rgName = UUID.randomUUID().toString();
rgConfig.setPublishRateInBytes(15000L);
rgConfig.setPublishRateInMsgs(100);
rgConfig.setDispatchRateInBytes(40000L);
rgConfig.setDispatchRateInMsgs(500);

this.pulsar.getResourceGroupServiceManager().resourceGroupCreate(rgName, rgConfig);

ResourceGroup retRG = this.pulsar.getResourceGroupServiceManager().resourceGroupGet(rgName);

PerMonitoringClassFields monClassFields = null;
// Case1: Suppress report ResourceUsage.
for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) {
monClassFields = retRG.monitoringClassFields[value.ordinal()];
monClassFields.usedLocallySinceLastReport.bytes = monClassFields.lastReportedValues.bytes = 10;
monClassFields.usedLocallySinceLastReport.messages = monClassFields.lastReportedValues.messages = 10;
monClassFields.lastResourceUsageFillTimeMSecsSinceEpoch = System.currentTimeMillis();
}

ResourceUsage resourceUsage = new ResourceUsage();
retRG.rgFillResourceUsage(resourceUsage);
Assert.assertFalse(resourceUsage.hasDispatch());
Assert.assertFalse(resourceUsage.hasPublish());

for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) {
monClassFields = retRG.monitoringClassFields[value.ordinal()];
Assert.assertEquals(monClassFields.usedLocallySinceLastReport.messages, 0L);
Assert.assertEquals(monClassFields.usedLocallySinceLastReport.bytes, 0L);
}

// Case2: Report ResourceUsage.
for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) {
monClassFields = retRG.monitoringClassFields[value.ordinal()];
monClassFields.usedLocallySinceLastReport.bytes = monClassFields.lastReportedValues.bytes * 2;
monClassFields.usedLocallySinceLastReport.messages = monClassFields.lastReportedValues.messages * 2;
}

resourceUsage = new ResourceUsage();
retRG.rgFillResourceUsage(resourceUsage);
Assert.assertTrue(resourceUsage.hasDispatch());
NetworkUsage dispatch = resourceUsage.getDispatch();
Assert.assertNotNull(monClassFields);
Assert.assertEquals(dispatch.getBytesPerPeriod(), monClassFields.lastReportedValues.bytes);
Assert.assertEquals(dispatch.getMessagesPerPeriod(), monClassFields.lastReportedValues.messages);

Assert.assertTrue(resourceUsage.hasPublish());
NetworkUsage publish = resourceUsage.getPublish();
Assert.assertEquals(publish.getBytesPerPeriod(), monClassFields.lastReportedValues.bytes);
Assert.assertEquals(publish.getMessagesPerPeriod(), monClassFields.lastReportedValues.messages);

for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) {
monClassFields = retRG.monitoringClassFields[value.ordinal()];
Assert.assertEquals(monClassFields.usedLocallySinceLastReport.messages, 0L);
Assert.assertEquals(monClassFields.usedLocallySinceLastReport.bytes, 0L);
}
}

@Test
public void testClose() throws Exception {
ResourceGroupService service = new ResourceGroupService(pulsar, TimeUnit.MILLISECONDS, null, null);
Expand Down

0 comments on commit 7e747ff

Please sign in to comment.