Skip to content

Commit

Permalink
Fix time based backlog quota. (apache#11509)
Browse files Browse the repository at this point in the history
Fixes apache#11404

### Motivation
Time based backlog quota type message_age is set separately but when check backlog we are only checking against destination_storage type.
So fix to loop through all BacklogQuotaType when checking if backlog exceeded.

### Modification
* Added unit test.
* Added default implementation to make Admin Topic/Namespace backlog quota related API backward compatible.
  • Loading branch information
MarvinCai authored and ciaocloud committed Oct 16, 2021
1 parent d6492ed commit d9552f0
Show file tree
Hide file tree
Showing 22 changed files with 472 additions and 140 deletions.
Expand Up @@ -336,8 +336,10 @@ protected void mergeNamespaceWithDefaults(Policies policies, String namespace, S

}

protected BacklogQuota namespaceBacklogQuota(String namespace, String namespacePath) {
return pulsar().getBrokerService().getBacklogQuotaManager().getBacklogQuota(namespace, namespacePath);
protected BacklogQuota namespaceBacklogQuota(String namespace, String namespacePath,
BacklogQuota.BacklogQuotaType backlogQuotaType) {
return pulsar().getBrokerService().getBacklogQuotaManager()
.getBacklogQuota(namespace, namespacePath, backlogQuotaType);
}

protected CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsyncWithRetry(TopicName topicName) {
Expand Down
Expand Up @@ -2676,14 +2676,17 @@ protected CompletableFuture<Map<BacklogQuota.BacklogQuotaType, BacklogQuota>> in
quotaMap = getNamespacePolicies(namespaceName).backlog_quota_map;
if (quotaMap.isEmpty()) {
String namespace = namespaceName.toString();
quotaMap.put(
BacklogQuota.BacklogQuotaType.destination_storage,
namespaceBacklogQuota(namespace, AdminResource.path(POLICIES, namespace))
);
for (BacklogQuota.BacklogQuotaType backlogQuotaType : BacklogQuota.BacklogQuotaType.values()) {
quotaMap.put(
backlogQuotaType,
namespaceBacklogQuota(namespace,
AdminResource.path(POLICIES, namespace), backlogQuotaType)
);
}
}
}
return quotaMap;
});
});
}

