From be68a2663045b6c980fcc7b86f529f4bd45281e2 Mon Sep 17 00:00:00 2001 From: lipenghui Date: Fri, 25 Jun 2021 14:56:04 +0800 Subject: [PATCH] Add authoritative flag for topic policy to avoid redirect loop (#11051) * Add authoritative flag for topic policy to avoid redirect loop 1. Add authoritative flag for topic policy to avoid redirect loop 2. Prevent set topic policy on a non-existing topic 3. Prevent set topic policy on a partition of a partitioned topic 4. Redirect to the broker which is owner of the partition-0 for a partitioned topic when setting topic policy 5. Don't remove policy cache when the topic removed from the broker, this will lead to the topic come back, but can't find the topic policy, since the namespace does not removed from the broker, we will not read from the system topic again. For this case we already handled when the broker does not provide service for that namespace, the topic policy cache under the namespace will be removed. (cherry picked from commit 0b67438d23bbbc46b500e896a18aad715a514fd9) --- .../admin/impl/PersistentTopicsBase.java | 191 +- .../broker/admin/v2/PersistentTopics.java | 1775 +++++++++-------- .../SystemTopicBasedTopicPoliciesService.java | 1 - .../pulsar/broker/admin/AdminApiTest2.java | 2 - .../broker/admin/TopicPoliciesTest.java | 37 +- ...temTopicBasedTopicPoliciesServiceTest.java | 1 + 6 files changed, 1057 insertions(+), 950 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 76211b8e3c419..2b80693fa826a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -499,31 +499,17 @@ protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) { }); } - protected void internalSetDelayedDeliveryPolicies(AsyncResponse asyncResponse, - DelayedDeliveryPolicies deliveryPolicies) { - TopicPolicies topicPolicies = null; + protected CompletableFuture internalSetDelayedDeliveryPolicies(DelayedDeliveryPolicies deliveryPolicies) { + TopicPolicies topicPolicies; try { - topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName); - } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) { - log.error("Topic {} policies have not been initialized yet.", topicName); - asyncResponse.resume(new RestException(e)); - return; - } - if (topicPolicies == null) { - topicPolicies = new TopicPolicies(); + topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new); + topicPolicies.setDelayedDeliveryEnabled(deliveryPolicies == null ? null : deliveryPolicies.isActive()); + topicPolicies.setDelayedDeliveryTickTimeMillis( + deliveryPolicies == null ? null : deliveryPolicies.getTickTime()); + } catch (Exception e) { + return FutureUtil.failedFuture(e); } - topicPolicies.setDelayedDeliveryEnabled(deliveryPolicies == null ? null : deliveryPolicies.isActive()); - topicPolicies.setDelayedDeliveryTickTimeMillis( - deliveryPolicies == null ? null : deliveryPolicies.getTickTime()); - pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies) - .whenComplete((result, ex) -> { - if (ex != null) { - log.error("Failed set delayed delivery policy for topic", ex); - asyncResponse.resume(new RestException(ex)); - } else { - asyncResponse.resume(Response.noContent().build()); - } - }); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); } private CompletableFuture updatePartitionInOtherCluster(int numPartitions, Set clusters) { @@ -775,15 +761,21 @@ protected CompletableFuture internalGetDelayedDeliveryP } protected CompletableFuture internalGetOffloadPolicies(boolean applied) { - OffloadPoliciesImpl offloadPolicies = - getTopicPolicies(topicName).map(TopicPolicies::getOffloadPolicies).orElse(null); - if (applied) { - OffloadPoliciesImpl namespacePolicy = - (OffloadPoliciesImpl) getNamespacePolicies(namespaceName).offload_policies; - offloadPolicies = OffloadPoliciesImpl.mergeConfiguration(offloadPolicies - , namespacePolicy, pulsar().getConfiguration().getProperties()); + CompletableFuture res = new CompletableFuture<>(); + try { + OffloadPoliciesImpl offloadPolicies = + getTopicPolicies(topicName).map(TopicPolicies::getOffloadPolicies).orElse(null); + if (applied) { + OffloadPoliciesImpl namespacePolicy = + (OffloadPoliciesImpl) getNamespacePolicies(namespaceName).offload_policies; + offloadPolicies = OffloadPoliciesImpl.mergeConfiguration(offloadPolicies + , namespacePolicy, pulsar().getConfiguration().getProperties()); + } + res.complete(offloadPolicies); + } catch (Exception e) { + res.completeExceptionally(e); } - return CompletableFuture.completedFuture(offloadPolicies); + return res; } protected CompletableFuture internalSetOffloadPolicies(OffloadPoliciesImpl offloadPolicies) { @@ -2555,24 +2547,18 @@ protected Map internalGetBacklogQuo return quotaMap; } - protected void internalSetBacklogQuota(AsyncResponse asyncResponse, - BacklogQuota.BacklogQuotaType backlogQuotaType, + protected CompletableFuture internalSetBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType, BacklogQuotaImpl backlogQuota) { validateTopicPolicyOperation(topicName, PolicyName.BACKLOG, PolicyOperation.WRITE); validatePoliciesReadOnlyAccess(); + TopicPolicies topicPolicies; if (backlogQuotaType == null) { backlogQuotaType = BacklogQuota.BacklogQuotaType.destination_storage; } - TopicPolicies topicPolicies; try { - topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName); - } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) { - log.error("Topic {} policies have not been initialized yet.", topicName); - asyncResponse.resume(new RestException(e)); - return; - } - if (topicPolicies == null){ - topicPolicies = new TopicPolicies(); + topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new); + } catch (Exception e) { + return FutureUtil.failedFuture(e); } RetentionPolicies retentionPolicies = getRetentionPolicies(topicName, topicPolicies); @@ -2580,10 +2566,9 @@ protected void internalSetBacklogQuota(AsyncResponse asyncResponse, log.warn( "[{}] Failed to update backlog configuration for topic {}: conflicts with retention quota", clientAppId(), topicName); - asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, + return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Backlog Quota exceeds configured retention quota for topic. " + "Please increase retention quota and retry")); - return; } if (backlogQuota != null) { @@ -2592,22 +2577,15 @@ protected void internalSetBacklogQuota(AsyncResponse asyncResponse, topicPolicies.getBackLogQuotaMap().remove(backlogQuotaType.name()); } Map backLogQuotaMap = topicPolicies.getBackLogQuotaMap(); - pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies) - .whenComplete((r, ex) -> { - if (ex != null) { - log.error("Failed updated backlog quota map", ex); - asyncResponse.resume(new RestException(ex)); - } else { - try { - log.info("[{}] Successfully updated backlog quota map: namespace={}, topic={}, map={}", - clientAppId(), - namespaceName, - topicName.getLocalName(), - jsonMapper().writeValueAsString(backLogQuotaMap)); - } catch (JsonProcessingException ignore) { } - asyncResponse.resume(Response.noContent().build()); - } - }); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies).thenRun(() -> { + try { + log.info("[{}] Successfully updated backlog quota map: namespace={}, topic={}, map={}", + clientAppId(), + namespaceName, + topicName.getLocalName(), + jsonMapper().writeValueAsString(backLogQuotaMap)); + } catch (JsonProcessingException ignore) { } + }); } protected CompletableFuture internalGetDeduplication(boolean applied) { @@ -2638,38 +2616,26 @@ protected CompletableFuture internalSetDeduplication(Boolean enabled) { return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); } - protected void internalSetMessageTTL(AsyncResponse asyncResponse, Integer ttlInSecond) { + protected CompletableFuture internalSetMessageTTL(Integer ttlInSecond) { //Validate message ttl value. if (ttlInSecond != null && ttlInSecond < 0) { - throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for message TTL"); + return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, + "Invalid value for message TTL")); } TopicPolicies topicPolicies; - //Update existing topic policy or create a new one if not exist. try { - topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName); - } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) { - log.error("Topic {} policies have not been initialized yet.", topicName); - asyncResponse.resume(new RestException(e)); - return; - } - if (topicPolicies == null){ - topicPolicies = new TopicPolicies(); + topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new); + } catch (Exception e) { + return FutureUtil.failedFuture(e); } topicPolicies.setMessageTTLInSeconds(ttlInSecond); - pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies) - .whenComplete((result, ex) -> { - if (ex != null) { - log.error("Failed set message ttl for topic", ex); - asyncResponse.resume(new RestException(ex)); - } else { - log.info("[{}] Successfully set topic message ttl: namespace={}, topic={}, ttl={}", - clientAppId(), - namespaceName, - topicName.getLocalName(), - ttlInSecond); - asyncResponse.resume(Response.noContent().build()); - } - }); + return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies).thenRun(() -> { + log.info("[{}] Successfully set topic message ttl: namespace={}, topic={}, ttl={}", + clientAppId(), + namespaceName, + topicName.getLocalName(), + ttlInSecond); + }); } private RetentionPolicies getRetentionPolicies(TopicName topicName, TopicPolicies topicPolicies) { @@ -2686,13 +2652,8 @@ private RetentionPolicies getRetentionPolicies(TopicName topicName, TopicPolicie return retentionPolicies; } - protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse, - BacklogQuota.BacklogQuotaType backlogQuotaType) { - internalSetBacklogQuota(asyncResponse, backlogQuotaType, null); - } - - protected void internalGetRetention(AsyncResponse asyncResponse, boolean applied){ - RetentionPolicies retentionPolicies = getTopicPolicies(topicName) + protected RetentionPolicies internalGetRetention(boolean applied) { + return getTopicPolicies(topicName) .map(TopicPolicies::getRetentionPolicies).orElseGet(() -> { if (applied) { RetentionPolicies policies = getNamespacePolicies(namespaceName).retention_policies; @@ -2702,15 +2663,19 @@ protected void internalGetRetention(AsyncResponse asyncResponse, boolean applied } return null; }); - asyncResponse.resume(retentionPolicies == null ? Response.noContent().build() : retentionPolicies); } protected CompletableFuture internalSetRetention(RetentionPolicies retention) { if (retention == null) { return CompletableFuture.completedFuture(null); } - TopicPolicies topicPolicies = getTopicPolicies(topicName) - .orElseGet(TopicPolicies::new); + TopicPolicies topicPolicies; + try { + topicPolicies = getTopicPolicies(topicName) + .orElseGet(TopicPolicies::new); + } catch (Exception e) { + return FutureUtil.failedFuture(e); + } BacklogQuota backlogQuota = topicPolicies.getBackLogQuotaMap().get(BacklogQuota.BacklogQuotaType.destination_storage.name()); if (backlogQuota == null) { @@ -2721,9 +2686,9 @@ protected CompletableFuture internalSetRetention(RetentionPolicies retenti log.warn( "[{}] Failed to update retention quota configuration for topic {}: conflicts with retention quota", clientAppId(), topicName); - throw new RestException(Status.PRECONDITION_FAILED, + return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Retention Quota must exceed configured backlog quota for topic. " - + "Please increase retention quota and retry"); + + "Please increase retention quota and retry")); } topicPolicies.setRetentionPolicies(retention); return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); @@ -2849,12 +2814,30 @@ protected CompletableFuture internalSetReplicatorDispatchRate(DispatchRate return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies); } - protected void preValidation() { + protected CompletableFuture preValidation(boolean authoritative) { checkTopicLevelPolicyEnable(); + if (topicName.isPartitioned()) { + return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, + "Not allowed to set/get topic policy for a partition")); + } if (topicName.isGlobal()) { validateGlobalNamespaceOwnership(namespaceName); } - validateTopicOwnership(topicName, false); + return checkTopicExistsAsync(topicName).thenCompose(exist -> { + if (!exist) { + throw new RestException(Status.NOT_FOUND, "Topic not found"); + } else { + return getPartitionedTopicMetadataAsync(topicName, false, false) + .thenCompose(metadata -> { + if (metadata.partitions > 0) { + return validateTopicOwnershipAsync(TopicName.get(topicName.toString() + + TopicName.PARTITIONED_TOPIC_SUFFIX + 0), authoritative); + } else { + return validateTopicOwnershipAsync(topicName, authoritative); + } + }); + } + }); } protected CompletableFuture internalRemoveMaxProducers() { @@ -3970,6 +3953,16 @@ protected void internalHandleResult(AsyncResponse asyncResponse, } } + protected void handleTopicPolicyException(String methodName, Throwable thr, AsyncResponse asyncResponse) { + Throwable cause = thr.getCause(); + if (!(cause instanceof WebApplicationException) + || !(((WebApplicationException) cause).getResponse().getStatus() == 307)) { + log.error("[{}] Failed to perform {} on topic {}", + clientAppId(), methodName, topicName, cause); + } + resumeAsyncResponseExceptionally(asyncResponse, cause); + } + protected void internalTruncateNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) { Topic topic; try { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 56e5ce109d4f9..018749af7f27d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -25,7 +25,6 @@ import io.swagger.annotations.ApiParam; import io.swagger.annotations.ApiResponse; import io.swagger.annotations.ApiResponses; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -56,7 +55,6 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.AuthAction; -import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType; import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies; import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; @@ -279,23 +277,20 @@ public void createNonPartitionedTopic( @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), @ApiResponse(code = 500, message = "Internal server error"), }) public void getOffloadPolicies(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic, - @QueryParam("applied") boolean applied) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalGetOffloadPolicies(applied).whenComplete((res, ex) -> { - if (ex instanceof RestException) { - log.error("Failed get offloadPolicies", ex); - asyncResponse.resume(ex); - } else if (ex != null) { - log.error("Failed get offloadPolicies", ex); - asyncResponse.resume(new RestException(ex)); - } else { - asyncResponse.resume(res); - } - }); + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("applied") boolean applied, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalGetOffloadPolicies(applied)) + .thenApply(asyncResponse::resume) + .exceptionally(ex -> { + handleTopicPolicyException("getOffloadPolicies", ex, asyncResponse); + return null; + }); } @POST @@ -304,15 +299,20 @@ public void getOffloadPolicies(@Suspended final AsyncResponse asyncResponse, @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), }) public void setOffloadPolicies(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic, - @ApiParam(value = "Offload policies for the specified topic") - OffloadPoliciesImpl offloadPolicies) { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "Offload policies for the specified topic") OffloadPoliciesImpl offloadPolicies) { validateTopicName(tenant, namespace, encodedTopic); - validateAdminAccessForTenant(tenant); - internalSetOffloadPolicies(offloadPolicies).whenComplete((res, ex) - -> internalHandleResult(asyncResponse, res, ex, "Failed set offloadPolicies")); + preValidation(authoritative) + .thenCompose(__ -> internalSetOffloadPolicies(offloadPolicies)) + .thenRun(() -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + handleTopicPolicyException("setOffloadPolicies", ex, asyncResponse); + return null; + }); } @DELETE @@ -321,12 +321,19 @@ public void setOffloadPolicies(@Suspended final AsyncResponse asyncResponse, @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), }) public void removeOffloadPolicies(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic) { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - setOffloadPolicies(asyncResponse, tenant, namespace, encodedTopic, null); + preValidation(authoritative) + .thenCompose(__ -> internalSetOffloadPolicies(null)) + .thenRun(() -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + handleTopicPolicyException("removeOffloadPolicies", ex, asyncResponse); + return null; + }); } @GET @@ -336,23 +343,19 @@ public void removeOffloadPolicies(@Suspended final AsyncResponse asyncResponse, @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), @ApiResponse(code = 500, message = "Internal server error"), }) public void getMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic, - @QueryParam("applied") boolean applied) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalGetMaxUnackedMessagesOnConsumer(applied).whenComplete((res, ex) -> { - if (ex instanceof RestException) { - log.error("Failed get maxUnackedMessagesOnConsumer", ex); - asyncResponse.resume(ex); - } else if (ex != null) { - log.error("Failed get maxUnackedMessagesOnConsumer", ex); - asyncResponse.resume(new RestException(ex)); - } else { - asyncResponse.resume(res); - } - }); + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("applied") boolean applied, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalGetMaxUnackedMessagesOnConsumer(applied)) + .thenApply(asyncResponse::resume).exceptionally(ex -> { + handleTopicPolicyException("getMaxUnackedMessagesOnConsumer", ex, asyncResponse); + return null; + }); } @POST @@ -365,12 +368,39 @@ public void setMaxUnackedMessagesOnConsumer( @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @ApiParam(value = "Max unacked messages on consumer policies for the specified topic") Integer maxUnackedNum) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum).whenComplete((res, ex) - -> internalHandleResult(asyncResponse, res, ex, "Failed set MaxUnackedMessagesOnConsumer")); + preValidation(authoritative) + .thenCompose(__ -> internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum)) + .thenRun(() -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + handleTopicPolicyException("setMaxUnackedMessagesOnConsumer", ex, asyncResponse); + return null; + }); + } + + @DELETE + @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer") + @ApiOperation(value = "Delete max unacked messages per consumer config on a topic.") + @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), + @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), }) + public void deleteMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalSetMaxUnackedMessagesOnConsumer(null)) + .thenRun(() -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + handleTopicPolicyException("deleteMaxUnackedMessagesOnConsumer", ex, asyncResponse); + return null; + }); } @GET @@ -380,17 +410,21 @@ public void setMaxUnackedMessagesOnConsumer( @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), @ApiResponse(code = 500, message = "Internal server error"), }) public void getDeduplicationSnapshotInterval(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies()); - if (topicPolicies.isDeduplicationSnapshotIntervalSecondsSet()) { - asyncResponse.resume(topicPolicies.getDeduplicationSnapshotIntervalSeconds()); - } else { - asyncResponse.resume(Response.noContent().build()); - } + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenRun(() -> { + TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies()); + asyncResponse.resume(topicPolicies.getDeduplicationSnapshotIntervalSeconds()); + }) + .exceptionally(ex -> { + handleTopicPolicyException("getDeduplicationSnapshotInterval", ex, asyncResponse); + return null; + }); } @POST @@ -404,14 +438,17 @@ public void setDeduplicationSnapshotInterval( @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, @ApiParam(value = "Interval to take deduplication snapshot for the specified topic") - Integer interval) { + Integer interval, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - if (topicName.isGlobal()) { - validateGlobalNamespaceOwnership(namespaceName); - } - internalSetDeduplicationSnapshotInterval(interval).whenComplete((res, ex) - -> internalHandleResult(asyncResponse, res, ex, "Failed set deduplicationSnapshotInterval")); + preValidation(authoritative) + .thenCompose(__ -> internalSetDeduplicationSnapshotInterval(interval)) + .thenRun(() -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + handleTopicPolicyException("setDeduplicationSnapshotInterval", ex, asyncResponse); + return null; + }); } @DELETE @@ -420,35 +457,19 @@ public void setDeduplicationSnapshotInterval( @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), }) public void deleteDeduplicationSnapshotInterval(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalSetDeduplicationSnapshotInterval(null).whenComplete((res, ex) -> { - if (ex instanceof RestException) { - log.error("Failed delete deduplicationSnapshotInterval", ex); - asyncResponse.resume(ex); - } else if (ex != null) { - log.error("Failed delete deduplicationSnapshotInterval", ex); - asyncResponse.resume(new RestException(ex)); - } else { - asyncResponse.resume(Response.noContent().build()); - } - }); - } - - @DELETE - @Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnConsumer") - @ApiOperation(value = "Delete max unacked messages per consumer config on a topic.") - @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), - @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), }) - public void deleteMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic) { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - setMaxUnackedMessagesOnConsumer(asyncResponse, tenant, namespace, encodedTopic, null); + preValidation(authoritative) + .thenCompose(__ -> internalSetDeduplicationSnapshotInterval(null)) + .thenRun(() -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + handleTopicPolicyException("deleteDeduplicationSnapshotInterval", ex, asyncResponse); + return null; + }); } @GET @@ -458,14 +479,19 @@ public void deleteMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse as @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), @ApiResponse(code = 500, message = "Internal server error"), }) public void getInactiveTopicPolicies(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic, - @QueryParam("applied") boolean applied) { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("applied") boolean applied, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalGetInactiveTopicPolicies(applied).whenComplete((res, ex) - -> internalHandleResult(asyncResponse, res, ex, "Failed get InactiveTopicPolicies")); + preValidation(authoritative) + .thenCompose(__ -> internalGetInactiveTopicPolicies(applied)) + .thenApply(asyncResponse::resume).exceptionally(ex -> { + handleTopicPolicyException("getInactiveTopicPolicies", ex, asyncResponse); + return null; + }); } @POST @@ -474,15 +500,21 @@ public void getInactiveTopicPolicies(@Suspended final AsyncResponse asyncRespons @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), }) public void setInactiveTopicPolicies(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic, - @ApiParam(value = "inactive topic policies for the specified topic") - InactiveTopicPolicies inactiveTopicPolicies) { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "inactive topic policies for the specified topic") + InactiveTopicPolicies inactiveTopicPolicies) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalSetInactiveTopicPolicies(inactiveTopicPolicies).whenComplete((res, ex) - -> internalHandleResult(asyncResponse, res, ex, "Failed set InactiveTopicPolicies")); + preValidation(authoritative) + .thenCompose(__ -> internalSetInactiveTopicPolicies(inactiveTopicPolicies)) + .thenRun(() -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + handleTopicPolicyException("setInactiveTopicPolicies", ex, asyncResponse); + return null; + }); } @DELETE @@ -491,11 +523,19 @@ public void setInactiveTopicPolicies(@Suspended final AsyncResponse asyncRespons @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), }) public void deleteInactiveTopicPolicies(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic) { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - setInactiveTopicPolicies(asyncResponse, tenant, namespace, encodedTopic, null); + preValidation(authoritative) + .thenCompose(__ -> internalSetInactiveTopicPolicies(null)) + .thenRun(() -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + handleTopicPolicyException("deleteInactiveTopicPolicies", ex, asyncResponse); + return null; + }); } @GET @@ -505,14 +545,20 @@ public void deleteInactiveTopicPolicies(@Suspended final AsyncResponse asyncResp @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), @ApiResponse(code = 500, message = "Internal server error"), }) public void getMaxUnackedMessagesOnSubscription(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic, - @QueryParam("applied") boolean applied) { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("applied") boolean applied, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalGetMaxUnackedMessagesOnSubscription(applied).whenComplete((res, ex) - -> internalHandleResult(asyncResponse, res, ex, "Failed get maxUnackedMessagesOnSubscription")); + preValidation(authoritative) + .thenCompose(__ -> internalGetMaxUnackedMessagesOnSubscription(applied)) + .thenApply(asyncResponse::resume) + .exceptionally(ex -> { + handleTopicPolicyException("getMaxUnackedMessagesOnSubscription", ex, asyncResponse); + return null; + }); } @POST @@ -525,14 +571,19 @@ public void setMaxUnackedMessagesOnSubscription( @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @ApiParam(value = "Max unacked messages on subscription policies for the specified topic") Integer maxUnackedNum) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(); validateTopicPolicyOperation(topicName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE); - validatePoliciesReadOnlyAccess(); - internalSetMaxUnackedMessagesOnSubscription(maxUnackedNum).whenComplete((res, ex) - -> internalHandleResult(asyncResponse, res, ex, "Failed set MaxUnackedMessagesOnSubscription")); + preValidation(authoritative) + .thenCompose(__ -> internalSetMaxUnackedMessagesOnSubscription(maxUnackedNum)) + .thenRun(() -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + handleTopicPolicyException("setMaxUnackedMessagesOnSubscription", ex, asyncResponse); + return null; + }); } @@ -543,11 +594,21 @@ public void setMaxUnackedMessagesOnSubscription( @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), }) public void deleteMaxUnackedMessagesOnSubscription(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic) { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - setMaxUnackedMessagesOnSubscription(asyncResponse, tenant, namespace, encodedTopic, null); + validateTopicName(tenant, namespace, encodedTopic); + validateTopicPolicyOperation(topicName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE); + preValidation(authoritative) + .thenCompose(__ -> internalSetMaxUnackedMessagesOnSubscription(null)) + .thenRun(() -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + handleTopicPolicyException("deleteMaxUnackedMessagesOnSubscription", ex, asyncResponse); + return null; + }); } @GET @@ -557,23 +618,20 @@ public void deleteMaxUnackedMessagesOnSubscription(@Suspended final AsyncRespons @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), @ApiResponse(code = 500, message = "Internal server error"), }) public void getDelayedDeliveryPolicies(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic, - @QueryParam("applied") boolean applied) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalGetDelayedDeliveryPolicies(applied).whenComplete((res, ex) -> { - if (ex instanceof RestException) { - log.error("Failed get DelayedDeliveryPolicies", ex); - asyncResponse.resume(ex); - } else if (ex != null) { - log.error("Failed get DelayedDeliveryPolicies", ex); - asyncResponse.resume(new RestException(ex)); - } else { - asyncResponse.resume(res); - } - }); + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("applied") boolean applied, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalGetDelayedDeliveryPolicies(applied)) + .thenApply(asyncResponse::resume) + .exceptionally(ex -> { + handleTopicPolicyException("getDelayedDeliveryPolicies", ex, asyncResponse); + return null; + }); } @POST @@ -586,13 +644,20 @@ public void setDelayedDeliveryPolicies( @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @ApiParam(value = "Delayed delivery policies for the specified topic") DelayedDeliveryPolicies deliveryPolicies) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - validateTopicPolicyOperation(topicName, PolicyName.DELAYED_DELIVERY, PolicyOperation.WRITE); validatePoliciesReadOnlyAccess(); - internalSetDelayedDeliveryPolicies(asyncResponse, deliveryPolicies); + validateTopicPolicyOperation(topicName, PolicyName.DELAYED_DELIVERY, PolicyOperation.WRITE); + preValidation(authoritative) + .thenCompose(__ -> internalSetDelayedDeliveryPolicies(deliveryPolicies)) + .thenRun(() -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + handleTopicPolicyException("setDelayedDeliveryPolicies", ex, asyncResponse); + return null; + }); } @@ -603,11 +668,21 @@ public void setDelayedDeliveryPolicies( @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"), }) public void deleteDelayedDeliveryPolicies(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic) { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - setDelayedDeliveryPolicies(asyncResponse, tenant, namespace, encodedTopic, null); + validatePoliciesReadOnlyAccess(); + validateTopicPolicyOperation(topicName, PolicyName.DELAYED_DELIVERY, PolicyOperation.WRITE); + preValidation(authoritative) + .thenCompose(__ -> internalSetDelayedDeliveryPolicies(null)) + .thenRun(() -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + handleTopicPolicyException("deleteDelayedDeliveryPolicies", ex, asyncResponse); + return null; + }); } /** @@ -1467,13 +1542,21 @@ public PersistentOfflineTopicStats getBacklog( @ApiResponse(code = 404, message = "Topic policy does not exist"), @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry")}) - public Map getBacklogQuotaMap(@PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic, - @QueryParam("applied") boolean applied) { + public void getBacklogQuotaMap( + @Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("applied") boolean applied, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - return internalGetBacklogQuota(applied); + preValidation(authoritative) + .thenAccept(__ -> asyncResponse.resume(internalGetBacklogQuota(applied))) + .exceptionally(ex -> { + handleTopicPolicyException("getBacklogQuotaMap", ex, asyncResponse); + return null; + }); } @POST @@ -1490,10 +1573,17 @@ public void setBacklogQuota( @Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @QueryParam("backlogQuotaType") BacklogQuotaType backlogQuotaType, BacklogQuotaImpl backlogQuota) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalSetBacklogQuota(asyncResponse, backlogQuotaType, backlogQuota); + preValidation(authoritative) + .thenCompose(__ -> internalSetBacklogQuota(backlogQuotaType, backlogQuota)) + .thenRun(() -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + handleTopicPolicyException("setBacklogQuota", ex, asyncResponse); + return null; + }); } @DELETE @@ -1505,12 +1595,19 @@ public void setBacklogQuota( message = "Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")}) public void removeBacklogQuota(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic, - @QueryParam("backlogQuotaType") BacklogQuotaType backlogQuotaType) { + @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("backlogQuotaType") BacklogQuotaType backlogQuotaType, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalRemoveBacklogQuota(asyncResponse, backlogQuotaType); + preValidation(authoritative) + .thenCompose(__ -> internalSetBacklogQuota(backlogQuotaType, null)) + .thenRun(() -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + handleTopicPolicyException("removeBacklogQuota", ex, asyncResponse); + return null; + }); } @GET @@ -1520,22 +1617,31 @@ public void removeBacklogQuota(@Suspended final AsyncResponse asyncResponse, @ApiResponse(code = 404, message = "Topic does not exist"), @ApiResponse(code = 405, message = "Topic level policy is disabled, enable the topic level policy and retry")}) - public Integer getMessageTTL(@PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic, - @QueryParam("applied") boolean applied) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - return getTopicPolicies(topicName) - .map(TopicPolicies::getMessageTTLInSeconds) - .orElseGet(() -> { - if (applied) { - Integer otherLevelTTL = getNamespacePolicies(namespaceName).message_ttl_in_seconds; - return otherLevelTTL == null ? pulsar().getConfiguration().getTtlDurationDefaultInSeconds() - : otherLevelTTL; - } - return null; - }); + public void getMessageTTL(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("applied") boolean applied, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenAccept(__ -> + asyncResponse.resume(getTopicPolicies(topicName) + .map(TopicPolicies::getMessageTTLInSeconds) + .orElseGet(() -> { + if (applied) { + Integer otherLevelTTL = getNamespacePolicies(namespaceName).message_ttl_in_seconds; + return otherLevelTTL == null ? pulsar().getConfiguration().getTtlDurationDefaultInSeconds() + : otherLevelTTL; + } + return null; + })) + ) + .exceptionally(ex -> { + handleTopicPolicyException("getMessageTTL", ex, asyncResponse); + return null; + }); } @POST @@ -1548,14 +1654,21 @@ public Integer getMessageTTL(@PathParam("tenant") String tenant, "Topic level policy is disabled, enable the topic level policy and retry"), @ApiResponse(code = 412, message = "Invalid message TTL value")}) public void setMessageTTL(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic, - @ApiParam(value = "TTL in seconds for the specified namespace", required = true) - @QueryParam("messageTTL") int messageTTL) { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "TTL in seconds for the specified namespace", required = true) + @QueryParam("messageTTL") Integer messageTTL, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalSetMessageTTL(asyncResponse, messageTTL); + preValidation(authoritative) + .thenCompose(__ -> internalSetMessageTTL(messageTTL)) + .thenRun(() -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + handleTopicPolicyException("setMessageTTL", ex, asyncResponse); + return null; + }); } @DELETE @@ -1569,12 +1682,19 @@ public void setMessageTTL(@Suspended final AsyncResponse asyncResponse, message = "Topic level policy is disabled, enable the topic level policy and retry"), @ApiResponse(code = 412, message = "Invalid message TTL value")}) public void removeMessageTTL(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic) { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalSetMessageTTL(asyncResponse, null); + preValidation(authoritative) + .thenCompose(__ -> internalSetMessageTTL(null)) + .thenRun(() -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + handleTopicPolicyException("removeMessageTTL", ex, asyncResponse); + return null; + }); } @GET @@ -1586,23 +1706,20 @@ public void removeMessageTTL(@Suspended final AsyncResponse asyncResponse, @ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry")}) public void getDeduplication(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic, - @QueryParam("applied") boolean applied) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalGetDeduplication(applied).whenComplete((res, ex) -> { - if (ex instanceof RestException) { - log.error("Failed get Deduplication", ex); - asyncResponse.resume(ex); - } else if (ex != null) { - log.error("Failed get Deduplication", ex); - asyncResponse.resume(new RestException(ex)); - } else { - asyncResponse.resume(res); - } - }); + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("applied") boolean applied, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalGetDeduplication(applied)) + .thenApply(asyncResponse::resume) + .exceptionally(ex -> { + handleTopicPolicyException("getDeduplication", ex, asyncResponse); + return null; + }); } @POST @@ -1617,21 +1734,18 @@ public void setDeduplication( @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @ApiParam(value = "DeduplicationEnabled policies for the specified topic") Boolean enabled) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalSetDeduplication(enabled).whenComplete((r, ex) -> { - if (ex instanceof RestException) { - log.error("Failed updated deduplication", ex); - asyncResponse.resume(ex); - } else if (ex != null) { - log.error("Failed updated deduplication", ex); - asyncResponse.resume(new RestException(ex)); - } else { - asyncResponse.resume(Response.noContent().build()); - } - }); + preValidation(authoritative) + .thenCompose(__ -> internalSetDeduplication(enabled)) + .thenRun(() -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + handleTopicPolicyException("setDeduplication", ex, asyncResponse); + return null; + }); } @DELETE @@ -1643,11 +1757,19 @@ public void setDeduplication( message = "Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")}) public void removeDeduplication(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic) { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - setDeduplication(asyncResponse, tenant, namespace, encodedTopic, null); + preValidation(authoritative) + .thenCompose(__ -> internalSetDeduplication(null)) + .thenRun(() -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + handleTopicPolicyException("setDeduplication", ex, asyncResponse); + return null; + }); } @GET @@ -1662,16 +1784,16 @@ public void getRetention(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, - @QueryParam("applied") boolean applied) { + @QueryParam("applied") boolean applied, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - try { - internalGetRetention(asyncResponse, applied); - } catch (RestException e) { - asyncResponse.resume(e); - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - } + preValidation(authoritative) + .thenRun(() -> asyncResponse.resume(internalGetRetention(applied))) + .exceptionally(ex -> { + handleTopicPolicyException("getRetention", ex, asyncResponse); + return null; + }); } @POST @@ -1687,17 +1809,13 @@ public void setRetention(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @ApiParam(value = "Retention policies for the specified namespace") RetentionPolicies retention) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalSetRetention(retention).whenComplete((r, ex) -> { - if (ex instanceof RestException) { - log.error("Failed updated retention", ex); - asyncResponse.resume(ex); - } else if (ex != null) { - log.error("Failed updated retention", ex); - asyncResponse.resume(new RestException(ex)); - } else { + preValidation(authoritative) + .thenCompose(__ -> internalSetRetention(retention)) + .thenRun(() -> { try { log.info("[{}] Successfully updated retention: namespace={}, topic={}, retention={}", clientAppId(), @@ -1707,8 +1825,11 @@ public void setRetention(@Suspended final AsyncResponse asyncResponse, } catch (JsonProcessingException ignore) { } asyncResponse.resume(Response.noContent().build()); - } - }); + }) + .exceptionally(ex -> { + handleTopicPolicyException("setRetention", ex, asyncResponse); + return null; + }); } @DELETE @@ -1721,23 +1842,25 @@ public void setRetention(@Suspended final AsyncResponse asyncResponse, @ApiResponse(code = 409, message = "Concurrent modification"), @ApiResponse(code = 412, message = "Retention Quota must exceed backlog quota")}) public void removeRetention(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalRemoveRetention().whenComplete((r, ex) -> { - if (ex != null) { - log.error("Failed updated retention", ex); - asyncResponse.resume(new RestException(ex)); - } else { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalRemoveRetention()) + .thenRun(() -> { log.info("[{}] Successfully remove retention: namespace={}, topic={}", clientAppId(), namespaceName, topicName.getLocalName()); asyncResponse.resume(Response.noContent().build()); - } - }); + }) + .exceptionally(ex -> { + handleTopicPolicyException("setRetention", ex, asyncResponse); + return null; + }); } @GET @@ -1749,23 +1872,20 @@ public void removeRetention(@Suspended final AsyncResponse asyncResponse, message = "Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")}) public void getPersistence(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic, - @QueryParam("applied") boolean applied) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalGetPersistence(applied).whenComplete((res, ex) -> { - if (ex instanceof RestException) { - log.error("Failed get persistence policies", ex); - asyncResponse.resume(ex); - } else if (ex != null) { - log.error("Failed get persistence policies", ex); - asyncResponse.resume(new RestException(ex)); - } else { - asyncResponse.resume(res); - } - }); + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("applied") boolean applied, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalGetPersistence(applied)) + .thenApply(asyncResponse::resume) + .exceptionally(ex -> { + handleTopicPolicyException("getPersistence", ex, asyncResponse); + return null; + }); } @POST @@ -1778,21 +1898,17 @@ public void getPersistence(@Suspended final AsyncResponse asyncResponse, @ApiResponse(code = 409, message = "Concurrent modification"), @ApiResponse(code = 400, message = "Invalid persistence policies")}) public void setPersistence(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic, - @ApiParam(value = "Bookkeeper persistence policies for specified topic") - PersistencePolicies persistencePolicies) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalSetPersistence(persistencePolicies).whenComplete((r, ex) -> { - if (ex instanceof RestException) { - log.error("Failed updated persistence policies", ex); - asyncResponse.resume(ex); - } else if (ex != null) { - log.error("Failed updated persistence policies", ex); - asyncResponse.resume(new RestException(ex)); - } else { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "Bookkeeper persistence policies for specified topic") + PersistencePolicies persistencePolicies) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalSetPersistence(persistencePolicies)) + .thenRun(() -> { try { log.info("[{}] Successfully updated persistence policies: " + "namespace={}, topic={}, persistencePolicies={}", @@ -1803,8 +1919,11 @@ public void setPersistence(@Suspended final AsyncResponse asyncResponse, } catch (JsonProcessingException ignore) { } asyncResponse.resume(Response.noContent().build()); - } - }); + }) + .exceptionally(ex -> { + handleTopicPolicyException("setPersistence", ex, asyncResponse); + return null; + }); } @DELETE @@ -1816,23 +1935,25 @@ public void setPersistence(@Suspended final AsyncResponse asyncResponse, message = "Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")}) public void removePersistence(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalRemovePersistence().whenComplete((r, ex) -> { - if (ex != null) { - log.error("Failed updated retention", ex); - asyncResponse.resume(new RestException(ex)); - } else { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalRemovePersistence()) + .thenRun(() -> { log.info("[{}] Successfully remove persistence policies: namespace={}, topic={}", clientAppId(), namespaceName, topicName.getLocalName()); asyncResponse.resume(Response.noContent().build()); - } - }); + }) + .exceptionally(ex -> { + handleTopicPolicyException("removePersistence", ex, asyncResponse); + return null; + }); } @GET @@ -1844,23 +1965,22 @@ public void removePersistence(@Suspended final AsyncResponse asyncResponse, message = "Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")}) public void getMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic) { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - try { - Optional maxSubscriptionsPerTopic = internalGetMaxSubscriptionsPerTopic(); - if (!maxSubscriptionsPerTopic.isPresent()) { - asyncResponse.resume(Response.noContent().build()); - } else { - asyncResponse.resume(maxSubscriptionsPerTopic.get()); - } - } catch (RestException e) { - asyncResponse.resume(e); - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - } + preValidation(authoritative) + .thenRun(() -> { + Optional maxSubscriptionsPerTopic = internalGetMaxSubscriptionsPerTopic(); + asyncResponse.resume(maxSubscriptionsPerTopic.isPresent() ? maxSubscriptionsPerTopic.get() + : Response.noContent().build()); + }) + .exceptionally(ex -> { + handleTopicPolicyException("getMaxSubscriptions", ex, asyncResponse); + return null; + }); } @POST @@ -1873,26 +1993,25 @@ public void getMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncResp @ApiResponse(code = 409, message = "Concurrent modification"), @ApiResponse(code = 412, message = "Invalid value of maxSubscriptionsPerTopic")}) public void setMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic, - @ApiParam(value = "The max subscriptions of the topic") int maxSubscriptionsPerTopic) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalSetMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic).whenComplete((r, ex) -> { - if (ex instanceof RestException) { - log.error("Updating maxSubscriptionsPerTopic failed", ex); - asyncResponse.resume(ex); - } else if (ex != null) { - log.error("Updating maxSubscriptionsPerTopic failed", ex); - asyncResponse.resume(new RestException(ex)); - } else { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "The max subscriptions of the topic") int maxSubscriptionsPerTopic) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalSetMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic)) + .thenRun(() -> { log.info("[{}] Successfully updated maxSubscriptionsPerTopic: namespace={}, topic={}" + ", maxSubscriptions={}" , clientAppId(), namespaceName, topicName.getLocalName(), maxSubscriptionsPerTopic); asyncResponse.resume(Response.noContent().build()); - } - }); + }) + .exceptionally(ex -> { + handleTopicPolicyException("setMaxSubscriptions", ex, asyncResponse); + return null; + }); } @DELETE @@ -1904,23 +2023,23 @@ public void setMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncResp message = "Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")}) public void removeMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalSetMaxSubscriptionsPerTopic(null).whenComplete((r, ex) -> { - if (ex != null) { - log.error("Failed to remove maxSubscriptionsPerTopic", ex); - asyncResponse.resume(new RestException(ex)); - } else { - log.info("[{}] Successfully remove maximum subscription limit: namespace={}, topic={}", - clientAppId(), - namespaceName, - topicName.getLocalName()); + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalSetMaxSubscriptionsPerTopic(null)) + .thenRun(() -> { + log.info("[{}] Successfully remove maxSubscriptionsPerTopic: namespace={}, topic={}", + clientAppId(), namespaceName, topicName.getLocalName()); asyncResponse.resume(Response.noContent().build()); - } - }); + }) + .exceptionally(ex -> { + handleTopicPolicyException("removeMaxSubscriptions", ex, asyncResponse); + return null; + }); } @GET @@ -1932,23 +2051,20 @@ public void removeMaxSubscriptionsPerTopic(@Suspended final AsyncResponse asyncR message = "Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")}) public void getReplicatorDispatchRate(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic, - @QueryParam("applied") boolean applied) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalGetReplicatorDispatchRate(applied).whenComplete((res, ex) -> { - if (ex instanceof RestException) { - log.error("Failed get replicator dispatchRate", ex); - asyncResponse.resume(ex); - } else if (ex != null) { - log.error("Failed get replicator dispatchRate", ex); - asyncResponse.resume(new RestException(ex)); - } else { - asyncResponse.resume(res); - } - }); + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("applied") boolean applied, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalGetReplicatorDispatchRate(applied)) + .thenApply(asyncResponse::resume) + .exceptionally(ex -> { + handleTopicPolicyException("getReplicatorDispatchRate", ex, asyncResponse); + return null; + }); } @POST @@ -1961,28 +2077,26 @@ public void getReplicatorDispatchRate(@Suspended final AsyncResponse asyncRespon @ApiResponse(code = 409, message = "Concurrent modification"), @ApiResponse(code = 412, message = "Invalid value of replicatorDispatchRate")}) public void setReplicatorDispatchRate(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic, - @ApiParam(value = "Replicator dispatch rate of the topic") - DispatchRateImpl dispatchRate) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalSetReplicatorDispatchRate(dispatchRate).whenComplete((r, ex) -> { - if (ex instanceof RestException) { - log.error("Updating replicatorDispatchRate failed", ex); - asyncResponse.resume(ex); - } else if (ex != null) { - log.error("Updating replicatorDispatchRate failed", ex); - asyncResponse.resume(new RestException(ex)); - } else { - log.info("[{}] Successfully updated replicatorDispatchRate: namespace={}, topic={}" - + ", replicatorDispatchRate={}" - , clientAppId(), namespaceName, topicName.getLocalName(), dispatchRate); - asyncResponse.resume(Response.noContent().build()); - } - }); - } + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "Replicator dispatch rate of the topic") DispatchRateImpl dispatchRate) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalSetReplicatorDispatchRate(dispatchRate)) + .thenRun(() -> { + log.info("[{}] Successfully updated replicatorDispatchRate: namespace={}, topic={}" + + ", replicatorDispatchRate={}", + clientAppId(), namespaceName, topicName.getLocalName(), dispatchRate); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(ex -> { + handleTopicPolicyException("setReplicatorDispatchRate", ex, asyncResponse); + return null; + }); + } @DELETE @Path("/{tenant}/{namespace}/{topic}/replicatorDispatchRate") @@ -1993,21 +2107,23 @@ public void setReplicatorDispatchRate(@Suspended final AsyncResponse asyncRespon message = "Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")}) public void removeReplicatorDispatchRate(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalSetReplicatorDispatchRate(null).whenComplete((r, ex) -> { - if (ex != null) { - log.error("Failed to remove replicatorDispatchRate", ex); - asyncResponse.resume(new RestException(ex)); - } else { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalSetReplicatorDispatchRate(null)) + .thenRun(() -> { log.info("[{}] Successfully remove replicatorDispatchRate limit: namespace={}, topic={}", clientAppId(), namespaceName, topicName.getLocalName()); asyncResponse.resume(Response.noContent().build()); - } - }); + }) + .exceptionally(ex -> { + handleTopicPolicyException("removeReplicatorDispatchRate", ex, asyncResponse); + return null; + }); } @GET @@ -2019,23 +2135,20 @@ public void removeReplicatorDispatchRate(@Suspended final AsyncResponse asyncRes message = "Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")}) public void getMaxProducers(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic, - @QueryParam("applied") boolean applied) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalGetMaxProducers(applied).whenComplete((res, ex) -> { - if (ex instanceof RestException) { - log.error("Failed get maxProducers", ex); - asyncResponse.resume(ex); - } else if (ex != null) { - log.error("Failed get maxProducers", ex); - asyncResponse.resume(new RestException(ex)); - } else { - asyncResponse.resume(res); - } - }); + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("applied") boolean applied, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalGetMaxProducers(applied)) + .thenApply(asyncResponse::resume) + .exceptionally(ex -> { + handleTopicPolicyException("getMaxProducers", ex, asyncResponse); + return null; + }); } @POST @@ -2048,28 +2161,27 @@ public void getMaxProducers(@Suspended final AsyncResponse asyncResponse, @ApiResponse(code = 409, message = "Concurrent modification"), @ApiResponse(code = 412, message = "Invalid value of maxProducers")}) public void setMaxProducers(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic, - @ApiParam(value = "The max producers of the topic") int maxProducers) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalSetMaxProducers(maxProducers).whenComplete((r, ex) -> { - if (ex instanceof RestException) { - log.error("Failed updated persistence policies", ex); - asyncResponse.resume(ex); - } else if (ex != null) { - log.error("Failed updated persistence policies", ex); - asyncResponse.resume(new RestException(ex)); - } else { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "The max producers of the topic") int maxProducers) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalSetMaxProducers(maxProducers)) + .thenRun(() -> { log.info("[{}] Successfully updated max producers: namespace={}, topic={}, maxProducers={}", clientAppId(), namespaceName, topicName.getLocalName(), maxProducers); asyncResponse.resume(Response.noContent().build()); - } - }); + }) + .exceptionally(ex -> { + handleTopicPolicyException("setMaxProducers", ex, asyncResponse); + return null; + }); } @DELETE @@ -2081,23 +2193,25 @@ public void setMaxProducers(@Suspended final AsyncResponse asyncResponse, message = "Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")}) public void removeMaxProducers(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalRemoveMaxProducers().whenComplete((r, ex) -> { - if (ex != null) { - log.error("Failed to remove maxProducers", ex); - asyncResponse.resume(new RestException(ex)); - } else { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalRemoveMaxProducers()) + .thenRun(() -> { log.info("[{}] Successfully remove max producers: namespace={}, topic={}", clientAppId(), namespaceName, topicName.getLocalName()); asyncResponse.resume(Response.noContent().build()); - } - }); + }) + .exceptionally(ex -> { + handleTopicPolicyException("removeMaxProducers", ex, asyncResponse); + return null; + }); } @GET @@ -2109,23 +2223,20 @@ public void removeMaxProducers(@Suspended final AsyncResponse asyncResponse, message = "Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")}) public void getMaxConsumers(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic, - @QueryParam("applied") boolean applied) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalGetMaxConsumers(applied).whenComplete((res, ex) -> { - if (ex instanceof RestException) { - log.error("Failed get maxConsumers", ex); - asyncResponse.resume(ex); - } else if (ex != null) { - log.error("Failed get maxConsumers", ex); - asyncResponse.resume(new RestException(ex)); - } else { - asyncResponse.resume(res); - } - }); + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("applied") boolean applied, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalGetMaxConsumers(applied)) + .thenApply(asyncResponse::resume) + .exceptionally(ex -> { + handleTopicPolicyException("getMaxConsumers", ex, asyncResponse); + return null; + }); } @POST @@ -2138,28 +2249,27 @@ public void getMaxConsumers(@Suspended final AsyncResponse asyncResponse, @ApiResponse(code = 409, message = "Concurrent modification"), @ApiResponse(code = 412, message = "Invalid value of maxConsumers")}) public void setMaxConsumers(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic, - @ApiParam(value = "The max consumers of the topic") int maxConsumers) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalSetMaxConsumers(maxConsumers).whenComplete((r, ex) -> { - if (ex instanceof RestException) { - log.error("Failed updated persistence policies", ex); - asyncResponse.resume(ex); - } else if (ex != null) { - log.error("Failed updated persistence policies", ex); - asyncResponse.resume(new RestException(ex)); - } else { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "The max consumers of the topic") int maxConsumers) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalSetMaxConsumers(maxConsumers)) + .thenRun(() -> { log.info("[{}] Successfully updated max consumers: namespace={}, topic={}, maxConsumers={}", clientAppId(), namespaceName, topicName.getLocalName(), maxConsumers); asyncResponse.resume(Response.noContent().build()); - } - }); + }) + .exceptionally(ex -> { + handleTopicPolicyException("setMaxConsumers", ex, asyncResponse); + return null; + }); } @DELETE @@ -2171,23 +2281,25 @@ public void setMaxConsumers(@Suspended final AsyncResponse asyncResponse, message = "Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")}) public void removeMaxConsumers(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalRemoveMaxConsumers().whenComplete((r, ex) -> { - if (ex != null) { - log.error("Failed to remove maxConsumers", ex); - asyncResponse.resume(new RestException(ex)); - } else { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalRemoveMaxConsumers()) + .thenRun(() -> { log.info("[{}] Successfully remove max consumers: namespace={}, topic={}", clientAppId(), namespaceName, topicName.getLocalName()); asyncResponse.resume(Response.noContent().build()); - } - }); + }) + .exceptionally(ex -> { + handleTopicPolicyException("removeMaxConsumers", ex, asyncResponse); + return null; + }); } @GET @@ -2199,23 +2311,21 @@ public void removeMaxConsumers(@Suspended final AsyncResponse asyncResponse, message = "Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")}) public void getMaxMessageSize(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic) { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - try { - Optional policies = internalGetMaxMessageSize(); - if (policies.isPresent()) { - asyncResponse.resume(policies.get()); - } else { - asyncResponse.resume(Response.noContent().build()); - } - } catch (RestException e) { - asyncResponse.resume(e); - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - } + preValidation(authoritative) + .thenRun(() -> { + Optional policies = internalGetMaxMessageSize(); + asyncResponse.resume(policies.isPresent() ? policies.get() : Response.noContent().build()); + }) + .exceptionally(ex -> { + handleTopicPolicyException("getMaxMessageSize", ex, asyncResponse); + return null; + }); } @POST @@ -2228,28 +2338,27 @@ public void getMaxMessageSize(@Suspended final AsyncResponse asyncResponse, @ApiResponse(code = 409, message = "Concurrent modification"), @ApiResponse(code = 412, message = "Invalid value of maxConsumers")}) public void setMaxMessageSize(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic, - @ApiParam(value = "The max message size of the topic") int maxMessageSize) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalSetMaxMessageSize(maxMessageSize).whenComplete((r, ex) -> { - if (ex instanceof RestException) { - log.error("Failed updated persistence policies", ex); - asyncResponse.resume(ex); - } else if (ex != null) { - log.error("Failed updated persistence policies", ex); - asyncResponse.resume(new RestException(ex)); - } else { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "The max message size of the topic") int maxMessageSize) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalSetMaxMessageSize(maxMessageSize)) + .thenRun(() -> { log.info("[{}] Successfully set max message size: namespace={}, topic={}, maxMessageSiz={}", clientAppId(), namespaceName, topicName.getLocalName(), maxMessageSize); asyncResponse.resume(Response.noContent().build()); - } - }); + }) + .exceptionally(ex -> { + handleTopicPolicyException("setMaxMessageSize", ex, asyncResponse); + return null; + }); } @DELETE @@ -2261,23 +2370,25 @@ public void setMaxMessageSize(@Suspended final AsyncResponse asyncResponse, message = "Topic level policy is disabled, to enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")}) public void removeMaxMessageSize(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalSetMaxMessageSize(null).whenComplete((r, ex) -> { - if (ex != null) { - log.error("Failed to remove maxMessageSize", ex); - asyncResponse.resume(new RestException(ex)); - } else { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalSetMaxMessageSize(null)) + .thenRun(() -> { log.info("[{}] Successfully remove max message size: namespace={}, topic={}", clientAppId(), namespaceName, topicName.getLocalName()); asyncResponse.resume(Response.noContent().build()); - } - }); + }) + .exceptionally(ex -> { + handleTopicPolicyException("removeMaxMessageSize", ex, asyncResponse); + return null; + }); } @@ -2495,20 +2606,17 @@ public void getDispatchRate(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, - @QueryParam("applied") boolean applied) { + @QueryParam("applied") boolean applied, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalGetDispatchRate(applied).whenComplete((res, ex) -> { - if (ex instanceof RestException) { - log.error("Failed get dispatchRate", ex); - asyncResponse.resume(ex); - } else if (ex != null) { - log.error("Failed get dispatchRate", ex); - asyncResponse.resume(new RestException(ex)); - } else { - asyncResponse.resume(res); - } - }); + preValidation(authoritative) + .thenCompose(__ -> internalGetDispatchRate(applied)) + .thenApply(asyncResponse::resume) + .exceptionally(ex -> { + handleTopicPolicyException("getDispatchRate", ex, asyncResponse); + return null; + }); } @POST @@ -2520,21 +2628,16 @@ public void getDispatchRate(@Suspended final AsyncResponse asyncResponse, message = "Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")}) public void setDispatchRate(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic, - @ApiParam(value = "Dispatch rate for the specified topic") - DispatchRateImpl dispatchRate) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalSetDispatchRate(dispatchRate).whenComplete((r, ex) -> { - if (ex instanceof RestException) { - log.error("Failed to set topic dispatch rate", ex); - asyncResponse.resume(ex); - } else if (ex != null) { - log.error("Failed to set topic dispatch rate"); - asyncResponse.resume(new RestException(ex)); - } else { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "Dispatch rate for the specified topic") DispatchRateImpl dispatchRate) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalSetDispatchRate(dispatchRate)) + .thenRun(() -> { try { log.info("[{}] Successfully set topic dispatch rate:" + " tenant={}, namespace={}, topic={}, dispatchRate={}", @@ -2545,8 +2648,11 @@ public void setDispatchRate(@Suspended final AsyncResponse asyncResponse, jsonMapper().writeValueAsString(dispatchRate)); } catch (JsonProcessingException ignore) {} asyncResponse.resume(Response.noContent().build()); - } - }); + }) + .exceptionally(ex -> { + handleTopicPolicyException("setDispatchRate", ex, asyncResponse); + return null; + }); } @DELETE @@ -2558,24 +2664,26 @@ public void setDispatchRate(@Suspended final AsyncResponse asyncResponse, message = "Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")}) public void removeDispatchRate(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalRemoveDispatchRate().whenComplete((r, ex) -> { - if (ex != null) { - log.error("Failed to remove topic dispatch rate", ex); - asyncResponse.resume(new RestException(ex)); - } else { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalRemoveDispatchRate()) + .thenRun(() -> { log.info("[{}] Successfully remove topic dispatch rate: tenant={}, namespace={}, topic={}", - clientAppId(), - tenant, - namespace, - topicName.getLocalName()); + clientAppId(), + tenant, + namespace, + topicName.getLocalName()); asyncResponse.resume(Response.noContent().build()); - } - }); + }) + .exceptionally(ex -> { + handleTopicPolicyException("removeDispatchRate", ex, asyncResponse); + return null; + }); } @GET @@ -2590,20 +2698,17 @@ public void getSubscriptionDispatchRate(@Suspended final AsyncResponse asyncResp @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, - @QueryParam("applied") boolean applied) { + @QueryParam("applied") boolean applied, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalGetSubscriptionDispatchRate(applied).whenComplete((res, ex) -> { - if (ex instanceof RestException) { - log.error("Failed get subscription dispatchRate", ex); - asyncResponse.resume(ex); - } else if (ex != null) { - log.error("Failed get subscription dispatchRate", ex); - asyncResponse.resume(new RestException(ex)); - } else { - asyncResponse.resume(res); - } - }); + preValidation(authoritative) + .thenCompose(__ -> internalGetSubscriptionDispatchRate(applied)) + .thenApply(asyncResponse::resume) + .exceptionally(ex -> { + handleTopicPolicyException("getSubscriptionDispatchRate", ex, asyncResponse); + return null; + }); } @POST @@ -2619,18 +2724,14 @@ public void setSubscriptionDispatchRate( @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @ApiParam(value = "Subscription message dispatch rate for the specified topic") DispatchRateImpl dispatchRate) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalSetSubscriptionDispatchRate(dispatchRate).whenComplete((r, ex) -> { - if (ex instanceof RestException) { - log.error("Failed to set topic: {} subscription dispatch rate", topicName.getLocalName(), ex); - asyncResponse.resume(ex); - } else if (ex != null) { - log.error("Failed to set topic: {} subscription dispatch rate", topicName.getLocalName()); - asyncResponse.resume(new RestException(ex)); - } else { + preValidation(authoritative) + .thenCompose(__ -> internalSetSubscriptionDispatchRate(dispatchRate)) + .thenRun(() -> { try { log.info("[{}] Successfully set topic subscription dispatch rate:" + " tenant={}, namespace={}, topic={}, dispatchRate={}", @@ -2641,8 +2742,11 @@ public void setSubscriptionDispatchRate( jsonMapper().writeValueAsString(dispatchRate)); } catch (JsonProcessingException ignore) {} asyncResponse.resume(Response.noContent().build()); - } - }); + }) + .exceptionally(ex -> { + handleTopicPolicyException("setSubscriptionDispatchRate", ex, asyncResponse); + return null; + }); } @DELETE @@ -2654,24 +2758,26 @@ public void setSubscriptionDispatchRate( message = "Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")}) public void removeSubscriptionDispatchRate(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalRemoveSubscriptionDispatchRate().whenComplete((r, ex) -> { - if (ex != null) { - log.error("Failed to remove topic: {} subscription dispatch rate", topicName.getLocalName(), ex); - asyncResponse.resume(new RestException(ex)); - } else { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalRemoveSubscriptionDispatchRate()) + .thenRun(() -> { log.info("[{}] Successfully remove topic subscription dispatch rate: tenant={}, namespace={}, topic={}", - clientAppId(), - tenant, - namespace, - topicName.getLocalName()); + clientAppId(), + tenant, + namespace, + topicName.getLocalName()); asyncResponse.resume(Response.noContent().build()); - } - }); + }) + .exceptionally(ex -> { + handleTopicPolicyException("removeSubscriptionDispatchRate", ex, asyncResponse); + return null; + }); } @GET @@ -2683,14 +2789,20 @@ public void removeSubscriptionDispatchRate(@Suspended final AsyncResponse asyncR message = "Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")}) public void getCompactionThreshold(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic, - @QueryParam("applied") boolean applied) { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("applied") boolean applied, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalGetCompactionThreshold(applied).whenComplete((res, ex) - -> internalHandleResult(asyncResponse, res, ex, "Failed get compaction threshold")); + preValidation(authoritative) + .thenCompose(__ -> internalGetCompactionThreshold(applied)) + .thenApply(asyncResponse::resume) + .exceptionally(ex -> { + handleTopicPolicyException("getCompactionThreshold", ex, asyncResponse); + return null; + }); } @POST @@ -2702,20 +2814,16 @@ public void getCompactionThreshold(@Suspended final AsyncResponse asyncResponse, message = "Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")}) public void setCompactionThreshold(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic, - @ApiParam(value = "Dispatch rate for the specified topic") long compactionThreshold) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalSetCompactionThreshold(compactionThreshold).whenComplete((r, ex) -> { - if (ex instanceof RestException) { - log.error("Failed to set topic dispatch rate", ex); - asyncResponse.resume(ex); - } else if (ex != null) { - log.error("Failed to set topic dispatch rate"); - asyncResponse.resume(new RestException(ex)); - } else { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "Dispatch rate for the specified topic") long compactionThreshold) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalSetCompactionThreshold(compactionThreshold)) + .thenRun(() -> { try { log.info("[{}] Successfully set topic compaction threshold:" + " tenant={}, namespace={}, topic={}, compactionThreshold={}", @@ -2726,8 +2834,11 @@ public void setCompactionThreshold(@Suspended final AsyncResponse asyncResponse, jsonMapper().writeValueAsString(compactionThreshold)); } catch (JsonProcessingException ignore) {} asyncResponse.resume(Response.noContent().build()); - } - }); + }) + .exceptionally(ex -> { + handleTopicPolicyException("setCompactionThreshold", ex, asyncResponse); + return null; + }); } @DELETE @@ -2739,24 +2850,26 @@ public void setCompactionThreshold(@Suspended final AsyncResponse asyncResponse, message = "Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")}) public void removeCompactionThreshold(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalRemoveCompactionThreshold().whenComplete((r, ex) -> { - if (ex != null) { - log.error("Failed to remove topic dispatch rate", ex); - asyncResponse.resume(new RestException(ex)); - } else { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalRemoveCompactionThreshold()) + .thenRun(() -> { log.info("[{}] Successfully remove topic compaction threshold: tenant={}, namespace={}, topic={}", - clientAppId(), - tenant, - namespace, - topicName.getLocalName()); + clientAppId(), + tenant, + namespace, + topicName.getLocalName()); asyncResponse.resume(Response.noContent().build()); - } - }); + }) + .exceptionally(ex -> { + handleTopicPolicyException("removeCompactionThreshold", ex, asyncResponse); + return null; + }); } @GET @@ -2768,23 +2881,22 @@ public void removeCompactionThreshold(@Suspended final AsyncResponse asyncRespon message = "Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")}) public void getMaxConsumersPerSubscription(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic) { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - try { - Optional maxConsumersPerSubscription = internalGetMaxConsumersPerSubscription(); - if (!maxConsumersPerSubscription.isPresent()) { - asyncResponse.resume(Response.noContent().build()); - } else { - asyncResponse.resume(maxConsumersPerSubscription.get()); - } - } catch (RestException e) { - asyncResponse.resume(e); - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - } + preValidation(authoritative) + .thenRun(() -> { + Optional maxConsumersPerSubscription = internalGetMaxConsumersPerSubscription(); + asyncResponse.resume(maxConsumersPerSubscription.isPresent() ? maxConsumersPerSubscription.get() + : Response.noContent().build()); + }) + .exceptionally(ex -> { + handleTopicPolicyException("getMaxConsumersPerSubscription", ex, asyncResponse); + return null; + }); } @POST @@ -2800,17 +2912,13 @@ public void setMaxConsumersPerSubscription( @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @ApiParam(value = "Dispatch rate for the specified topic") int maxConsumersPerSubscription) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription).whenComplete((r, ex) -> { - if (ex instanceof RestException) { - log.error("Failed to set topic {} max consumers per subscription ", topicName.getLocalName(), ex); - asyncResponse.resume(ex); - } else if (ex != null) { - log.error("Failed to set topic max consumers per subscription"); - asyncResponse.resume(new RestException(ex)); - } else { + preValidation(authoritative) + .thenCompose(__ -> internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription)) + .thenRun(() -> { try { log.info("[{}] Successfully set topic max consumers per subscription:" + " tenant={}, namespace={}, topic={}, maxConsumersPerSubscription={}", @@ -2821,8 +2929,11 @@ public void setMaxConsumersPerSubscription( jsonMapper().writeValueAsString(maxConsumersPerSubscription)); } catch (JsonProcessingException ignore) {} asyncResponse.resume(Response.noContent().build()); - } - }); + }) + .exceptionally(ex -> { + handleTopicPolicyException("setMaxConsumersPerSubscription", ex, asyncResponse); + return null; + }); } @DELETE @@ -2834,16 +2945,15 @@ public void setMaxConsumersPerSubscription( message = "Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")}) public void removeMaxConsumersPerSubscription(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalRemoveMaxConsumersPerSubscription().whenComplete((r, ex) -> { - if (ex != null) { - log.error("Failed to remove topic {} max consuners per subscription", topicName.getLocalName(), ex); - asyncResponse.resume(new RestException(ex)); - } else { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalRemoveMaxConsumersPerSubscription()) + .thenRun(() -> { log.info("[{}] Successfully remove topic max consumers per subscription:" + " tenant={}, namespace={}, topic={}", clientAppId(), @@ -2851,8 +2961,11 @@ public void removeMaxConsumersPerSubscription(@Suspended final AsyncResponse asy namespace, topicName.getLocalName()); asyncResponse.resume(Response.noContent().build()); - } - }); + }) + .exceptionally(ex -> { + handleTopicPolicyException("removeMaxConsumersPerSubscription", ex, asyncResponse); + return null; + }); } @GET @@ -2866,21 +2979,20 @@ public void removeMaxConsumersPerSubscription(@Suspended final AsyncResponse asy public void getPublishRate(@Suspended final AsyncResponse asyncResponse, @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic) { + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - try { - Optional publishRate = internalGetPublishRate(); - if (!publishRate.isPresent()) { - asyncResponse.resume(Response.noContent().build()); - } else { - asyncResponse.resume(publishRate.get()); - } - } catch (RestException e) { - asyncResponse.resume(e); - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - } + preValidation(authoritative) + .thenRun(() -> { + Optional publishRate = internalGetPublishRate(); + asyncResponse.resume(publishRate.isPresent() ? publishRate.get() + : Response.noContent().build()); + }) + .exceptionally(ex -> { + handleTopicPolicyException("getPublishRate", ex, asyncResponse); + return null; + }); } @POST @@ -2892,20 +3004,16 @@ public void getPublishRate(@Suspended final AsyncResponse asyncResponse, message = "Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")}) public void setPublishRate(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic, - @ApiParam(value = "Dispatch rate for the specified topic") PublishRate publishRate) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalSetPublishRate(publishRate).whenComplete((r, ex) -> { - if (ex instanceof RestException) { - log.error("Failed to set topic dispatch rate", ex); - asyncResponse.resume(ex); - } else if (ex != null) { - log.error("Failed to set topic dispatch rate"); - asyncResponse.resume(new RestException(ex)); - } else { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "Dispatch rate for the specified topic") PublishRate publishRate) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalSetPublishRate(publishRate)) + .thenRun(() -> { try { log.info("[{}] Successfully set topic publish rate:" + " tenant={}, namespace={}, topic={}, publishRate={}", @@ -2916,8 +3024,42 @@ public void setPublishRate(@Suspended final AsyncResponse asyncResponse, jsonMapper().writeValueAsString(publishRate)); } catch (JsonProcessingException ignore) {} asyncResponse.resume(Response.noContent().build()); - } - }); + }) + .exceptionally(ex -> { + handleTopicPolicyException("setPublishRate", ex, asyncResponse); + return null; + }); + } + + @DELETE + @Path("/{tenant}/{namespace}/{topic}/publishRate") + @ApiOperation(value = "Remove message publish rate configuration for specified topic.") + @ApiResponses(value = {@ApiResponse(code = 403, message = "Topic does not exist"), + @ApiResponse(code = 404, message = "Topic does not exist"), + @ApiResponse(code = 405, + message = "Topic level policy is disabled, please enable the topic level policy and retry"), + @ApiResponse(code = 409, message = "Concurrent modification")}) + public void removePublishRate(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalRemovePublishRate()) + .thenRun(() -> { + log.info("[{}] Successfully remove topic publish rate: tenant={}, namespace={}, topic={}", + clientAppId(), + tenant, + namespace, + topicName.getLocalName()); + asyncResponse.resume(Response.noContent().build()); + }) + .exceptionally(ex -> { + handleTopicPolicyException("removePublishRate", ex, asyncResponse); + return null; + }); } @GET @@ -2929,26 +3071,22 @@ public void setPublishRate(@Suspended final AsyncResponse asyncResponse, message = "Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")}) public void getSubscriptionTypesEnabled(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic) { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - try { - Optional> subscriptionTypesEnabled = internalGetSubscriptionTypesEnabled(); - if (!subscriptionTypesEnabled.isPresent()) { - asyncResponse.resume(Response.noContent().build()); - } else { - Set subscriptionTypes = new HashSet<>(); - subscriptionTypesEnabled.get().forEach(subType -> - subscriptionTypes.add(SubscriptionType.valueOf(subType.name()))); - asyncResponse.resume(subscriptionTypes); - } - } catch (RestException e) { - asyncResponse.resume(e); - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - } + preValidation(authoritative) + .thenRun(() -> { + Optional> subscriptionTypesEnabled = internalGetSubscriptionTypesEnabled(); + asyncResponse.resume(subscriptionTypesEnabled.isPresent() ? subscriptionTypesEnabled.get() + : Response.noContent().build()); + }) + .exceptionally(ex -> { + handleTopicPolicyException("getSubscriptionTypesEnabled", ex, asyncResponse); + return null; + }); } @POST @@ -2960,21 +3098,17 @@ public void getSubscriptionTypesEnabled(@Suspended final AsyncResponse asyncResp message = "Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")}) public void setSubscriptionTypesEnabled(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic, - @ApiParam(value = "Enable sub types for the specified topic") - Set subscriptionTypesEnabled) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalSetSubscriptionTypesEnabled(subscriptionTypesEnabled).whenComplete((r, ex) -> { - if (ex instanceof RestException) { - log.error("Failed to set topic is enable sub types", ex); - asyncResponse.resume(ex); - } else if (ex != null) { - log.error("Failed to set topic is enable sub types", ex); - asyncResponse.resume(new RestException(ex)); - } else { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "Enable sub types for the specified topic") + Set subscriptionTypesEnabled) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalSetSubscriptionTypesEnabled(subscriptionTypesEnabled)) + .thenRun(() -> { try { log.info("[{}] Successfully set topic is enabled sub types :" + " tenant={}, namespace={}, topic={}, subscriptionTypesEnabled={}", @@ -2985,37 +3119,11 @@ public void setSubscriptionTypesEnabled(@Suspended final AsyncResponse asyncResp jsonMapper().writeValueAsString(subscriptionTypesEnabled)); } catch (JsonProcessingException ignore) {} asyncResponse.resume(Response.noContent().build()); - } - }); - } - - @DELETE - @Path("/{tenant}/{namespace}/{topic}/publishRate") - @ApiOperation(value = "Remove message publish rate configuration for specified topic.") - @ApiResponses(value = {@ApiResponse(code = 403, message = "Topic does not exist"), - @ApiResponse(code = 404, message = "Topic does not exist"), - @ApiResponse(code = 405, - message = "Topic level policy is disabled, please enable the topic level policy and retry"), - @ApiResponse(code = 409, message = "Concurrent modification")}) - public void removePublishRate(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalRemovePublishRate().whenComplete((r, ex) -> { - if (ex != null) { - log.error("Failed to remove topic publish rate", ex); - asyncResponse.resume(new RestException(ex)); - } else { - log.info("[{}] Successfully remove topic publish rate: tenant={}, namespace={}, topic={}", - clientAppId(), - tenant, - namespace, - topicName.getLocalName()); - asyncResponse.resume(Response.noContent().build()); - } - }); + }) + .exceptionally(ex -> { + handleTopicPolicyException("setSubscriptionTypesEnabled", ex, asyncResponse); + return null; + }); } @GET @@ -3027,14 +3135,18 @@ public void removePublishRate(@Suspended final AsyncResponse asyncResponse, message = "Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")}) public void getSubscribeRate(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic, - @QueryParam("applied") boolean applied) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalGetSubscribeRate(applied).whenComplete((res, ex) -> { - internalHandleResult(asyncResponse, res, ex, "Failed get subscribe rate"); + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("applied") boolean applied, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalGetSubscribeRate(applied)) + .thenApply(asyncResponse::resume).exceptionally(ex -> { + handleTopicPolicyException("getSubscribeRate", ex, asyncResponse); + return null; }); } @@ -3051,17 +3163,13 @@ public void setSubscribeRate( @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, + @ApiParam(value = "Is authentication required to perform this operation") + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, @ApiParam(value = "Subscribe rate for the specified topic") SubscribeRate subscribeRate) { validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalSetSubscribeRate(subscribeRate).whenComplete((r, ex) -> { - if (ex instanceof RestException) { - log.error("Failed to set topic {} subscribe rate", topicName.getLocalName(), ex); - asyncResponse.resume(ex); - } else if (ex != null) { - log.error("Failed to set topic subscribe rate"); - asyncResponse.resume(new RestException(ex)); - } else { + preValidation(authoritative) + .thenCompose(__ -> internalSetSubscribeRate(subscribeRate)) + .thenRun(() -> { try { log.info("[{}] Successfully set topic subscribe rate:" + " tenant={}, namespace={}, topic={}, subscribeRate={}", @@ -3072,8 +3180,11 @@ public void setSubscribeRate( jsonMapper().writeValueAsString(subscribeRate)); } catch (JsonProcessingException ignore) {} asyncResponse.resume(Response.noContent().build()); - } - }); + }) + .exceptionally(ex -> { + handleTopicPolicyException("setSubscribeRate", ex, asyncResponse); + return null; + }); } @DELETE @@ -3085,24 +3196,26 @@ public void setSubscribeRate( message = "Topic level policy is disabled, please enable the topic level policy and retry"), @ApiResponse(code = 409, message = "Concurrent modification")}) public void removeSubscribeRate(@Suspended final AsyncResponse asyncResponse, - @PathParam("tenant") String tenant, - @PathParam("namespace") String namespace, - @PathParam("topic") @Encoded String encodedTopic) { - validateTopicName(tenant, namespace, encodedTopic); - preValidation(); - internalRemoveSubscribeRate().whenComplete((r, ex) -> { - if (ex != null) { - log.error("Failed to remove topic {} subscribe rate ", topicName.getLocalName(), ex); - asyncResponse.resume(new RestException(ex)); - } else { + @PathParam("tenant") String tenant, + @PathParam("namespace") String namespace, + @PathParam("topic") @Encoded String encodedTopic, + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "Subscribe rate for the specified topic") SubscribeRate subscribeRate) { + validateTopicName(tenant, namespace, encodedTopic); + preValidation(authoritative) + .thenCompose(__ -> internalRemoveSubscribeRate()) + .thenRun(() -> { log.info("[{}] Successfully remove topic subscribe rate: tenant={}, namespace={}, topic={}", clientAppId(), tenant, namespace, topicName.getLocalName()); asyncResponse.resume(Response.noContent().build()); - } - }); + }) + .exceptionally(ex -> { + handleTopicPolicyException("removeSubscribeRate", ex, asyncResponse); + return null; + }); } @DELETE diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index 807da68a984b4..87ff3b8710f04 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -390,7 +390,6 @@ public void clean(TopicName topicName) { //change persistent://tenant/namespace/xxx-partition-0 to persistent://tenant/namespace/xxx realTopicName = TopicName.get(topicName.getPartitionedTopicName()); } - policiesCache.remove(realTopicName); listeners.remove(realTopicName); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java index 3ca5ea96ed2ef..f337966156927 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java @@ -289,8 +289,6 @@ public void testTopicPoliciesWithMultiBroker() throws Exception { int partitionNum = 3; admin.topics().createPartitionedTopic(topic, partitionNum); pulsarClient.newConsumer().topic(topic).subscriptionName("sub").subscribe().close(); - TopicName topicName = TopicName.get(topic); - Awaitility.await().until(()-> pulsar.getTopicPoliciesService().cacheIsInitialized(topicName)); setTopicPoliciesAndValidate(admin2, admin3, topic); //for non-partitioned topic, we can get topic policies from every broker diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java index 3d47212403527..8ccecc69f386d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java @@ -591,24 +591,30 @@ public void testSetPersistence() throws Exception { Awaitility.await() .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); - + admin.topics().createNonPartitionedTopic(persistenceTopic); admin.topics().setPersistence(persistenceTopic, persistencePolicies); Awaitility.await() .untilAsserted(() -> Assert.assertEquals(admin.topics().getPersistence(persistenceTopic), persistencePolicies)); - - admin.lookups().lookupTopic(persistenceTopic); + Consumer consumer = pulsarClient.newConsumer() + .topic(persistenceTopic) + .subscriptionName("test") + .subscribe(); + admin.topics().unload(persistenceTopic); Topic t = pulsar.getBrokerService().getOrCreateTopic(persistenceTopic).get(); PersistentTopic persistentTopic = (PersistentTopic) t; - ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig(); - assertEquals(managedLedgerConfig.getEnsembleSize(), 3); - assertEquals(managedLedgerConfig.getWriteQuorumSize(), 3); - assertEquals(managedLedgerConfig.getAckQuorumSize(), 3); - assertEquals(managedLedgerConfig.getThrottleMarkDelete(), 0.1); + Awaitility.await().untilAsserted(() -> { + ManagedLedgerConfig managedLedgerConfig = persistentTopic.getManagedLedger().getConfig(); + assertEquals(managedLedgerConfig.getEnsembleSize(), 3); + assertEquals(managedLedgerConfig.getWriteQuorumSize(), 3); + assertEquals(managedLedgerConfig.getAckQuorumSize(), 3); + assertEquals(managedLedgerConfig.getThrottleMarkDelete(), 0.1); + }); PersistencePolicies getPersistencePolicies = admin.topics().getPersistence(persistenceTopic); log.info("PersistencePolicies: {} will set to the topic: {}", persistencePolicies, persistenceTopic); Assert.assertEquals(getPersistencePolicies, persistencePolicies); + consumer.close(); } @Test @@ -664,7 +670,7 @@ public void testRemovePersistence() throws Exception { Awaitility.await() .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); - + admin.topics().createNonPartitionedTopic(persistenceTopic); admin.topics().setPersistence(persistenceTopic, persistencePolicies); Awaitility.await() @@ -732,12 +738,12 @@ public void testSetMaxProducers() throws Exception { Awaitility.await() .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); + admin.topics().createPartitionedTopic(persistenceTopic, 2); admin.topics().setMaxProducers(persistenceTopic, maxProducers); Awaitility.await() .untilAsserted(() -> Assert.assertEquals(admin.topics().getMaxProducers(persistenceTopic), maxProducers)); - admin.topics().createPartitionedTopic(persistenceTopic, 2); Producer producer1 = pulsarClient.newProducer().topic(persistenceTopic).create(); Producer producer2 = pulsarClient.newProducer().topic(persistenceTopic).create(); Producer producer3 = null; @@ -763,13 +769,12 @@ public void testRemoveMaxProducers() throws Exception { Awaitility.await() .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); - + admin.topics().createPartitionedTopic(persistenceTopic, 2); admin.topics().setMaxProducers(persistenceTopic, maxProducers); Awaitility.await() .untilAsserted(() -> Assert.assertEquals(admin.topics().getMaxProducers(persistenceTopic), maxProducers)); - admin.topics().createPartitionedTopic(persistenceTopic, 2); Producer producer1 = pulsarClient.newProducer().topic(persistenceTopic).create(); Producer producer2 = pulsarClient.newProducer().topic(persistenceTopic).create(); Producer producer3 = null; @@ -1301,13 +1306,12 @@ public void testSetMaxConsumers() throws Exception { Awaitility.await() .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); - + admin.topics().createPartitionedTopic(persistenceTopic, 2); admin.topics().setMaxConsumers(persistenceTopic, maxConsumers); Awaitility.await() .untilAsserted(() -> Assert.assertEquals(admin.topics().getMaxConsumers(persistenceTopic), maxConsumers)); - admin.topics().createPartitionedTopic(persistenceTopic, 2); Consumer consumer1 = pulsarClient.newConsumer().subscriptionName("sub1").topic(persistenceTopic).subscribe(); Consumer consumer2 = pulsarClient.newConsumer().subscriptionName("sub2").topic(persistenceTopic).subscribe(); Consumer consumer3 = null; @@ -1333,13 +1337,12 @@ public void testRemoveMaxConsumers() throws Exception { Integer maxConsumers = 2; Awaitility.await() .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(testTopic))); - + admin.topics().createPartitionedTopic(persistenceTopic, 2); admin.topics().setMaxConsumers(persistenceTopic, maxConsumers); Awaitility.await() .untilAsserted(() -> Assert.assertEquals(admin.topics().getMaxConsumers(persistenceTopic), maxConsumers)); - admin.topics().createPartitionedTopic(persistenceTopic, 2); Consumer consumer1 = pulsarClient.newConsumer().subscriptionName("sub1").topic(persistenceTopic).subscribe(); Consumer consumer2 = pulsarClient.newConsumer().subscriptionName("sub2").topic(persistenceTopic).subscribe(); Consumer consumer3 = null; @@ -1688,8 +1691,8 @@ public void testPublishRateInDifferentLevelPolicy() throws Exception { public void testTopicMaxMessageSizeApi() throws Exception{ Awaitility.await() .until(() -> pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(persistenceTopic))); + admin.topics().createNonPartitionedTopic(persistenceTopic); assertNull(admin.topics().getMaxMessageSize(persistenceTopic)); - admin.topics().setMaxMessageSize(persistenceTopic,10); Awaitility.await().until(() -> pulsar.getTopicPoliciesService().getTopicPolicies(TopicName.get(persistenceTopic)) != null); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java index aaa461839fcbe..d0a04b6cad927 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java @@ -204,6 +204,7 @@ public void testCacheCleanup() throws Exception { assertNotNull(listMap.get(topicName).get(0)); admin.topics().deletePartitionedTopic(topic, true); + admin.namespaces().unload(NAMESPACE1); assertNull(map.get(topicName)); assertNull(listMap.get(topicName)); }