Skip to content

Commit

Permalink
[improve][broker] Add dynamic configuration to UniformLoadShedder (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
lordcheng10 committed Jul 29, 2022
1 parent 5faac76 commit 4d64e2e
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2144,6 +2144,27 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int loadBalancerAverageResourceUsageDifferenceThresholdPercentage = 10;

@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "In the UniformLoadShedder strategy, the minimum message that triggers unload."
)
private int minUnloadMessage = 1000;

@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "In the UniformLoadShedder strategy, the minimum throughput that triggers unload."
)
private int minUnloadMessageThroughput = 1 * 1024 * 1024;

@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "In the UniformLoadShedder strategy, the maximum unload ratio."
)
private double maxUnloadPercentage = 0.2;

@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,6 @@
*/
@Slf4j
public class UniformLoadShedder implements LoadSheddingStrategy {

private static final int MB = 1024 * 1024;
private static final double MAX_UNLOAD_PERCENTAGE = 0.2;
private static final int MIN_UNLOAD_MESSAGE = 1000;
private static final int MIN_UNLOAD_THROUGHPUT = 1 * MB;
private final Multimap<String, String> selectedBundlesCache = ArrayListMultimap.create();
private static final double EPS = 1e-6;

Expand Down Expand Up @@ -122,14 +117,15 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
underloadedBroker.getValue(), minMsgRate.getValue(), minThroughputRate.getValue());
}
MutableInt msgRateRequiredFromUnloadedBundles = new MutableInt(
(int) ((maxMsgRate.getValue() - minMsgRate.getValue()) * MAX_UNLOAD_PERCENTAGE));
(int) ((maxMsgRate.getValue() - minMsgRate.getValue()) * conf.getMaxUnloadPercentage()));
MutableInt msgThroughputRequiredFromUnloadedBundles = new MutableInt(
(int) ((maxThroughputRate.getValue() - minThroughputRate.getValue()) * MAX_UNLOAD_PERCENTAGE));
(int) ((maxThroughputRate.getValue() - minThroughputRate.getValue())
* conf.getMaxUnloadPercentage()));
LocalBrokerData overloadedBrokerData = brokersData.get(overloadedBroker.getValue()).getLocalData();

if (overloadedBrokerData.getBundles().size() > 1
&& (msgRateRequiredFromUnloadedBundles.getValue() >= MIN_UNLOAD_MESSAGE
|| msgThroughputRequiredFromUnloadedBundles.getValue() >= MIN_UNLOAD_THROUGHPUT)) {
&& (msgRateRequiredFromUnloadedBundles.getValue() >= conf.getMinUnloadMessage()
|| msgThroughputRequiredFromUnloadedBundles.getValue() >= conf.getMinUnloadMessageThroughput())) {
// Sort bundles by throughput, then pick the bundle which can help to reduce load uniformly with
// under-loaded broker
loadBundleData.entrySet().stream()
Expand Down

0 comments on commit 4d64e2e

Please sign in to comment.