protected CompletableFuture<Void> internalSetBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType,
Expand Down Expand Up @@ -2799,21 +2802,21 @@ protected CompletableFuture<Void> internalSetRetention(RetentionPolicies retenti
return getTopicPoliciesAsyncWithRetry(topicName)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
BacklogQuota backlogQuota =
topicPolicies.getBackLogQuotaMap()
.get(BacklogQuota.BacklogQuotaType.destination_storage.name());
if (backlogQuota == null) {
Policies policies = getNamespacePolicies(topicName.getNamespaceObject());
backlogQuota = policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage);
}
if (!checkBacklogQuota(backlogQuota, retention)) {
log.warn(
"[{}] Failed to update retention quota configuration for topic {}: "
+ "conflicts with retention quota",
clientAppId(), topicName);
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Retention Quota must exceed configured backlog quota for topic. "
+ "Please increase retention quota and retry"));
for (BacklogQuota.BacklogQuotaType backlogQuotaType : BacklogQuota.BacklogQuotaType.values()) {
BacklogQuota backlogQuota = topicPolicies.getBackLogQuotaMap().get(backlogQuotaType.name());
if (backlogQuota == null) {
Policies policies = getNamespacePolicies(topicName.getNamespaceObject());
backlogQuota = policies.backlog_quota_map.get(backlogQuotaType);
}
if (!checkBacklogQuota(backlogQuota, retention)) {
log.warn(
"[{}] Failed to update retention quota configuration for topic {}: "
+ "conflicts with retention quota",
clientAppId(), topicName);
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Retention Quota must exceed configured backlog quota for topic. "
+ "Please increase retention quota and retry"));
}
}
topicPolicies.setRetentionPolicies(retention);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
Expand Down
Expand Up @@ -54,7 +54,8 @@ public class BacklogQuotaManager {
public BacklogQuotaManager(PulsarService pulsar) {
this.isTopicLevelPoliciesEnable = pulsar.getConfiguration().isTopicLevelPoliciesEnabled();
this.defaultQuota = BacklogQuotaImpl.builder()
.limitSize(pulsar.getConfiguration().getBacklogQuotaDefaultLimitGB() * 1024 * 1024 * 1024)
.limitSize(pulsar.getConfiguration().getBacklogQuotaDefaultLimitGB()
* BacklogQuotaImpl.BYTES_IN_GIGABYTE)
.limitTime(pulsar.getConfiguration().getBacklogQuotaDefaultLimitSecond())
.retentionPolicy(pulsar.getConfiguration().getBacklogQuotaDefaultRetentionPolicy())
.build();
Expand All @@ -66,42 +67,42 @@ public BacklogQuotaImpl getDefaultQuota() {
return this.defaultQuota;
}

public BacklogQuotaImpl getBacklogQuota(String namespace, String policyPath) {
public BacklogQuotaImpl getBacklogQuota(String namespace, String policyPath, BacklogQuotaType backlogQuotaType) {
try {
return zkCache.get(policyPath)
.map(p -> (BacklogQuotaImpl) p.backlog_quota_map
.getOrDefault(BacklogQuotaType.destination_storage, defaultQuota))
.getOrDefault(backlogQuotaType, defaultQuota))
.orElse(defaultQuota);
} catch (Exception e) {
log.warn("Failed to read policies data, will apply the default backlog quota: namespace={}", namespace, e);
return this.defaultQuota;
}
}

public BacklogQuotaImpl getBacklogQuota(TopicName topicName) {
public BacklogQuotaImpl getBacklogQuota(TopicName topicName, BacklogQuotaType backlogQuotaType) {
String policyPath = AdminResource.path(POLICIES, topicName.getNamespace());
if (!isTopicLevelPoliciesEnable) {
return getBacklogQuota(topicName.getNamespace(), policyPath);
return getBacklogQuota(topicName.getNamespace(), policyPath, backlogQuotaType);
}

try {
return Optional.ofNullable(pulsar.getTopicPoliciesService().getTopicPolicies(topicName))
.map(TopicPolicies::getBackLogQuotaMap)
.map(map -> map.get(BacklogQuotaType.destination_storage.name()))
.orElseGet(() -> getBacklogQuota(topicName.getNamespace(), policyPath));
.map(map -> map.get(backlogQuotaType.name()))
.orElseGet(() -> getBacklogQuota(topicName.getNamespace(), policyPath, backlogQuotaType));
} catch (Exception e) {
log.warn("Failed to read topic policies data, will apply the namespace backlog quota: topicName={}",
topicName, e);
}
return getBacklogQuota(topicName.getNamespace(), policyPath);
return getBacklogQuota(topicName.getNamespace(), policyPath, backlogQuotaType);
}

public long getBacklogQuotaLimitInSize(TopicName topicName) {
return getBacklogQuota(topicName).getLimitSize();
return getBacklogQuota(topicName, BacklogQuotaType.destination_storage).getLimitSize();
}

public int getBacklogQuotaLimitInTime(TopicName topicName) {
return getBacklogQuota(topicName).getLimitTime();
return getBacklogQuota(topicName, BacklogQuotaType.message_age).getLimitTime();
}

/**
Expand All @@ -112,7 +113,7 @@ public int getBacklogQuotaLimitInTime(TopicName topicName) {
public void handleExceededBacklogQuota(PersistentTopic persistentTopic, BacklogQuotaType backlogQuotaType,
boolean preciseTimeBasedBacklogQuotaCheck) {
TopicName topicName = TopicName.get(persistentTopic.getName());
BacklogQuota quota = getBacklogQuota(topicName);
BacklogQuota quota = getBacklogQuota(topicName, backlogQuotaType);
log.info("Backlog quota type {} exceeded for topic [{}]. Applying [{}] policy", backlogQuotaType,
persistentTopic.getName(), quota.getPolicy());
switch (quota.getPolicy()) {
Expand Down
Expand Up @@ -1194,23 +1194,27 @@ protected void handleProducer(final CommandProducer cmdProducer) {

service.getOrCreateTopic(topicName.toString()).thenAccept((Topic topic) -> {
// Before creating producer, check if backlog quota exceeded
// on topic
if (topic.isBacklogQuotaExceeded(producerName)) {
IllegalStateException illegalStateException = new IllegalStateException(
"Cannot create producer on topic with backlog quota exceeded");
BacklogQuota.RetentionPolicy retentionPolicy = topic.getBacklogQuota().getPolicy();
if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
commandSender.sendErrorResponse(requestId,
ServerError.ProducerBlockedQuotaExceededError,
illegalStateException.getMessage());
} else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
commandSender.sendErrorResponse(requestId,
ServerError.ProducerBlockedQuotaExceededException,
illegalStateException.getMessage());
// on topic for size based limit and time based limit
for (BacklogQuota.BacklogQuotaType backlogQuotaType :
BacklogQuota.BacklogQuotaType.values()) {
if (topic.isBacklogQuotaExceeded(producerName, backlogQuotaType)) {
IllegalStateException illegalStateException = new IllegalStateException(
"Cannot create producer on topic with backlog quota exceeded");
BacklogQuota.RetentionPolicy retentionPolicy = topic
.getBacklogQuota(backlogQuotaType).getPolicy();
if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
commandSender.sendErrorResponse(requestId,
ServerError.ProducerBlockedQuotaExceededError,
illegalStateException.getMessage());
} else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
commandSender.sendErrorResponse(requestId,
ServerError.ProducerBlockedQuotaExceededException,
illegalStateException.getMessage());
}
producerFuture.completeExceptionally(illegalStateException);
producers.remove(producerId, producerFuture);
return;
}
producerFuture.completeExceptionally(illegalStateException);
producers.remove(producerId, producerFuture);
return;
}

// Check whether the producer will publish encrypted messages or not
Expand Down
Expand Up @@ -182,15 +182,15 @@ CompletableFuture<Subscription> createSubscription(String subscriptionName, Init

CompletableFuture<Void> onPoliciesUpdate(Policies data);

boolean isBacklogQuotaExceeded(String producerName);
boolean isBacklogQuotaExceeded(String producerName, BacklogQuota.BacklogQuotaType backlogQuotaType);

boolean isEncryptionRequired();

boolean getSchemaValidationEnforced();

boolean isReplicated();

BacklogQuota getBacklogQuota();
BacklogQuota getBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType);

void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats,
StatsOutputStream topicStatsStream, ClusterReplicationMetrics clusterReplicationMetrics,
Expand Down
Expand Up @@ -972,7 +972,7 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
* @return Backlog quota for topic
*/
@Override
public BacklogQuota getBacklogQuota() {
public BacklogQuota getBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType) {
// No-op
throw new UnsupportedOperationException("getBacklogQuota method is not supported on non-persistent topic");
}
Expand All @@ -982,7 +982,7 @@ public BacklogQuota getBacklogQuota() {
* @return quota exceeded status for blocking producer creation
*/
@Override
public boolean isBacklogQuotaExceeded(String producerName) {
public boolean isBacklogQuotaExceeded(String producerName, BacklogQuota.BacklogQuotaType backlogQuotaType) {
// No-op
return false;
}
Expand Down
Expand Up @@ -2321,27 +2321,29 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
* @return Backlog quota for topic
*/
@Override
public BacklogQuota getBacklogQuota() {
public BacklogQuota getBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType) {
TopicName topicName = TopicName.get(this.getName());
return brokerService.getBacklogQuotaManager().getBacklogQuota(topicName);
return brokerService.getBacklogQuotaManager().getBacklogQuota(topicName, backlogQuotaType);
}

/**
*
* @return quota exceeded status for blocking producer creation
*/
@Override
public boolean isBacklogQuotaExceeded(String producerName) {
BacklogQuota backlogQuota = getBacklogQuota();
public boolean isBacklogQuotaExceeded(String producerName, BacklogQuota.BacklogQuotaType backlogQuotaType) {
BacklogQuota backlogQuota = getBacklogQuota(backlogQuotaType);

if (backlogQuota != null) {
BacklogQuota.RetentionPolicy retentionPolicy = backlogQuota.getPolicy();

if ((retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold
|| retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception)
&& (isSizeBacklogExceeded() || isTimeBacklogExceeded())) {
log.info("[{}] Backlog quota exceeded. Cannot create producer [{}]", this.getName(), producerName);
return true;
|| retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception)) {
if (backlogQuotaType == BacklogQuota.BacklogQuotaType.destination_storage && isSizeBacklogExceeded()
|| backlogQuotaType == BacklogQuota.BacklogQuotaType.message_age && isTimeBacklogExceeded()){
log.info("[{}] Backlog quota exceeded. Cannot create producer [{}]", this.getName(), producerName);
return true;
}
} else {
return false;
}
Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
Expand Down Expand Up @@ -99,8 +100,10 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include
stats.managedLedgerStats.storageLogicalSize = mlStats.getStoredMessagesLogicalSize();
stats.managedLedgerStats.backlogSize = ml.getEstimatedBacklogSize();
stats.managedLedgerStats.offloadedStorageUsed = ml.getOffloadedSize();
stats.backlogQuotaLimit = topic.getBacklogQuota().getLimitSize();
stats.backlogQuotaLimitTime = topic.getBacklogQuota().getLimitTime();
stats.backlogQuotaLimit = topic
.getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize();
stats.backlogQuotaLimitTime = topic
.getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age).getLimitTime();

stats.managedLedgerStats.storageWriteLatencyBuckets
.addAll(mlStats.getInternalAddEntryLatencyBuckets());
Expand Down
Expand Up @@ -18,25 +18,35 @@
*/
package org.apache.pulsar.broker;

import com.google.common.collect.ImmutableMap;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;

import java.util.Collections;
import java.util.Map;

public class ConfigHelper {
private ConfigHelper() {}


public static Map<BacklogQuota.BacklogQuotaType, BacklogQuota> backlogQuotaMap(ServiceConfiguration configuration) {
return Collections.singletonMap(BacklogQuota.BacklogQuotaType.destination_storage,
backlogQuota(configuration));
return ImmutableMap.of(BacklogQuota.BacklogQuotaType.destination_storage,
sizeBacklogQuota(configuration),
BacklogQuota.BacklogQuotaType.message_age,
timeBacklogQuota(configuration));
}

public static BacklogQuota backlogQuota(ServiceConfiguration configuration) {
public static BacklogQuota sizeBacklogQuota(ServiceConfiguration configuration) {
return BacklogQuota.builder()
.limitSize(configuration.getBacklogQuotaDefaultLimitGB() * 1024 * 1024 * 1024)
.limitSize(configuration.getBacklogQuotaDefaultLimitGB() * BacklogQuotaImpl.BYTES_IN_GIGABYTE)
.retentionPolicy(configuration.getBacklogQuotaDefaultRetentionPolicy())
.build();
}

public static BacklogQuota timeBacklogQuota(ServiceConfiguration configuration) {
return BacklogQuota.builder()
.limitTime(configuration.getBacklogQuotaDefaultLimitSecond())
.retentionPolicy(configuration.getBacklogQuotaDefaultRetentionPolicy())
.build();
}
Expand Down
Expand Up @@ -78,14 +78,14 @@ public void testBacklogQuotaDisabled() {
log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic);

try {
admin.topics().setBacklogQuota(testTopic, backlogQuota);
admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), HttpStatus.METHOD_NOT_ALLOWED_405);
}

try {
admin.topics().removeBacklogQuota(testTopic);
admin.topics().removeBacklogQuota(testTopic, BacklogQuota.BacklogQuotaType.destination_storage);
Assert.fail();
} catch (PulsarAdminException e) {
Assert.assertEquals(e.getStatusCode(), HttpStatus.METHOD_NOT_ALLOWED_405);
Expand Down

0 comments on commit d9552f0

Please sign in to comment.