Skip to content

Commit

Permalink
Make LoadBalancer's ResourceUsage class immutable (#13639)
Browse files Browse the repository at this point in the history
  • Loading branch information
michaeljmarshall committed Jan 14, 2022
1 parent a05694b commit a255992
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.beust.jcommander.internal.Lists;
import com.google.common.collect.Maps;
import io.netty.util.concurrent.FastThreadLocal;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashSet;
Expand All @@ -46,6 +45,7 @@
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -211,19 +211,19 @@ public static String getNamespaceNameFromBundleName(String bundleName) {
}

// Get the system resource usage for this broker.
public static SystemResourceUsage getSystemResourceUsage(final BrokerHostUsage brokerHostUsage) throws IOException {
public static SystemResourceUsage getSystemResourceUsage(final BrokerHostUsage brokerHostUsage) {
SystemResourceUsage systemResourceUsage = brokerHostUsage.getBrokerHostUsage();

// Override System memory usage and limit with JVM heap usage and limit
long maxHeapMemoryInBytes = Runtime.getRuntime().maxMemory();
long memoryUsageInBytes = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
systemResourceUsage.memory.usage = (double) memoryUsageInBytes / MIBI;
systemResourceUsage.memory.limit = (double) maxHeapMemoryInBytes / MIBI;
double maxHeapMemoryInBytes = Runtime.getRuntime().maxMemory();
double memoryUsageInBytes = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
double memoryUsage = memoryUsageInBytes / MIBI;
double memoryLimit = maxHeapMemoryInBytes / MIBI;
systemResourceUsage.setMemory(new ResourceUsage(memoryUsage, memoryLimit));

// Collect JVM direct memory
systemResourceUsage.directMemory.usage = (double) (getJvmDirectMemoryUsed() / MIBI);
systemResourceUsage.directMemory.limit =
(double) (io.netty.util.internal.PlatformDependent.maxDirectMemory() / MIBI);
systemResourceUsage.setDirectMemory(new ResourceUsage((double) (getJvmDirectMemoryUsed() / MIBI),
(double) (io.netty.util.internal.PlatformDependent.maxDirectMemory() / MIBI)));

return systemResourceUsage;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultimap;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -70,6 +69,7 @@
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUnitRanking;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage.ResourceType;
import org.slf4j.Logger;
Expand Down Expand Up @@ -1026,10 +1026,11 @@ private long getAverageJvmHeapUsageMBytes() {
}
}

public SystemResourceUsage getSystemResourceUsage() throws IOException {
public SystemResourceUsage getSystemResourceUsage() {
SystemResourceUsage systemResourceUsage = LoadManagerShared.getSystemResourceUsage(brokerHostUsage);
long memoryUsageInMBytes = getAverageJvmHeapUsageMBytes();
systemResourceUsage.memory.usage = (double) memoryUsageInMBytes;
systemResourceUsage
.setMemory(new ResourceUsage((double) memoryUsageInMBytes, systemResourceUsage.memory.limit));
return systemResourceUsage;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ public void testLoadShedding() throws Exception {
// 80% is below overload threshold: verify nothing is unloaded.
verify(namespacesSpy1, Mockito.times(0)).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString());

localBrokerData.getCpu().usage = 90;
localBrokerData.setCpu(new ResourceUsage(90, 100));
primaryLoadManager.doLoadShedding();
// Most expensive bundle will be unloaded.
verify(namespacesSpy1, Mockito.times(1)).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString());
Expand Down Expand Up @@ -412,17 +412,13 @@ public void testNeedBrokerDataUpdate() throws Exception {
assert (!needUpdate.get());

// Minimally test other absolute values to ensure they are included.
lastData.getCpu().usage = 100;
lastData.getCpu().limit = 1000;
currentData.getCpu().usage = 106;
currentData.getCpu().limit = 1000;
lastData.setCpu(new ResourceUsage(100, 1000));
currentData.setCpu(new ResourceUsage(106, 1000));
assert (!needUpdate.get());

// Minimally test other absolute values to ensure they are included.
lastData.getCpu().usage = 100;
lastData.getCpu().limit = 1000;
currentData.getCpu().usage = 206;
currentData.getCpu().limit = 1000;
lastData.setCpu(new ResourceUsage(100, 1000));
currentData.setCpu(new ResourceUsage(206, 1000));
assert (needUpdate.get());

lastData.setCpu(new ResourceUsage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,8 +473,6 @@ public void testUsage() {
double usageLimit = 10.0;
usage.setBandwidthIn(new ResourceUsage(usageLimit, usageLimit));
assertEquals(usage.getBandwidthIn().usage, usageLimit);
usage.reset();
assertNotEquals(usage.getBandwidthIn().usage, usageLimit);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,27 @@
import lombok.EqualsAndHashCode;

/**
* POJO used to represents any system specific resource usage this is the format that load manager expects it in.
* POJO used to represent any system specific resource usage this is the format that load manager expects it in.
*/
@EqualsAndHashCode
public class ResourceUsage {
public double usage;
public double limit;
public final double usage;
public final double limit;
@EqualsAndHashCode.Exclude
private final float percentUsage;

public ResourceUsage(double usage, double limit) {
this.usage = usage;
this.limit = limit;
}

public ResourceUsage(ResourceUsage that) {
this.usage = that.usage;
this.limit = that.limit;
float proportion = 0;
if (limit > 0) {
proportion = ((float) usage) / ((float) limit);
}
percentUsage = proportion * 100;
}

public ResourceUsage() {
}

public void reset() {
this.usage = -1;
this.limit = -1;
this(0, 0);
}

/**
Expand All @@ -59,10 +57,6 @@ public int compareTo(ResourceUsage o) {
}

public float percentUsage() {
float proportion = 0;
if (limit > 0) {
proportion = ((float) usage) / ((float) limit);
}
return proportion * 100;
return percentUsage;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,11 @@ private void updateSystemResourceUsage(final SystemResourceUsage systemResourceU
// Update resource usage given each individual usage.
private void updateSystemResourceUsage(final ResourceUsage cpu, final ResourceUsage memory,
final ResourceUsage directMemory, final ResourceUsage bandwidthIn, final ResourceUsage bandwidthOut) {
this.cpu = new ResourceUsage(cpu);
this.memory = new ResourceUsage(memory);
this.directMemory = new ResourceUsage(directMemory);
this.bandwidthIn = new ResourceUsage(bandwidthIn);
this.bandwidthOut = new ResourceUsage(bandwidthOut);
this.cpu = cpu;
this.memory = memory;
this.directMemory = directMemory;
this.bandwidthIn = bandwidthIn;
this.bandwidthOut = bandwidthOut;
}

// Aggregate all message, throughput, topic count, bundle count, consumer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,6 @@ public SystemResourceUsage() {
directMemory = new ResourceUsage(-1, -1);
}

public void reset() {
bandwidthIn.reset();
bandwidthOut.reset();
cpu.reset();
memory.reset();
directMemory.reset();
}

public ResourceUsage getBandwidthIn() {
return bandwidthIn;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,8 @@ public void testLoadReportSerialization() throws Exception {
final String simpleLmReportName = "simpleLoadManager";
final String modularLmBrokerUrl = "modular";
final SystemResourceUsage simpleLmSystemResourceUsage = new SystemResourceUsage();
final ResourceUsage resource = new ResourceUsage();
final double usage = 55.0;
resource.usage = usage;
final ResourceUsage resource = new ResourceUsage(usage, 0);
simpleLmSystemResourceUsage.bandwidthIn = resource;

LoadReport simpleReport = getSimpleLoadManagerLoadReport(simpleLmBrokerUrl, simpleLmReportName,
Expand Down

0 comments on commit a255992

Please sign in to comment.