From d9552f0e2b65858e470e815714637d241d70ed00 Mon Sep 17 00:00:00 2001 From: Marvin Cai Date: Thu, 5 Aug 2021 20:41:18 -0700 Subject: [PATCH] Fix time based backlog quota. (#11509) Fixes #11404 ### Motivation Time based backlog quota type message_age is set separately but when check backlog we are only checking against destination_storage type. So fix to loop through all BacklogQuotaType when checking if backlog exceeded. ### Modification * Added unit test. * Added default implementation to make Admin Topic/Namespace backlog quota related API backward compatible. --- .../pulsar/broker/admin/AdminResource.java | 6 +- .../admin/impl/PersistentTopicsBase.java | 43 ++-- .../broker/service/BacklogQuotaManager.java | 23 +- .../pulsar/broker/service/ServerCnx.java | 36 +-- .../apache/pulsar/broker/service/Topic.java | 4 +- .../nonpersistent/NonPersistentTopic.java | 4 +- .../service/persistent/PersistentTopic.java | 18 +- .../prometheus/NamespaceStatsAggregator.java | 7 +- .../apache/pulsar/broker/ConfigHelper.java | 20 +- .../admin/TopicPoliciesDisableTest.java | 4 +- .../broker/admin/TopicPoliciesTest.java | 230 ++++++++++++++++-- .../service/BacklogQuotaManagerTest.java | 18 +- .../pulsar/client/admin/Namespaces.java | 28 ++- .../apache/pulsar/client/admin/Topics.java | 19 +- .../policies/data/impl/BacklogQuotaImpl.java | 5 +- .../client/admin/internal/NamespacesImpl.java | 22 +- .../client/admin/internal/TopicsImpl.java | 10 +- .../pulsar/admin/cli/PulsarAdminToolTest.java | 39 ++- .../pulsar/admin/cli/CmdNamespaces.java | 11 +- .../apache/pulsar/admin/cli/CmdTopics.java | 15 +- site2/docs/cookbooks-retention-expiry.md | 12 +- site2/docs/reference-pulsar-admin.md | 38 +++ 22 files changed, 472 insertions(+), 140 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 5a453eb1af79a4..3cb38b35fe32e7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -336,8 +336,10 @@ protected void mergeNamespaceWithDefaults(Policies policies, String namespace, S } - protected BacklogQuota namespaceBacklogQuota(String namespace, String namespacePath) { - return pulsar().getBrokerService().getBacklogQuotaManager().getBacklogQuota(namespace, namespacePath); + protected BacklogQuota namespaceBacklogQuota(String namespace, String namespacePath, + BacklogQuota.BacklogQuotaType backlogQuotaType) { + return pulsar().getBrokerService().getBacklogQuotaManager() + .getBacklogQuota(namespace, namespacePath, backlogQuotaType); } protected CompletableFuture> getTopicPoliciesAsyncWithRetry(TopicName topicName) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index f8c6ac2f6a0f0c..48319943c64585 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -2676,14 +2676,17 @@ protected CompletableFuture> in quotaMap = getNamespacePolicies(namespaceName).backlog_quota_map; if (quotaMap.isEmpty()) { String namespace = namespaceName.toString(); - quotaMap.put( - BacklogQuota.BacklogQuotaType.destination_storage, - namespaceBacklogQuota(namespace, AdminResource.path(POLICIES, namespace)) - ); + for (BacklogQuota.BacklogQuotaType backlogQuotaType : BacklogQuota.BacklogQuotaType.values()) { + quotaMap.put( + backlogQuotaType, + namespaceBacklogQuota(namespace, + AdminResource.path(POLICIES, namespace), backlogQuotaType) + ); + } } } return quotaMap; - }); + }); } protected CompletableFuture internalSetBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType, @@ -2799,21 +2802,21 @@ protected CompletableFuture internalSetRetention(RetentionPolicies retenti return getTopicPoliciesAsyncWithRetry(topicName) .thenCompose(op -> { TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new); - BacklogQuota backlogQuota = - topicPolicies.getBackLogQuotaMap() - .get(BacklogQuota.BacklogQuotaType.destination_storage.name()); - if (backlogQuota == null) { - Policies policies = getNamespacePolicies(topicName.getNamespaceObject()); - backlogQuota = policies.backlog_quota_map.get(BacklogQuota.BacklogQuotaType.destination_storage); - } - if (!checkBacklogQuota(backlogQuota, retention)) { - log.warn( - "[{}] Failed to update retention quota configuration for topic {}: " - + "conflicts with retention quota", - clientAppId(), topicName); - return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, - "Retention Quota must exceed configured backlog quota for topic. " - + "Please increase retention quota and retry")); + for (BacklogQuota.BacklogQuotaType backlogQuotaType : BacklogQuota.BacklogQuotaType.values()) { + BacklogQuota backlogQuota = topicPolicies.getBackLogQuotaMap().get(backlogQuotaType.name()); + if (backlogQuota == null) { + Policies policies = getNamespacePolicies(topicName.getNamespaceObject()); + backlogQuota = policies.backlog_quota_map.get(backlogQuotaType); + } + if (!checkBacklogQuota(backlogQuota, retention)) { + log.warn( + "[{}] Failed to update retention quota configuration for topic {}: " + + "conflicts with retention quota", + clientAppId(), topicName); + return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, + "Retention Quota must exceed configured backlog quota for topic. " + + "Please increase retention quota and retry")); + } } topicPolicies.setRetentionPolicies(retention); return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java index fa0d24b12f83bf..bbb3ddfcba660f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java @@ -54,7 +54,8 @@ public class BacklogQuotaManager { public BacklogQuotaManager(PulsarService pulsar) { this.isTopicLevelPoliciesEnable = pulsar.getConfiguration().isTopicLevelPoliciesEnabled(); this.defaultQuota = BacklogQuotaImpl.builder() - .limitSize(pulsar.getConfiguration().getBacklogQuotaDefaultLimitGB() * 1024 * 1024 * 1024) + .limitSize(pulsar.getConfiguration().getBacklogQuotaDefaultLimitGB() + * BacklogQuotaImpl.BYTES_IN_GIGABYTE) .limitTime(pulsar.getConfiguration().getBacklogQuotaDefaultLimitSecond()) .retentionPolicy(pulsar.getConfiguration().getBacklogQuotaDefaultRetentionPolicy()) .build(); @@ -66,11 +67,11 @@ public BacklogQuotaImpl getDefaultQuota() { return this.defaultQuota; } - public BacklogQuotaImpl getBacklogQuota(String namespace, String policyPath) { + public BacklogQuotaImpl getBacklogQuota(String namespace, String policyPath, BacklogQuotaType backlogQuotaType) { try { return zkCache.get(policyPath) .map(p -> (BacklogQuotaImpl) p.backlog_quota_map - .getOrDefault(BacklogQuotaType.destination_storage, defaultQuota)) + .getOrDefault(backlogQuotaType, defaultQuota)) .orElse(defaultQuota); } catch (Exception e) { log.warn("Failed to read policies data, will apply the default backlog quota: namespace={}", namespace, e); @@ -78,30 +79,30 @@ public BacklogQuotaImpl getBacklogQuota(String namespace, String policyPath) { } } - public BacklogQuotaImpl getBacklogQuota(TopicName topicName) { + public BacklogQuotaImpl getBacklogQuota(TopicName topicName, BacklogQuotaType backlogQuotaType) { String policyPath = AdminResource.path(POLICIES, topicName.getNamespace()); if (!isTopicLevelPoliciesEnable) { - return getBacklogQuota(topicName.getNamespace(), policyPath); + return getBacklogQuota(topicName.getNamespace(), policyPath, backlogQuotaType); } try { return Optional.ofNullable(pulsar.getTopicPoliciesService().getTopicPolicies(topicName)) .map(TopicPolicies::getBackLogQuotaMap) - .map(map -> map.get(BacklogQuotaType.destination_storage.name())) - .orElseGet(() -> getBacklogQuota(topicName.getNamespace(), policyPath)); + .map(map -> map.get(backlogQuotaType.name())) + .orElseGet(() -> getBacklogQuota(topicName.getNamespace(), policyPath, backlogQuotaType)); } catch (Exception e) { log.warn("Failed to read topic policies data, will apply the namespace backlog quota: topicName={}", topicName, e); } - return getBacklogQuota(topicName.getNamespace(), policyPath); + return getBacklogQuota(topicName.getNamespace(), policyPath, backlogQuotaType); } public long getBacklogQuotaLimitInSize(TopicName topicName) { - return getBacklogQuota(topicName).getLimitSize(); + return getBacklogQuota(topicName, BacklogQuotaType.destination_storage).getLimitSize(); } public int getBacklogQuotaLimitInTime(TopicName topicName) { - return getBacklogQuota(topicName).getLimitTime(); + return getBacklogQuota(topicName, BacklogQuotaType.message_age).getLimitTime(); } /** @@ -112,7 +113,7 @@ public int getBacklogQuotaLimitInTime(TopicName topicName) { public void handleExceededBacklogQuota(PersistentTopic persistentTopic, BacklogQuotaType backlogQuotaType, boolean preciseTimeBasedBacklogQuotaCheck) { TopicName topicName = TopicName.get(persistentTopic.getName()); - BacklogQuota quota = getBacklogQuota(topicName); + BacklogQuota quota = getBacklogQuota(topicName, backlogQuotaType); log.info("Backlog quota type {} exceeded for topic [{}]. Applying [{}] policy", backlogQuotaType, persistentTopic.getName(), quota.getPolicy()); switch (quota.getPolicy()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 9599f5154dc743..375f74fbd5daaa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1194,23 +1194,27 @@ protected void handleProducer(final CommandProducer cmdProducer) { service.getOrCreateTopic(topicName.toString()).thenAccept((Topic topic) -> { // Before creating producer, check if backlog quota exceeded - // on topic - if (topic.isBacklogQuotaExceeded(producerName)) { - IllegalStateException illegalStateException = new IllegalStateException( - "Cannot create producer on topic with backlog quota exceeded"); - BacklogQuota.RetentionPolicy retentionPolicy = topic.getBacklogQuota().getPolicy(); - if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) { - commandSender.sendErrorResponse(requestId, - ServerError.ProducerBlockedQuotaExceededError, - illegalStateException.getMessage()); - } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) { - commandSender.sendErrorResponse(requestId, - ServerError.ProducerBlockedQuotaExceededException, - illegalStateException.getMessage()); + // on topic for size based limit and time based limit + for (BacklogQuota.BacklogQuotaType backlogQuotaType : + BacklogQuota.BacklogQuotaType.values()) { + if (topic.isBacklogQuotaExceeded(producerName, backlogQuotaType)) { + IllegalStateException illegalStateException = new IllegalStateException( + "Cannot create producer on topic with backlog quota exceeded"); + BacklogQuota.RetentionPolicy retentionPolicy = topic + .getBacklogQuota(backlogQuotaType).getPolicy(); + if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) { + commandSender.sendErrorResponse(requestId, + ServerError.ProducerBlockedQuotaExceededError, + illegalStateException.getMessage()); + } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) { + commandSender.sendErrorResponse(requestId, + ServerError.ProducerBlockedQuotaExceededException, + illegalStateException.getMessage()); + } + producerFuture.completeExceptionally(illegalStateException); + producers.remove(producerId, producerFuture); + return; } - producerFuture.completeExceptionally(illegalStateException); - producers.remove(producerId, producerFuture); - return; } // Check whether the producer will publish encrypted messages or not diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 6c4d694c528099..308f28539c5856 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -182,7 +182,7 @@ CompletableFuture createSubscription(String subscriptionName, Init CompletableFuture onPoliciesUpdate(Policies data); - boolean isBacklogQuotaExceeded(String producerName); + boolean isBacklogQuotaExceeded(String producerName, BacklogQuota.BacklogQuotaType backlogQuotaType); boolean isEncryptionRequired(); @@ -190,7 +190,7 @@ CompletableFuture createSubscription(String subscriptionName, Init boolean isReplicated(); - BacklogQuota getBacklogQuota(); + BacklogQuota getBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType); void updateRates(NamespaceStats nsStats, NamespaceBundleStats currentBundleStats, StatsOutputStream topicStatsStream, ClusterReplicationMetrics clusterReplicationMetrics, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 7c13cdaf958fb6..e3255fe81b683b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -972,7 +972,7 @@ public CompletableFuture onPoliciesUpdate(Policies data) { * @return Backlog quota for topic */ @Override - public BacklogQuota getBacklogQuota() { + public BacklogQuota getBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType) { // No-op throw new UnsupportedOperationException("getBacklogQuota method is not supported on non-persistent topic"); } @@ -982,7 +982,7 @@ public BacklogQuota getBacklogQuota() { * @return quota exceeded status for blocking producer creation */ @Override - public boolean isBacklogQuotaExceeded(String producerName) { + public boolean isBacklogQuotaExceeded(String producerName, BacklogQuota.BacklogQuotaType backlogQuotaType) { // No-op return false; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 74d57d1d6f8fd2..db7998857c0900 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2321,9 +2321,9 @@ public CompletableFuture onPoliciesUpdate(Policies data) { * @return Backlog quota for topic */ @Override - public BacklogQuota getBacklogQuota() { + public BacklogQuota getBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType) { TopicName topicName = TopicName.get(this.getName()); - return brokerService.getBacklogQuotaManager().getBacklogQuota(topicName); + return brokerService.getBacklogQuotaManager().getBacklogQuota(topicName, backlogQuotaType); } /** @@ -2331,17 +2331,19 @@ public BacklogQuota getBacklogQuota() { * @return quota exceeded status for blocking producer creation */ @Override - public boolean isBacklogQuotaExceeded(String producerName) { - BacklogQuota backlogQuota = getBacklogQuota(); + public boolean isBacklogQuotaExceeded(String producerName, BacklogQuota.BacklogQuotaType backlogQuotaType) { + BacklogQuota backlogQuota = getBacklogQuota(backlogQuotaType); if (backlogQuota != null) { BacklogQuota.RetentionPolicy retentionPolicy = backlogQuota.getPolicy(); if ((retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold - || retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) - && (isSizeBacklogExceeded() || isTimeBacklogExceeded())) { - log.info("[{}] Backlog quota exceeded. Cannot create producer [{}]", this.getName(), producerName); - return true; + || retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception)) { + if (backlogQuotaType == BacklogQuota.BacklogQuotaType.destination_storage && isSizeBacklogExceeded() + || backlogQuotaType == BacklogQuota.BacklogQuotaType.message_age && isTimeBacklogExceeded()){ + log.info("[{}] Backlog quota exceeded. Cannot create producer [{}]", this.getName(), producerName); + return true; + } } else { return false; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java index c08641d46a936d..e041a011fc7952 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/NamespaceStatsAggregator.java @@ -25,6 +25,7 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl; import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; @@ -99,8 +100,10 @@ private static void getTopicStats(Topic topic, TopicStats stats, boolean include stats.managedLedgerStats.storageLogicalSize = mlStats.getStoredMessagesLogicalSize(); stats.managedLedgerStats.backlogSize = ml.getEstimatedBacklogSize(); stats.managedLedgerStats.offloadedStorageUsed = ml.getOffloadedSize(); - stats.backlogQuotaLimit = topic.getBacklogQuota().getLimitSize(); - stats.backlogQuotaLimitTime = topic.getBacklogQuota().getLimitTime(); + stats.backlogQuotaLimit = topic + .getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize(); + stats.backlogQuotaLimitTime = topic + .getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age).getLimitTime(); stats.managedLedgerStats.storageWriteLatencyBuckets .addAll(mlStats.getInternalAddEntryLatencyBuckets()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java index 86cf2aad711691..b929636486ca77 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/ConfigHelper.java @@ -18,11 +18,12 @@ */ package org.apache.pulsar.broker; +import com.google.common.collect.ImmutableMap; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.SubscribeRate; +import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl; -import java.util.Collections; import java.util.Map; public class ConfigHelper { @@ -30,13 +31,22 @@ private ConfigHelper() {} public static Map backlogQuotaMap(ServiceConfiguration configuration) { - return Collections.singletonMap(BacklogQuota.BacklogQuotaType.destination_storage, - backlogQuota(configuration)); + return ImmutableMap.of(BacklogQuota.BacklogQuotaType.destination_storage, + sizeBacklogQuota(configuration), + BacklogQuota.BacklogQuotaType.message_age, + timeBacklogQuota(configuration)); } - public static BacklogQuota backlogQuota(ServiceConfiguration configuration) { + public static BacklogQuota sizeBacklogQuota(ServiceConfiguration configuration) { return BacklogQuota.builder() - .limitSize(configuration.getBacklogQuotaDefaultLimitGB() * 1024 * 1024 * 1024) + .limitSize(configuration.getBacklogQuotaDefaultLimitGB() * BacklogQuotaImpl.BYTES_IN_GIGABYTE) + .retentionPolicy(configuration.getBacklogQuotaDefaultRetentionPolicy()) + .build(); + } + + public static BacklogQuota timeBacklogQuota(ServiceConfiguration configuration) { + return BacklogQuota.builder() + .limitTime(configuration.getBacklogQuotaDefaultLimitSecond()) .retentionPolicy(configuration.getBacklogQuotaDefaultRetentionPolicy()) .build(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java index 5e9967bfe7988c..e9ec1905ee02e3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesDisableTest.java @@ -78,14 +78,14 @@ public void testBacklogQuotaDisabled() { log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); try { - admin.topics().setBacklogQuota(testTopic, backlogQuota); + admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage); Assert.fail(); } catch (PulsarAdminException e) { Assert.assertEquals(e.getStatusCode(), HttpStatus.METHOD_NOT_ALLOWED_405); } try { - admin.topics().removeBacklogQuota(testTopic); + admin.topics().removeBacklogQuota(testTopic, BacklogQuota.BacklogQuotaType.destination_storage); Assert.fail(); } catch (PulsarAdminException e) { Assert.assertEquals(e.getStatusCode(), HttpStatus.METHOD_NOT_ALLOWED_405); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 6a86354c9b29e9..d55f011af1e74d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -122,7 +122,7 @@ public void cleanup() throws Exception { } @Test - public void testSetBacklogQuota() throws Exception { + public void testSetSizeBasedBacklogQuota() throws Exception { BacklogQuota backlogQuota = BacklogQuota.builder() .limitSize(1024) @@ -130,7 +130,7 @@ public void testSetBacklogQuota() throws Exception { .build(); log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); - admin.topics().setBacklogQuota(testTopic, backlogQuota); + admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage); log.info("Backlog quota set success on topic: {}", testTopic); Awaitility.await() @@ -138,7 +138,8 @@ public void testSetBacklogQuota() throws Exception { .get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota)); BacklogQuotaManager backlogQuotaManager = pulsar.getBrokerService().getBacklogQuotaManager(); - BacklogQuota backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(testTopic)); + BacklogQuota backlogQuotaInManager = backlogQuotaManager + .getBacklogQuota(TopicName.get(testTopic), BacklogQuota.BacklogQuotaType.destination_storage); log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInManager, testTopic); Assert.assertEquals(backlogQuota, backlogQuotaInManager); @@ -146,14 +147,37 @@ public void testSetBacklogQuota() throws Exception { } @Test - public void testRemoveBacklogQuota() throws Exception { + public void testSetTimeBasedBacklogQuota() throws Exception { + + BacklogQuota backlogQuota = BacklogQuota.builder() + .limitTime(1000) + .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) + .build(); + + admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.message_age); + + Awaitility.await() + .untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic) + .get(BacklogQuota.BacklogQuotaType.message_age), backlogQuota)); + + BacklogQuotaManager backlogQuotaManager = pulsar.getBrokerService().getBacklogQuotaManager(); + BacklogQuota backlogQuotaInManager = backlogQuotaManager + .getBacklogQuota(TopicName.get(testTopic), BacklogQuota.BacklogQuotaType.message_age); + + Assert.assertEquals(backlogQuota, backlogQuotaInManager); + + admin.topics().deletePartitionedTopic(testTopic, true); + } + + @Test + public void testRemoveSizeBasedBacklogQuota() throws Exception { BacklogQuota backlogQuota = BacklogQuota.builder() .limitSize(1024) .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) .build(); log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); - admin.topics().setBacklogQuota(testTopic, backlogQuota); + admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage); log.info("Backlog quota set success on topic: {}", testTopic); Awaitility.await() @@ -161,16 +185,18 @@ public void testRemoveBacklogQuota() throws Exception { .get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota)); BacklogQuotaManager backlogQuotaManager = pulsar.getBrokerService().getBacklogQuotaManager(); - BacklogQuota backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(testTopic)); + BacklogQuota backlogQuotaInManager = backlogQuotaManager + .getBacklogQuota(TopicName.get(testTopic), BacklogQuota.BacklogQuotaType.destination_storage); log.info("Backlog quota {} in backlog quota manager on topic: {}", backlogQuotaInManager, testTopic); Assert.assertEquals(backlogQuota, backlogQuotaInManager); - admin.topics().removeBacklogQuota(testTopic); + admin.topics().removeBacklogQuota(testTopic, BacklogQuota.BacklogQuotaType.destination_storage); Awaitility.await() .untilAsserted(() -> Assert.assertNull(admin.topics().getBacklogQuotaMap(testTopic) .get(BacklogQuota.BacklogQuotaType.destination_storage))); - backlogQuotaInManager = backlogQuotaManager.getBacklogQuota(TopicName.get(testTopic)); + backlogQuotaInManager = backlogQuotaManager + .getBacklogQuota(TopicName.get(testTopic), BacklogQuota.BacklogQuotaType.destination_storage); log.info("Backlog quota {} in backlog quota manager on topic: {} after remove", backlogQuotaInManager, testTopic); Assert.assertEquals(backlogQuotaManager.getDefaultQuota(), backlogQuotaInManager); @@ -179,7 +205,37 @@ public void testRemoveBacklogQuota() throws Exception { } @Test - public void testCheckBacklogQuota() throws Exception { + public void testRemoveTimeBasedBacklogQuota() throws Exception { + BacklogQuota backlogQuota = BacklogQuota.builder() + .limitTime(1000) + .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) + .build(); + + admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.message_age); + + Awaitility.await() + .untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic) + .get(BacklogQuota.BacklogQuotaType.message_age), backlogQuota)); + + BacklogQuotaManager backlogQuotaManager = pulsar.getBrokerService().getBacklogQuotaManager(); + BacklogQuota backlogQuotaInManager = backlogQuotaManager + .getBacklogQuota(TopicName.get(testTopic), BacklogQuota.BacklogQuotaType.message_age); + Assert.assertEquals(backlogQuota, backlogQuotaInManager); + + admin.topics().removeBacklogQuota(testTopic, BacklogQuota.BacklogQuotaType.message_age); + Awaitility.await() + .untilAsserted(() -> Assert.assertNull(admin.topics().getBacklogQuotaMap(testTopic) + .get(BacklogQuota.BacklogQuotaType.message_age))); + + backlogQuotaInManager = backlogQuotaManager + .getBacklogQuota(TopicName.get(testTopic), BacklogQuota.BacklogQuotaType.message_age); + Assert.assertEquals(backlogQuotaManager.getDefaultQuota(), backlogQuotaInManager); + + admin.topics().deletePartitionedTopic(testTopic, true); + } + + @Test + public void testCheckSizeBasedBacklogQuota() throws Exception { RetentionPolicies retentionPolicies = new RetentionPolicies(10, 10); String namespace = TopicName.get(testTopic).getNamespace(); admin.namespaces().setRetention(namespace, retentionPolicies); @@ -193,7 +249,7 @@ public void testCheckBacklogQuota() throws Exception { .build(); log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); try { - admin.topics().setBacklogQuota(testTopic, backlogQuota); + admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage); Assert.fail(); } catch (PulsarAdminException e) { Assert.assertEquals(e.getStatusCode(), 412); @@ -205,7 +261,7 @@ public void testCheckBacklogQuota() throws Exception { .build(); log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); try { - admin.topics().setBacklogQuota(testTopic, backlogQuota); + admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage); Assert.fail(); } catch (PulsarAdminException e) { Assert.assertEquals(e.getStatusCode(), 412); @@ -216,7 +272,7 @@ public void testCheckBacklogQuota() throws Exception { .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) .build(); log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); - admin.topics().setBacklogQuota(testTopic, backlogQuota); + admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage); BacklogQuota finalBacklogQuota = backlogQuota; Awaitility.await() @@ -226,8 +282,56 @@ public void testCheckBacklogQuota() throws Exception { admin.topics().deletePartitionedTopic(testTopic, true); } + @Test + public void testCheckTimeBasedBacklogQuota() throws Exception { + RetentionPolicies retentionPolicies = new RetentionPolicies(10, 10); + String namespace = TopicName.get(testTopic).getNamespace(); + admin.namespaces().setRetention(namespace, retentionPolicies); + + Awaitility.await() + .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getRetention(namespace), retentionPolicies)); + + BacklogQuota backlogQuota = BacklogQuota.builder() + .limitTime(10 * 60) + .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) + .build(); + log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); + try { + admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.message_age); + Assert.fail(); + } catch (PulsarAdminException e) { + Assert.assertEquals(e.getStatusCode(), 412); + } + + backlogQuota = BacklogQuota.builder() + .limitTime(10 * 60 + 1) + .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) + .build(); + log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); + try { + admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.message_age); + Assert.fail(); + } catch (PulsarAdminException e) { + Assert.assertEquals(e.getStatusCode(), 412); + } + + backlogQuota = BacklogQuota.builder() + .limitTime(10 * 60 - 1) + .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) + .build(); + log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); + admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.message_age); + + BacklogQuota finalBacklogQuota = backlogQuota; + Awaitility.await() + .untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic) + .get(BacklogQuota.BacklogQuotaType.message_age), finalBacklogQuota)); + + admin.topics().deletePartitionedTopic(testTopic, true); + } + @Test(timeOut = 20000) - public void testGetBacklogQuotaApplied() throws Exception { + public void testGetSizeBasedBacklogQuotaApplied() throws Exception { final String topic = testTopic + UUID.randomUUID(); pulsarClient.newProducer().topic(topic).create().close(); assertEquals(admin.topics().getBacklogQuotaMap(topic), Maps.newHashMap()); @@ -236,6 +340,7 @@ public void testGetBacklogQuotaApplied() throws Exception { assertEquals(admin.topics().getBacklogQuotaMap(topic, true), brokerQuotaMap); BacklogQuota namespaceQuota = BacklogQuota.builder() .limitSize(30) + .limitTime(10) .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception) .build(); @@ -243,23 +348,67 @@ public void testGetBacklogQuotaApplied() throws Exception { Awaitility.await().untilAsserted(() -> assertFalse(admin.namespaces().getBacklogQuotaMap(myNamespace).isEmpty())); Map namespaceQuotaMap = Maps.newHashMap(); namespaceQuotaMap.put(BacklogQuota.BacklogQuotaType.destination_storage, namespaceQuota); + namespaceQuotaMap.put(BacklogQuota.BacklogQuotaType.message_age, BacklogQuota.builder() + .retentionPolicy(BacklogQuota.RetentionPolicy.producer_request_hold).build()); assertEquals(admin.topics().getBacklogQuotaMap(topic, true), namespaceQuotaMap); BacklogQuota topicQuota = BacklogQuota.builder() .limitSize(40) .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) .build(); - admin.topics().setBacklogQuota(topic, topicQuota); + admin.topics().setBacklogQuota(topic, topicQuota, BacklogQuota.BacklogQuotaType.destination_storage); Awaitility.await().untilAsserted(() -> assertFalse(admin.topics().getBacklogQuotaMap(topic).isEmpty())); Map topicQuotaMap = Maps.newHashMap(); topicQuotaMap.put(BacklogQuota.BacklogQuotaType.destination_storage, topicQuota); assertEquals(admin.topics().getBacklogQuotaMap(topic, true), topicQuotaMap); admin.namespaces().removeBacklogQuota(myNamespace); - admin.topics().removeBacklogQuota(topic); - Awaitility.await().untilAsserted(() -> assertTrue(admin.namespaces().getBacklogQuotaMap(myNamespace).isEmpty())); + admin.topics().removeBacklogQuota(topic, BacklogQuota.BacklogQuotaType.destination_storage); + Awaitility.await().untilAsserted(() -> assertTrue(admin.namespaces().getBacklogQuotaMap(myNamespace) + .get(BacklogQuota.BacklogQuotaType.destination_storage) == null)); Awaitility.await().untilAsserted(() -> assertTrue(admin.topics().getBacklogQuotaMap(topic).isEmpty())); + assertTrue(admin.topics().getBacklogQuotaMap(topic, true) + .get(BacklogQuota.BacklogQuotaType.destination_storage) == null); + } + + @Test(timeOut = 20000) + public void testGetTimeBasedBacklogQuotaApplied() throws Exception { + final String topic = testTopic + UUID.randomUUID(); + pulsarClient.newProducer().topic(topic).create().close(); + assertEquals(admin.topics().getBacklogQuotaMap(topic), Maps.newHashMap()); + assertEquals(admin.namespaces().getBacklogQuotaMap(myNamespace), Maps.newHashMap()); + Map brokerQuotaMap = ConfigHelper.backlogQuotaMap(conf); assertEquals(admin.topics().getBacklogQuotaMap(topic, true), brokerQuotaMap); + BacklogQuota namespaceQuota = BacklogQuota.builder() + .limitTime(30) + .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception) + .build(); + + admin.namespaces().setBacklogQuota(myNamespace, namespaceQuota, BacklogQuota.BacklogQuotaType.message_age); + Awaitility.await().untilAsserted(() -> assertFalse(admin.namespaces().getBacklogQuotaMap(myNamespace).isEmpty())); + Map namespaceQuotaMap = Maps.newHashMap(); + namespaceQuotaMap.put(BacklogQuota.BacklogQuotaType.message_age, namespaceQuota); + namespaceQuotaMap.put(BacklogQuota.BacklogQuotaType.destination_storage, BacklogQuota.builder() + .retentionPolicy(BacklogQuota.RetentionPolicy.producer_request_hold).build()); + assertEquals(admin.topics().getBacklogQuotaMap(topic, true), namespaceQuotaMap); + + BacklogQuota topicQuota = BacklogQuota.builder() + .limitTime(40) + .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) + .build(); + admin.topics().setBacklogQuota(topic, topicQuota, BacklogQuota.BacklogQuotaType.message_age); + Awaitility.await().untilAsserted(() -> assertFalse(admin.topics().getBacklogQuotaMap(topic).isEmpty())); + Map topicQuotaMap = Maps.newHashMap(); + topicQuotaMap.put(BacklogQuota.BacklogQuotaType.message_age, topicQuota); + assertEquals(admin.topics().getBacklogQuotaMap(topic, true), topicQuotaMap); + + admin.namespaces().removeBacklogQuota(myNamespace, BacklogQuota.BacklogQuotaType.message_age); + admin.topics().removeBacklogQuota(topic, BacklogQuota.BacklogQuotaType.message_age); + Awaitility.await().untilAsserted(() -> assertTrue(admin.namespaces().getBacklogQuotaMap(myNamespace) + .get(BacklogQuota.BacklogQuotaType.message_age) == null)); + Awaitility.await().untilAsserted(() -> assertTrue(admin.topics().getBacklogQuotaMap(topic).isEmpty())); + assertTrue(admin.topics().getBacklogQuotaMap(topic, true) + .get(BacklogQuota.BacklogQuotaType.message_age) == null); } @Test @@ -276,7 +425,7 @@ public void testCheckBacklogQuotaFailed() throws Exception { .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) .build(); try { - admin.topics().setBacklogQuota(testTopic, backlogQuota); + admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage); Assert.fail(); } catch (PulsarAdminException e) { Assert.assertEquals(e.getStatusCode(), 412); @@ -288,13 +437,13 @@ public void testCheckBacklogQuotaFailed() throws Exception { } @Test - public void testCheckRetention() throws Exception { + public void testCheckRetentionSizeBasedQuota() throws Exception { BacklogQuota backlogQuota = BacklogQuota.builder() .limitSize(10 * 1024 * 1024) .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) .build(); - admin.topics().setBacklogQuota(testTopic, backlogQuota); + admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage); Awaitility.await() .untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic) .get(BacklogQuota.BacklogQuotaType.destination_storage), backlogQuota)); @@ -328,6 +477,47 @@ public void testCheckRetention() throws Exception { admin.topics().deletePartitionedTopic(testTopic, true); } + @Test + public void testCheckRetentionTimeBasedQuota() throws Exception { + BacklogQuota backlogQuota = BacklogQuota.builder() + .limitTime(10 * 60) + .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) + .build(); + + admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.message_age); + Awaitility.await() + .untilAsserted(() -> Assert.assertEquals(admin.topics().getBacklogQuotaMap(testTopic) + .get(BacklogQuota.BacklogQuotaType.message_age), backlogQuota)); + + RetentionPolicies retention = new RetentionPolicies(10, 10); + log.info("Retention: {} will set to the topic: {}", retention, testTopic); + try { + admin.topics().setRetention(testTopic, retention); + Assert.fail(); + } catch (PulsarAdminException e) { + Assert.assertEquals(e.getStatusCode(), 412); + } + + retention = new RetentionPolicies(9, 10); + log.info("Retention: {} will set to the topic: {}", retention, testTopic); + try { + admin.topics().setRetention(testTopic, retention); + Assert.fail(); + } catch (PulsarAdminException e) { + Assert.assertEquals(e.getStatusCode(), 412); + } + + retention = new RetentionPolicies(12, 10); + log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); + admin.topics().setRetention(testTopic, retention); + + RetentionPolicies finalRetention = retention; + Awaitility.await() + .untilAsserted(() -> Assert.assertEquals(admin.topics().getRetention(testTopic), finalRetention)); + + admin.topics().deletePartitionedTopic(testTopic, true); + } + @Test public void testSetRetention() throws Exception { RetentionPolicies retention = new RetentionPolicies(60, 1024); @@ -2214,7 +2404,7 @@ public void testSystemTopicShouldBeCompacted() throws Exception { .build(); log.info("Backlog quota: {} will set to the topic: {}", backlogQuota, testTopic); - admin.topics().setBacklogQuota(testTopic, backlogQuota); + admin.topics().setBacklogQuota(testTopic, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage); log.info("Backlog quota set success on topic: {}", testTopic); Awaitility.await() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index 552ba5a7a0b8ae..560cd9e0f552c8 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -398,10 +398,9 @@ public void testConsumerBacklogEvictionTimeQuotaPrecise() throws Exception { Maps.newHashMap()); admin.namespaces().setBacklogQuota("prop/ns-quota", BacklogQuota.builder() - .limitSize(20 * 1024) .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA) .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) - .build()); + .build(), BacklogQuota.BacklogQuotaType.message_age); config.setPreciseTimeBasedBacklogQuotaCheck(true); PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS) .build(); @@ -441,10 +440,9 @@ public void testConsumerBacklogEvictionTimeQuota() throws Exception { Maps.newHashMap()); admin.namespaces().setBacklogQuota("prop/ns-quota", BacklogQuota.builder() - .limitSize(20 * 1024) .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA) .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) - .build()); + .build(), BacklogQuota.BacklogQuotaType.message_age); PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS) .build(); @@ -519,10 +517,9 @@ public void testConsumerBacklogEvictionWithAckTimeQuotaPrecise() throws Exceptio Maps.newHashMap()); admin.namespaces().setBacklogQuota("prop/ns-quota", BacklogQuota.builder() - .limitSize(10 * 1024) .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA) .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) - .build()); + .build(), BacklogQuota.BacklogQuotaType.message_age); config.setPreciseTimeBasedBacklogQuotaCheck(true); PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).build(); @@ -583,10 +580,9 @@ public void testConsumerBacklogEvictionWithAckTimeQuota() throws Exception { Maps.newHashMap()); admin.namespaces().setBacklogQuota("prop/ns-quota", BacklogQuota.builder() - .limitSize(20 * 1024) .limitTime(2 * TIME_TO_CHECK_BACKLOG_QUOTA) .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) - .build()); + .build(), BacklogQuota.BacklogQuotaType.message_age); @Cleanup PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).build(); @@ -1090,10 +1086,9 @@ public void testProducerExceptionAndThenUnblockTimeQuotaPrecise() throws Excepti Maps.newHashMap()); admin.namespaces().setBacklogQuota("prop/quotahold", BacklogQuota.builder() - .limitSize(10 * 1024) .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA) .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception) - .build()); + .build(), BacklogQuota.BacklogQuotaType.message_age); config.setPreciseTimeBasedBacklogQuotaCheck(true); final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()) .statsInterval(0, TimeUnit.SECONDS).build(); @@ -1157,10 +1152,9 @@ public void testProducerExceptionAndThenUnblockTimeQuota() throws Exception { Maps.newHashMap()); admin.namespaces().setBacklogQuota("prop/quotahold", BacklogQuota.builder() - .limitSize(15 * 1024) .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA) .retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception) - .build()); + .build(), BacklogQuota.BacklogQuotaType.message_age); final PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()) .statsInterval(0, TimeUnit.SECONDS).build(); final String topic1 = "persistent://prop/quotahold/exceptandunblock2"; diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java index 6e66d3bc935167..f6f8654f2d4a7f 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java @@ -1529,7 +1529,12 @@ CompletableFuture setSubscriptionTypesEnabledAsync(String namespace, * @throws PulsarAdminException * Unexpected error */ - void setBacklogQuota(String namespace, BacklogQuota backlogQuota) throws PulsarAdminException; + void setBacklogQuota(String namespace, BacklogQuota backlogQuota, BacklogQuota.BacklogQuotaType backlogQuotaType) + throws PulsarAdminException; + + default void setBacklogQuota(String namespace, BacklogQuota backlogQuota) throws PulsarAdminException { + setBacklogQuota(namespace, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage); + } /** * Set a backlog quota for all the topics on a namespace asynchronously. @@ -1554,7 +1559,12 @@ CompletableFuture setSubscriptionTypesEnabledAsync(String namespace, * @param backlogQuota * the new BacklogQuota */ - CompletableFuture setBacklogQuotaAsync(String namespace, BacklogQuota backlogQuota); + CompletableFuture setBacklogQuotaAsync(String namespace, BacklogQuota backlogQuota, + BacklogQuota.BacklogQuotaType backlogQuotaType); + + default CompletableFuture setBacklogQuotaAsync(String namespace, BacklogQuota backlogQuota) { + return setBacklogQuotaAsync(namespace, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage); + } /** * Remove a backlog quota policy from a namespace. @@ -1573,7 +1583,12 @@ CompletableFuture setSubscriptionTypesEnabledAsync(String namespace, * @throws PulsarAdminException * Unexpected error */ - void removeBacklogQuota(String namespace) throws PulsarAdminException; + void removeBacklogQuota(String namespace, BacklogQuota.BacklogQuotaType backlogQuotaType) + throws PulsarAdminException; + + default void removeBacklogQuota(String namespace) throws PulsarAdminException { + removeBacklogQuota(namespace, BacklogQuota.BacklogQuotaType.destination_storage); + } /** * Remove a backlog quota policy from a namespace asynchronously. @@ -1585,7 +1600,12 @@ CompletableFuture setSubscriptionTypesEnabledAsync(String namespace, * @param namespace * Namespace name */ - CompletableFuture removeBacklogQuotaAsync(String namespace); + CompletableFuture removeBacklogQuotaAsync(String namespace, BacklogQuota.BacklogQuotaType backlogQuotaType); + + default CompletableFuture removeBacklogQuotaAsync(String namespace) { + return removeBacklogQuotaAsync(namespace, BacklogQuota.BacklogQuotaType.destination_storage); + } + /** * Remove the persistence configuration on a namespace. diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 2e465eb2f2e225..0d041df159aad8 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -1606,7 +1606,8 @@ void createSubscription(String topic, String subscriptionName, MessageId message * @throws PulsarAdminException * Unexpected error */ - Map getBacklogQuotaMap(String topic) throws PulsarAdminException; + Map getBacklogQuotaMap(String topic) + throws PulsarAdminException; /** * Get applied backlog quota map for a topic. @@ -1639,6 +1640,7 @@ Map getBacklogQuotaMap(String topic * Topic name * @param backlogQuota * the new BacklogQuota + * @param backlogQuotaType * * @throws NotAuthorizedException * Don't have admin permission @@ -1647,7 +1649,12 @@ Map getBacklogQuotaMap(String topic * @throws PulsarAdminException * Unexpected error */ - void setBacklogQuota(String topic, BacklogQuota backlogQuota) throws PulsarAdminException; + void setBacklogQuota(String topic, BacklogQuota backlogQuota, + BacklogQuota.BacklogQuotaType backlogQuotaType) throws PulsarAdminException; + + default void setBacklogQuota(String topic, BacklogQuota backlogQuota) throws PulsarAdminException { + setBacklogQuota(topic, backlogQuota, BacklogQuota.BacklogQuotaType.destination_storage); + } /** * Remove a backlog quota policy from a topic. @@ -1655,6 +1662,7 @@ Map getBacklogQuotaMap(String topic * * @param topic * Topic name + * @param backlogQuotaType * * @throws NotAuthorizedException * Don't have admin permission @@ -1663,7 +1671,12 @@ Map getBacklogQuotaMap(String topic * @throws PulsarAdminException * Unexpected error */ - void removeBacklogQuota(String topic) throws PulsarAdminException; + void removeBacklogQuota(String topic, BacklogQuota.BacklogQuotaType backlogQuotaType) throws PulsarAdminException; + + default void removeBacklogQuota(String topic) + throws PulsarAdminException { + removeBacklogQuota(topic, BacklogQuota.BacklogQuotaType.destination_storage); + } /** * Get the delayed delivery policy applied for a specified topic. diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java index f60f4ca10de6ab..79e0af27f8a6fc 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/impl/BacklogQuotaImpl.java @@ -27,6 +27,9 @@ @AllArgsConstructor @NoArgsConstructor public class BacklogQuotaImpl implements BacklogQuota { + public static final long BYTES_IN_GIGABYTE = 1024 * 1024 * 1024; + + // backlog quota by size in byte private long limitSize; // backlog quota by time in second private int limitTime; @@ -37,7 +40,7 @@ public static BacklogQuotaImplBuilder builder() { } public static class BacklogQuotaImplBuilder implements BacklogQuota.Builder { - private long limitSize; + private long limitSize = -1 * BYTES_IN_GIGABYTE; private int limitTime = -1; private RetentionPolicy retentionPolicy; diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java index e51414a6b98b57..8307a5f1725830 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java @@ -1154,9 +1154,11 @@ public void failed(Throwable throwable) { } @Override - public void setBacklogQuota(String namespace, BacklogQuota backlogQuota) throws PulsarAdminException { + public void setBacklogQuota(String namespace, BacklogQuota backlogQuota, + BacklogQuota.BacklogQuotaType backlogQuotaType) throws PulsarAdminException { try { - setBacklogQuotaAsync(namespace, backlogQuota).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + setBacklogQuotaAsync(namespace, backlogQuota, backlogQuotaType) + .get(this.readTimeoutMs, TimeUnit.MILLISECONDS); } catch (ExecutionException e) { throw (PulsarAdminException) e.getCause(); } catch (InterruptedException e) { @@ -1168,16 +1170,19 @@ public void setBacklogQuota(String namespace, BacklogQuota backlogQuota) throws } @Override - public CompletableFuture setBacklogQuotaAsync(String namespace, BacklogQuota backlogQuota) { + public CompletableFuture setBacklogQuotaAsync(String namespace, BacklogQuota backlogQuota, + BacklogQuota.BacklogQuotaType backlogQuotaType) { NamespaceName ns = NamespaceName.get(namespace); WebTarget path = namespacePath(ns, "backlogQuota"); - return asyncPostRequest(path, Entity.entity(backlogQuota, MediaType.APPLICATION_JSON)); + return asyncPostRequest(path.queryParam("backlogQuotaType", backlogQuotaType.toString()), + Entity.entity(backlogQuota, MediaType.APPLICATION_JSON)); } @Override - public void removeBacklogQuota(String namespace) throws PulsarAdminException { + public void removeBacklogQuota(String namespace, BacklogQuota.BacklogQuotaType backlogQuotaType) + throws PulsarAdminException { try { - removeBacklogQuotaAsync(namespace). + removeBacklogQuotaAsync(namespace, backlogQuotaType). get(this.readTimeoutMs, TimeUnit.MILLISECONDS); } catch (ExecutionException e) { throw (PulsarAdminException) e.getCause(); @@ -1212,10 +1217,11 @@ public CompletableFuture removeInactiveTopicPoliciesAsync(String namespace } @Override - public CompletableFuture removeBacklogQuotaAsync(String namespace) { + public CompletableFuture removeBacklogQuotaAsync(String namespace, + BacklogQuota.BacklogQuotaType backlogQuotaType) { NamespaceName ns = NamespaceName.get(namespace); WebTarget path = namespacePath(ns, "backlogQuota") - .queryParam("backlogQuotaType", BacklogQuotaType.destination_storage.toString()); + .queryParam("backlogQuotaType", backlogQuotaType.toString()); return asyncDeleteRequest(path); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index b5af8968be422b..bfbbe9f53267cf 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -1805,22 +1805,24 @@ public Map getBacklogQuotaMap(String topic, bool } @Override - public void setBacklogQuota(String topic, BacklogQuota backlogQuota) throws PulsarAdminException { + public void setBacklogQuota(String topic, BacklogQuota backlogQuota, + BacklogQuotaType backlogQuotaType) throws PulsarAdminException { try { TopicName tn = validateTopic(topic); WebTarget path = topicPath(tn, "backlogQuota"); - request(path).post(Entity.entity(backlogQuota, MediaType.APPLICATION_JSON), ErrorData.class); + request(path.queryParam("backlogQuotaType", backlogQuotaType.toString())) + .post(Entity.entity(backlogQuota, MediaType.APPLICATION_JSON), ErrorData.class); } catch (Exception e) { throw getApiException(e); } } @Override - public void removeBacklogQuota(String topic) throws PulsarAdminException { + public void removeBacklogQuota(String topic, BacklogQuotaType backlogQuotaType) throws PulsarAdminException { try { TopicName tn = validateTopic(topic); WebTarget path = topicPath(tn, "backlogQuota"); - request(path.queryParam("backlogQuotaType", BacklogQuotaType.destination_storage.toString())) + request(path.queryParam("backlogQuotaType", backlogQuotaType.toString())) .delete(ErrorData.class); } catch (Exception e) { throw getApiException(e); diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index a672c6c14765bf..2db9bc0ac9c997 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -415,7 +415,8 @@ public void namespaces() throws Exception { BacklogQuota.builder() .limitSize(10) .retentionPolicy(RetentionPolicy.producer_request_hold) - .build()); + .build(), + BacklogQuota.BacklogQuotaType.destination_storage); mockNamespaces = mock(Namespaces.class); when(admin.namespaces()).thenReturn(mockNamespaces); @@ -426,7 +427,8 @@ public void namespaces() throws Exception { BacklogQuota.builder() .limitSize(10 * 1024) .retentionPolicy(RetentionPolicy.producer_exception) - .build()); + .build(), + BacklogQuota.BacklogQuotaType.destination_storage); mockNamespaces = mock(Namespaces.class); when(admin.namespaces()).thenReturn(mockNamespaces); @@ -437,7 +439,8 @@ public void namespaces() throws Exception { BacklogQuota.builder() .limitSize(10 * 1024 * 1024) .retentionPolicy(RetentionPolicy.producer_exception) - .build()); + .build(), + BacklogQuota.BacklogQuotaType.destination_storage); mockNamespaces = mock(Namespaces.class); when(admin.namespaces()).thenReturn(mockNamespaces); @@ -448,19 +451,21 @@ public void namespaces() throws Exception { BacklogQuota.builder() .limitSize(10L * 1024 * 1024 * 1024) .retentionPolicy(RetentionPolicy.producer_exception) - .build()); + .build(), + BacklogQuota.BacklogQuotaType.destination_storage); mockNamespaces = mock(Namespaces.class); when(admin.namespaces()).thenReturn(mockNamespaces); namespaces = new CmdNamespaces(() -> admin); - namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p producer_exception -l 10G -lt 10000")); + namespaces.run(split("set-backlog-quota myprop/clust/ns1 -p producer_exception -l 10G -lt 10000 -t message_age")); verify(mockNamespaces).setBacklogQuota("myprop/clust/ns1", BacklogQuota.builder() .limitSize(10l * 1024 * 1024 * 1024) .limitTime(10000) .retentionPolicy(RetentionPolicy.producer_exception) - .build()); + .build(), + BacklogQuota.BacklogQuotaType.message_age); namespaces.run(split("set-persistence myprop/clust/ns1 -e 2 -w 1 -a 1 -r 100.0")); verify(mockNamespaces).setPersistence("myprop/clust/ns1", @@ -902,15 +907,31 @@ public void topics() throws Exception { cmdTopics.run(split("get-backlog-quotas persistent://myprop/clust/ns1/ds1 -ap")); verify(mockTopics).getBacklogQuotaMap("persistent://myprop/clust/ns1/ds1", true); - cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -l 10 -lt 1000 -p producer_request_hold")); + cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -l 10 -p producer_request_hold")); verify(mockTopics).setBacklogQuota("persistent://myprop/clust/ns1/ds1", BacklogQuota.builder() .limitSize(10) + .retentionPolicy(RetentionPolicy.producer_request_hold) + .build(), + BacklogQuota.BacklogQuotaType.destination_storage); + //cmd with option cannot be executed repeatedly. + cmdTopics = new CmdTopics(() -> admin); + cmdTopics.run(split("set-backlog-quota persistent://myprop/clust/ns1/ds1 -lt 1000 -p producer_request_hold -t message_age")); + verify(mockTopics).setBacklogQuota("persistent://myprop/clust/ns1/ds1", + BacklogQuota.builder() + .limitSize(-1) .limitTime(1000) .retentionPolicy(RetentionPolicy.producer_request_hold) - .build()); + .build(), + BacklogQuota.BacklogQuotaType.message_age); + //cmd with option cannot be executed repeatedly. + cmdTopics = new CmdTopics(() -> admin); cmdTopics.run(split("remove-backlog-quota persistent://myprop/clust/ns1/ds1")); - verify(mockTopics).removeBacklogQuota("persistent://myprop/clust/ns1/ds1"); + verify(mockTopics).removeBacklogQuota("persistent://myprop/clust/ns1/ds1", BacklogQuota.BacklogQuotaType.destination_storage); + //cmd with option cannot be executed repeatedly. + cmdTopics = new CmdTopics(() -> admin); + cmdTopics.run(split("remove-backlog-quota persistent://myprop/clust/ns1/ds1 -t message_age")); + verify(mockTopics).removeBacklogQuota("persistent://myprop/clust/ns1/ds1", BacklogQuota.BacklogQuotaType.message_age); cmdTopics.run(split("info-internal persistent://myprop/clust/ns1/ds1")); verify(mockTopics).getInternalInfo("persistent://myprop/clust/ns1/ds1"); diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java index a44022e50d1790..bac6a39d3802ad 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java @@ -1095,6 +1095,9 @@ private class SetBacklogQuota extends CliCommand { + "Valid options are: [producer_request_hold, producer_exception, consumer_backlog_eviction]", required = true) private String policyStr; + @Parameter(names = {"-t", "--type"}, description = "Backlog quota type to set") + private String backlogQuotaType = BacklogQuota.BacklogQuotaType.destination_storage.name(); + @Override void run() throws PulsarAdminException { BacklogQuota.RetentionPolicy policy; @@ -1114,7 +1117,8 @@ void run() throws PulsarAdminException { BacklogQuota.builder().limitSize(limit) .limitTime(limitTime) .retentionPolicy(policy) - .build()); + .build(), + BacklogQuota.BacklogQuotaType.valueOf(backlogQuotaType)); } } @@ -1123,10 +1127,13 @@ private class RemoveBacklogQuota extends CliCommand { @Parameter(description = "tenant/namespace", required = true) private java.util.List params; + @Parameter(names = {"-t", "--type"}, description = "Backlog quota type to remove") + private String backlogQuotaType = BacklogQuota.BacklogQuotaType.destination_storage.name(); + @Override void run() throws PulsarAdminException { String namespace = validateNamespace(params); - getAdmin().namespaces().removeBacklogQuota(namespace); + getAdmin().namespaces().removeBacklogQuota(namespace, BacklogQuota.BacklogQuotaType.valueOf(backlogQuotaType)); } } diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java index 9faef0ecfe7121..abd70ace1ddba4 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java @@ -1186,8 +1186,8 @@ private class SetBacklogQuota extends CliCommand { @Parameter(description = "persistent://tenant/namespace/topic", required = true) private java.util.List params; - @Parameter(names = { "-l", "--limit" }, description = "Size limit (eg: 10M, 16G)", required = true) - private String limitStr; + @Parameter(names = { "-l", "--limit" }, description = "Size limit (eg: 10M, 16G)") + private String limitStr = "-1"; @Parameter(names = { "-lt", "--limitTime" }, description = "Time limit in second, non-positive number for disabling time limit.") private int limitTime = -1; @@ -1196,6 +1196,9 @@ private class SetBacklogQuota extends CliCommand { + "Valid options are: [producer_request_hold, producer_exception, consumer_backlog_eviction]", required = true) private String policyStr; + @Parameter(names = {"-t", "--type"}, description = "Backlog quota type to set") + private String backlogQuotaType = BacklogQuota.BacklogQuotaType.destination_storage.name(); + @Override void run() throws PulsarAdminException { BacklogQuota.RetentionPolicy policy; @@ -1215,7 +1218,8 @@ void run() throws PulsarAdminException { .limitSize(limit) .limitTime(limitTime) .retentionPolicy(policy) - .build()); + .build(), + BacklogQuota.BacklogQuotaType.valueOf(backlogQuotaType)); } } @@ -1225,10 +1229,13 @@ private class RemoveBacklogQuota extends CliCommand { @Parameter(description = "persistent://tenant/namespace/topic", required = true) private java.util.List params; + @Parameter(names = {"-t", "--type"}, description = "Backlog quota type to remove") + private String backlogQuotaType = BacklogQuota.BacklogQuotaType.destination_storage.name(); + @Override void run() throws PulsarAdminException { String persistentTopic = validatePersistentTopic(params); - getTopics().removeBacklogQuota(persistentTopic); + getTopics().removeBacklogQuota(persistentTopic, BacklogQuota.BacklogQuotaType.valueOf(backlogQuotaType)); } } diff --git a/site2/docs/cookbooks-retention-expiry.md b/site2/docs/cookbooks-retention-expiry.md index 78635bac9ace05..94d2b83ba478fd 100644 --- a/site2/docs/cookbooks-retention-expiry.md +++ b/site2/docs/cookbooks-retention-expiry.md @@ -179,17 +179,23 @@ You can set a size and/or time threshold and backlog retention policy for all of #### pulsar-admin -Use the [`set-backlog-quota`](reference-pulsar-admin.md#namespaces) subcommand and specify a namespace, a size limit using the `-l`/`--limit` flag, and a retention policy using the `-p`/`--policy` flag. +Use the [`set-backlog-quota`](reference-pulsar-admin.md#namespaces) subcommand and specify a namespace, a size limit using the `-l`/`--limit` , `-lt`/`--limitTime` flag to limit backlog, a retention policy using the `-p`/`--policy` flag and a policy type using `-t`/`--type` (default is destination_storage). ##### Example ```shell $ pulsar-admin namespaces set-backlog-quota my-tenant/my-ns \ --limit 2G \ - --limitTime 36000 \ --policy producer_request_hold ``` +```shell +$ pulsar-admin namespaces set-backlog-quota my-tenant/my-ns/my-topic \ +--limitTime 3600 \ +--policy producer_request_hold \ +--type message_age +``` + #### REST API {@inject: endpoint|POST|/admin/v2/namespaces/:tenant/:namespace/backlogQuota|operation/getBacklogQuotaMap?version=[[pulsar:version_number]]} @@ -236,7 +242,7 @@ Map quotas = #### pulsar-admin -Use the [`remove-backlog-quota`](reference-pulsar-admin.md#pulsar-admin-namespaces-remove-backlog-quota) subcommand and specify a namespace. Here's an example: +Use the [`remove-backlog-quota`](reference-pulsar-admin.md#pulsar-admin-namespaces-remove-backlog-quota) subcommand and specify a namespace, use `t`/`--type` to specify backlog type to remove(default is destination_storage). Here's an example: ```shell $ pulsar-admin namespaces remove-backlog-quota my-tenant/my-ns diff --git a/site2/docs/reference-pulsar-admin.md b/site2/docs/reference-pulsar-admin.md index 85061895ff8d88..fbfdc9e2135623 100644 --- a/site2/docs/reference-pulsar-admin.md +++ b/site2/docs/reference-pulsar-admin.md @@ -1124,7 +1124,9 @@ Options |Flag|Description|Default| |----|---|---| |`-l`, `--limit`|The backlog size limit (for example `10M` or `16G`)|| +|`-lt`, `--limitTime`|Time limit in second, non-positive number for disabling time limit. (for example 3600 for 1 hour)|| |`-p`, `--policy`|The retention policy to enforce when the limit is reached. The valid options are: `producer_request_hold`, `producer_exception` or `consumer_backlog_eviction`| +|`-t`, `--type`|Backlog quota type to set. The valid options are: `destination_storage`, `message_age` |destination_storage| Example ```bash @@ -1133,9 +1135,20 @@ $ pulsar-admin namespaces set-backlog-quota my-tenant/my-ns \ --policy producer_request_hold ``` +```bash +$ pulsar-admin namespaces set-backlog-quota my-tenant/my-ns \ +--limitTime 3600 \ +--policy producer_request_hold \ +--type message_age +``` + ### `remove-backlog-quota` Remove a backlog quota policy from a namespace +|Flag|Description|Default| +|---|---|---| +|`-t`, `--type`|Backlog quota type to remove. The valid options are: `destination_storage`, `message_age` |destination_storage| + Usage ```bash $ pulsar-admin namespaces remove-backlog-quota tenant/namespace @@ -2351,14 +2364,39 @@ $ pulsar-admin topics get-backlog-quotas tenant/namespace/topic ### `set-backlog-quota` Set a backlog quota policy for a topic. +|Flag|Description|Default| +|----|---|---| +|`-l`, `--limit`|The backlog size limit (for example `10M` or `16G`)|| +|`-lt`, `--limitTime`|Time limit in second, non-positive number for disabling time limit. (for example 3600 for 1 hour)|| +|`-p`, `--policy`|The retention policy to enforce when the limit is reached. The valid options are: `producer_request_hold`, `producer_exception` or `consumer_backlog_eviction`| +|`-t`, `--type`|Backlog quota type to set. The valid options are: `destination_storage`, `message_age` |destination_storage| + Usage ```bash $ pulsar-admin topics set-backlog-quota tenant/namespace/topic options ``` +Example +```bash +$ pulsar-admin namespaces set-backlog-quota my-tenant/my-ns/my-topic \ +--limit 2G \ +--policy producer_request_hold +``` + +```bash +$ pulsar-admin namespaces set-backlog-quota my-tenant/my-ns/my-topic \ +--limitTime 3600 \ +--policy producer_request_hold \ +--type message_age +``` + ### `remove-backlog-quota` Remove a backlog quota policy from a topic. +|Flag|Description|Default| +|---|---|---| +|`-t`, `--type`|Backlog quota type to remove. The valid options are: `destination_storage`, `message_age` |destination_storage| + Usage ```bash $ pulsar-admin topics remove-backlog-quota tenant/namespace/topic