Add authoritative flag for topic policy to avoid redirect loop (#11051)
* Add authoritative flag for topic policy to avoid redirect loop

1. Add an authoritative flag to topic policy requests to avoid a redirect loop
2. Prevent setting a topic policy on a non-existent topic
3. Prevent setting a topic policy on a single partition of a partitioned topic
4. When setting a topic policy on a partitioned topic, redirect the request to the broker that owns partition-0 (see the usage sketch after this list)
5. Don't remove the topic policy cache when the topic is removed from the broker.
   Otherwise, when the topic comes back, its topic policy cannot be found: the
   namespace has not been removed from the broker, so the system topic will not
   be read again. The case where the broker no longer serves the namespace is
   already handled; the topic policy cache under that namespace is removed then.
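
A minimal usage sketch (not part of this commit) of how a topic-level policy is set through the Java admin client; the validation and redirect handling added here runs inside the broker and is transparent to the caller. The service URL and topic name below are placeholder assumptions.

import org.apache.pulsar.client.admin.PulsarAdmin;

public class SetTopicPolicyExample {
    public static void main(String[] args) throws Exception {
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080") // assumed broker HTTP service URL
                .build()) {
            // Set a topic-level policy (message TTL) on the partitioned parent topic.
            // Per this commit, the request is handled by the broker that owns
            // partition-0; a non-owner broker answers with a 307 redirect that now
            // carries the authoritative flag, so the client does not loop forever.
            // Requests against a single partition (e.g. my-topic-partition-0) or a
            // non-existent topic are rejected by the broker.
            admin.topics().setMessageTTL("persistent://public/default/my-topic", 3600);
        }
    }
}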

(cherry picked from commit 0b67438)
codelipenghui committed Jul 7, 2021
1 parent 9e38f07 commit be68a26
Showing 6 changed files with 1,057 additions and 950 deletions.
@@ -499,31 +499,17 @@ protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
});
}

protected void internalSetDelayedDeliveryPolicies(AsyncResponse asyncResponse,
DelayedDeliveryPolicies deliveryPolicies) {
TopicPolicies topicPolicies = null;
protected CompletableFuture<Void> internalSetDelayedDeliveryPolicies(DelayedDeliveryPolicies deliveryPolicies) {
TopicPolicies topicPolicies;
try {
topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.error("Topic {} policies have not been initialized yet.", topicName);
asyncResponse.resume(new RestException(e));
return;
}
if (topicPolicies == null) {
topicPolicies = new TopicPolicies();
topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
topicPolicies.setDelayedDeliveryEnabled(deliveryPolicies == null ? null : deliveryPolicies.isActive());
topicPolicies.setDelayedDeliveryTickTimeMillis(
deliveryPolicies == null ? null : deliveryPolicies.getTickTime());
} catch (Exception e) {
return FutureUtil.failedFuture(e);
}
topicPolicies.setDelayedDeliveryEnabled(deliveryPolicies == null ? null : deliveryPolicies.isActive());
topicPolicies.setDelayedDeliveryTickTimeMillis(
deliveryPolicies == null ? null : deliveryPolicies.getTickTime());
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("Failed set delayed delivery policy for topic", ex);
asyncResponse.resume(new RestException(ex));
} else {
asyncResponse.resume(Response.noContent().build());
}
});
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}

private CompletableFuture<Void> updatePartitionInOtherCluster(int numPartitions, Set<String> clusters) {
@@ -775,15 +761,21 @@ protected CompletableFuture<DelayedDeliveryPolicies> internalGetDelayedDeliveryP
}

protected CompletableFuture<OffloadPoliciesImpl> internalGetOffloadPolicies(boolean applied) {
OffloadPoliciesImpl offloadPolicies =
getTopicPolicies(topicName).map(TopicPolicies::getOffloadPolicies).orElse(null);
if (applied) {
OffloadPoliciesImpl namespacePolicy =
(OffloadPoliciesImpl) getNamespacePolicies(namespaceName).offload_policies;
offloadPolicies = OffloadPoliciesImpl.mergeConfiguration(offloadPolicies
, namespacePolicy, pulsar().getConfiguration().getProperties());
CompletableFuture<OffloadPoliciesImpl> res = new CompletableFuture<>();
try {
OffloadPoliciesImpl offloadPolicies =
getTopicPolicies(topicName).map(TopicPolicies::getOffloadPolicies).orElse(null);
if (applied) {
OffloadPoliciesImpl namespacePolicy =
(OffloadPoliciesImpl) getNamespacePolicies(namespaceName).offload_policies;
offloadPolicies = OffloadPoliciesImpl.mergeConfiguration(offloadPolicies
, namespacePolicy, pulsar().getConfiguration().getProperties());
}
res.complete(offloadPolicies);
} catch (Exception e) {
res.completeExceptionally(e);
}
return CompletableFuture.completedFuture(offloadPolicies);
return res;
}

protected CompletableFuture<Void> internalSetOffloadPolicies(OffloadPoliciesImpl offloadPolicies) {
@@ -2555,35 +2547,28 @@ protected Map<BacklogQuota.BacklogQuotaType, BacklogQuota> internalGetBacklogQuo
return quotaMap;
}

protected void internalSetBacklogQuota(AsyncResponse asyncResponse,
BacklogQuota.BacklogQuotaType backlogQuotaType,
protected CompletableFuture<Void> internalSetBacklogQuota(BacklogQuota.BacklogQuotaType backlogQuotaType,
BacklogQuotaImpl backlogQuota) {
validateTopicPolicyOperation(topicName, PolicyName.BACKLOG, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
TopicPolicies topicPolicies;
if (backlogQuotaType == null) {
backlogQuotaType = BacklogQuota.BacklogQuotaType.destination_storage;
}
TopicPolicies topicPolicies;
try {
topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.error("Topic {} policies have not been initialized yet.", topicName);
asyncResponse.resume(new RestException(e));
return;
}
if (topicPolicies == null){
topicPolicies = new TopicPolicies();
topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
} catch (Exception e) {
return FutureUtil.failedFuture(e);
}

RetentionPolicies retentionPolicies = getRetentionPolicies(topicName, topicPolicies);
if (!checkBacklogQuota(backlogQuota, retentionPolicies)) {
log.warn(
"[{}] Failed to update backlog configuration for topic {}: conflicts with retention quota",
clientAppId(), topicName);
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Backlog Quota exceeds configured retention quota for topic. "
+ "Please increase retention quota and retry"));
return;
}

if (backlogQuota != null) {
@@ -2592,22 +2577,15 @@ protected void internalSetBacklogQuota(AsyncResponse asyncResponse,
topicPolicies.getBackLogQuotaMap().remove(backlogQuotaType.name());
}
Map<String, BacklogQuotaImpl> backLogQuotaMap = topicPolicies.getBackLogQuotaMap();
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
.whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed updated backlog quota map", ex);
asyncResponse.resume(new RestException(ex));
} else {
try {
log.info("[{}] Successfully updated backlog quota map: namespace={}, topic={}, map={}",
clientAppId(),
namespaceName,
topicName.getLocalName(),
jsonMapper().writeValueAsString(backLogQuotaMap));
} catch (JsonProcessingException ignore) { }
asyncResponse.resume(Response.noContent().build());
}
});
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies).thenRun(() -> {
try {
log.info("[{}] Successfully updated backlog quota map: namespace={}, topic={}, map={}",
clientAppId(),
namespaceName,
topicName.getLocalName(),
jsonMapper().writeValueAsString(backLogQuotaMap));
} catch (JsonProcessingException ignore) { }
});
}

protected CompletableFuture<Boolean> internalGetDeduplication(boolean applied) {
@@ -2638,38 +2616,26 @@ protected CompletableFuture<Void> internalSetDeduplication(Boolean enabled) {
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}

protected void internalSetMessageTTL(AsyncResponse asyncResponse, Integer ttlInSecond) {
protected CompletableFuture<Void> internalSetMessageTTL(Integer ttlInSecond) {
//Validate message ttl value.
if (ttlInSecond != null && ttlInSecond < 0) {
throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for message TTL");
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Invalid value for message TTL"));
}
TopicPolicies topicPolicies;
//Update existing topic policy or create a new one if not exist.
try {
topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.error("Topic {} policies have not been initialized yet.", topicName);
asyncResponse.resume(new RestException(e));
return;
}
if (topicPolicies == null){
topicPolicies = new TopicPolicies();
topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
} catch (Exception e) {
return FutureUtil.failedFuture(e);
}
topicPolicies.setMessageTTLInSeconds(ttlInSecond);
pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("Failed set message ttl for topic", ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully set topic message ttl: namespace={}, topic={}, ttl={}",
clientAppId(),
namespaceName,
topicName.getLocalName(),
ttlInSecond);
asyncResponse.resume(Response.noContent().build());
}
});
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies).thenRun(() -> {
log.info("[{}] Successfully set topic message ttl: namespace={}, topic={}, ttl={}",
clientAppId(),
namespaceName,
topicName.getLocalName(),
ttlInSecond);
});
}

private RetentionPolicies getRetentionPolicies(TopicName topicName, TopicPolicies topicPolicies) {
@@ -2686,13 +2652,8 @@ private RetentionPolicies getRetentionPolicies(TopicName topicName, TopicPolicie
return retentionPolicies;
}

protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse,
BacklogQuota.BacklogQuotaType backlogQuotaType) {
internalSetBacklogQuota(asyncResponse, backlogQuotaType, null);
}

protected void internalGetRetention(AsyncResponse asyncResponse, boolean applied){
RetentionPolicies retentionPolicies = getTopicPolicies(topicName)
protected RetentionPolicies internalGetRetention(boolean applied) {
return getTopicPolicies(topicName)
.map(TopicPolicies::getRetentionPolicies).orElseGet(() -> {
if (applied) {
RetentionPolicies policies = getNamespacePolicies(namespaceName).retention_policies;
@@ -2702,15 +2663,19 @@ protected void internalGetRetention(AsyncResponse asyncResponse, boolean applied
}
return null;
});
asyncResponse.resume(retentionPolicies == null ? Response.noContent().build() : retentionPolicies);
}

protected CompletableFuture<Void> internalSetRetention(RetentionPolicies retention) {
if (retention == null) {
return CompletableFuture.completedFuture(null);
}
TopicPolicies topicPolicies = getTopicPolicies(topicName)
.orElseGet(TopicPolicies::new);
TopicPolicies topicPolicies;
try {
topicPolicies = getTopicPolicies(topicName)
.orElseGet(TopicPolicies::new);
} catch (Exception e) {
return FutureUtil.failedFuture(e);
}
BacklogQuota backlogQuota =
topicPolicies.getBackLogQuotaMap().get(BacklogQuota.BacklogQuotaType.destination_storage.name());
if (backlogQuota == null) {
@@ -2721,9 +2686,9 @@ protected CompletableFuture<Void> internalSetRetention(RetentionPolicies retenti
log.warn(
"[{}] Failed to update retention quota configuration for topic {}: conflicts with retention quota",
clientAppId(), topicName);
throw new RestException(Status.PRECONDITION_FAILED,
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Retention Quota must exceed configured backlog quota for topic. "
+ "Please increase retention quota and retry");
+ "Please increase retention quota and retry"));
}
topicPolicies.setRetentionPolicies(retention);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
@@ -2849,12 +2814,30 @@ protected CompletableFuture<Void> internalSetReplicatorDispatchRate(DispatchRate
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}

protected void preValidation() {
protected CompletableFuture<Void> preValidation(boolean authoritative) {
checkTopicLevelPolicyEnable();
if (topicName.isPartitioned()) {
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED,
"Not allowed to set/get topic policy for a partition"));
}
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
validateTopicOwnership(topicName, false);
return checkTopicExistsAsync(topicName).thenCompose(exist -> {
if (!exist) {
throw new RestException(Status.NOT_FOUND, "Topic not found");
} else {
return getPartitionedTopicMetadataAsync(topicName, false, false)
.thenCompose(metadata -> {
if (metadata.partitions > 0) {
return validateTopicOwnershipAsync(TopicName.get(topicName.toString()
+ TopicName.PARTITIONED_TOPIC_SUFFIX + 0), authoritative);
} else {
return validateTopicOwnershipAsync(topicName, authoritative);
}
});
}
});
}

protected CompletableFuture<Void> internalRemoveMaxProducers() {
@@ -3970,6 +3953,16 @@ protected void internalHandleResult(AsyncResponse asyncResponse,
}
}

protected void handleTopicPolicyException(String methodName, Throwable thr, AsyncResponse asyncResponse) {
Throwable cause = thr.getCause();
if (!(cause instanceof WebApplicationException)
|| !(((WebApplicationException) cause).getResponse().getStatus() == 307)) {
log.error("[{}] Failed to perform {} on topic {}",
clientAppId(), methodName, topicName, cause);
}
resumeAsyncResponseExceptionally(asyncResponse, cause);
}

protected void internalTruncateNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
Topic topic;
try {
