Skip to content

Commit

Permalink
fix shedding heartbeat ns (apache#13208)
Browse files Browse the repository at this point in the history
Related to apache#12252

I found that the problem mentioned in apache#12252 has not been solved, because the `HEARTBEAT_NAMESPACE_PATTERN` pattern needs a namespace as input, but what actually provides is the full name of the bundle.

1. fix the parttern matching problem
2. add a test case for it

This change is already covered by existing tests.

(cherry picked from commit 78e3d8f)
  • Loading branch information
wuzhanpeng authored and eolivelli committed Feb 4, 2022
1 parent 7218372 commit 6281523
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.pulsar.broker.BrokerData;
import org.apache.pulsar.broker.BundleData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.common.naming.NamespaceBundle;

/**
* This class represents all data that could be relevant when making a load management decision.
Expand Down Expand Up @@ -59,6 +62,13 @@ public Map<String, BundleData> getBundleData() {
return bundleData;
}

public Map<String, BundleData> getBundleDataForLoadShedding() {
return bundleData.entrySet().stream()
.filter(e -> !NamespaceService.isSystemServiceNamespace(
NamespaceBundle.getBundleNamespace(e.getKey())))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

public Map<String, Long> getRecentlyUnloadedBundles() {
return recentlyUnloadedBundles;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,16 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
if (localData.getBundles().size() > 1) {
// Sort bundles by throughput, then pick the biggest N which combined
// make up for at least the minimum throughput to offload

loadData.getBundleData().entrySet().stream()
.filter(e -> localData.getBundles().contains(e.getKey()))
.map((e) -> {
// Map to throughput value
// Consider short-term byte rate to address system resource burden
String bundle = e.getKey();
BundleData bundleData = e.getValue();
TimeAverageMessageData shortTermData = bundleData.getShortTermData();
double throughput = shortTermData.getMsgThroughputIn() + shortTermData
.getMsgThroughputOut();
loadData.getBundleDataForLoadShedding().entrySet().stream()
.filter(e -> localData.getBundles().contains(e.getKey()))
.map((e) -> {
// Map to throughput value
// Consider short-term byte rate to address system resource burden
String bundle = e.getKey();
BundleData bundleData = e.getValue();
TimeAverageMessageData shortTermData = bundleData.getShortTermData();
double throughput = shortTermData.getMsgThroughputIn() + shortTermData
.getMsgThroughputOut();
return Pair.of(bundle, throughput);
}).filter(e -> {
// Only consider bundles that were not already unloaded recently
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,13 @@ public Multimap<String, String> findBundlesForUnloading(final LoadData loadData,
MutableBoolean atLeastOneBundleSelected = new MutableBoolean(false);

if (localData.getBundles().size() > 1) {
loadData.getBundleData().entrySet().stream().map((e) -> {
String bundle = e.getKey();
BundleData bundleData = e.getValue();
TimeAverageMessageData shortTermData = bundleData.getShortTermData();
double throughput = shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut();
return Pair.of(bundle, throughput);
loadData.getBundleDataForLoadShedding().entrySet().stream()
.map((e) -> {
String bundle = e.getKey();
BundleData bundleData = e.getValue();
TimeAverageMessageData shortTermData = bundleData.getShortTermData();
double throughput = shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut();
return Pair.of(bundle, throughput);
}).filter(e ->
!recentlyUnloadedBundles.containsKey(e.getLeft())
).filter(e ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ public enum AddressType {

public static final String SLA_NAMESPACE_PROPERTY = "sla-monitor";
public static final Pattern HEARTBEAT_NAMESPACE_PATTERN = Pattern.compile("pulsar/[^/]+/([^:]+:\\d+)");
public static final Pattern HEARTBEAT_NAMESPACE_PATTERN_V2 = Pattern.compile("pulsar/([^:]+:\\d+)");
public static final Pattern SLA_NAMESPACE_PATTERN = Pattern.compile(SLA_NAMESPACE_PROPERTY + "/[^/]+/([^:]+:\\d+)");
public static final String HEARTBEAT_NAMESPACE_FMT = "pulsar/%s/%s:%s";
public static final String SLA_NAMESPACE_FMT = SLA_NAMESPACE_PROPERTY + "/%s/%s:%s";
Expand Down Expand Up @@ -1360,6 +1361,12 @@ public static String getSLAMonitorBrokerName(ServiceUnitId ns) {
}
}

public static boolean isSystemServiceNamespace(String namespace) {
return HEARTBEAT_NAMESPACE_PATTERN.matcher(namespace).matches()
|| HEARTBEAT_NAMESPACE_PATTERN_V2.matcher(namespace).matches()
|| SLA_NAMESPACE_PATTERN.matcher(namespace).matches();
}

public boolean registerSLANamespace() throws PulsarServerException {
boolean isNameSpaceRegistered = registerNamespace(getSLAMonitorNamespace(host, config), false);
if (isNameSpaceRegistered) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,18 @@ public static String getBundleRange(String namespaceBundle) {
return namespaceBundle.substring(namespaceBundle.lastIndexOf('/') + 1);
}

public static String getBundleNamespace(String namespaceBundle) {
int index = namespaceBundle.lastIndexOf('/');
if (index != -1) {
try {
return NamespaceName.get(namespaceBundle.substring(0, index)).toString();
} catch (Exception e) {
// return itself if meets invalid format
}
}
return namespaceBundle;
}

public NamespaceBundleFactory getNamespaceBundleFactory() {
return factory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN;
import static org.apache.pulsar.broker.namespace.NamespaceService.HEARTBEAT_NAMESPACE_PATTERN_V2;
import static org.junit.Assert.assertNotEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
Expand Down Expand Up @@ -509,6 +512,14 @@ public void testRemoveOwnershipAndSplitBundle() throws Exception {
}
}

@Test
public void testHeartbeatNamespaceMatch() throws Exception {
NamespaceName namespaceName = NamespaceName.get(NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), conf));
NamespaceBundle namespaceBundle = pulsar.getNamespaceService().getNamespaceBundleFactory().getFullBundle(namespaceName);
assertTrue(NamespaceService.isSystemServiceNamespace(
NamespaceBundle.getBundleNamespace(namespaceBundle.toString())));
}

@SuppressWarnings("unchecked")
private Pair<NamespaceBundles, List<NamespaceBundle>> splitBundles(NamespaceBundleFactory utilityFactory,
NamespaceName nsname, NamespaceBundles bundles, NamespaceBundle targetBundle) throws Exception {
Expand Down

0 comments on commit 6281523

Please sign in to comment.