Skip to content

Commit

Permalink
Fix call sync method in async rest API for ``internalGetSubscriptions…
Browse files Browse the repository at this point in the history
…ForNonPartitionedTopic`` (#13745)

(cherry picked from commit 0a046b9)
  • Loading branch information
mattisonchao authored and codelipenghui committed Jan 18, 2022
1 parent 8c86b3d commit f9aae1f
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 37 deletions.
Expand Up @@ -40,6 +40,7 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
Expand Down Expand Up @@ -789,7 +790,9 @@ private CompletableFuture<Void> provisionPartitionedTopicPath(AsyncResponse asyn

protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable throwable) {
if (throwable instanceof WebApplicationException) {
asyncResponse.resume((WebApplicationException) throwable);
asyncResponse.resume(throwable);
} else if (throwable instanceof BrokerServiceException.NotAllowedException) {
asyncResponse.resume(new RestException(Status.CONFLICT, throwable));
} else {
asyncResponse.resume(new RestException(throwable));
}
Expand Down
Expand Up @@ -1106,26 +1106,25 @@ private void resumeAsyncResponse(AsyncResponse asyncResponse, Set<String> subscr
}

private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
try {
validateTopicOwnership(topicName, authoritative);
validateTopicOperation(topicName, TopicOperation.GET_SUBSCRIPTIONS);

Topic topic = getTopicReference(topicName);
final List<String> subscriptions = Lists.newArrayList();
topic.getSubscriptions().forEach((subName, sub) -> subscriptions.add(subName));
asyncResponse.resume(subscriptions);
} catch (WebApplicationException wae) {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed to get subscriptions for non-partitioned topic {},"
+ " redirecting to other brokers.",
clientAppId(), topicName, wae);
}
resumeAsyncResponseExceptionally(asyncResponse, wae);
return;
} catch (Exception e) {
log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
}
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_SUBSCRIPTIONS))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenAccept(topic -> asyncResponse.resume(Lists.newArrayList(topic.getSubscriptions().keys())))
.exceptionally(ex -> {
Throwable cause = ex.getCause();
if (cause instanceof WebApplicationException
&& ((WebApplicationException) cause).getResponse().getStatus()
== Status.TEMPORARY_REDIRECT.getStatusCode()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed to get subscriptions for non-partitioned topic {},"
+ " redirecting to other brokers.", clientAppId(), topicName, cause);
}
} else {
log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, cause);
}
resumeAsyncResponseExceptionally(asyncResponse, cause);
return null;
});
}

protected TopicStats internalGetStats(boolean authoritative, boolean getPreciseBacklog,
Expand Down Expand Up @@ -3537,13 +3536,9 @@ private Topic getTopicReference(TopicName topicName) {

private CompletableFuture<Topic> getTopicReferenceAsync(TopicName topicName) {
return pulsar().getBrokerService().getTopicIfExists(topicName.toString())
.thenCompose(optTopic -> {
if (optTopic.isPresent()) {
return CompletableFuture.completedFuture(optTopic.get());
} else {
return topicNotFoundReasonAsync(topicName);
}
});
.thenCompose(optTopic -> optTopic
.map(CompletableFuture::completedFuture)
.orElseGet(() -> topicNotFoundReasonAsync(topicName)));
}

private RestException topicNotFoundReason(TopicName topicName) {
Expand Down
Expand Up @@ -1080,22 +1080,42 @@ public void validateTopicOperation(TopicName topicName, TopicOperation operation
}

public void validateTopicOperation(TopicName topicName, TopicOperation operation, String subscription) {
try {
validateTopicOperationAsync(topicName, operation, subscription).get();
} catch (InterruptedException | ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof WebApplicationException){
throw (WebApplicationException) cause;
} else {
throw new RestException(cause);
}
}
}

public CompletableFuture<Void> validateTopicOperationAsync(TopicName topicName, TopicOperation operation) {
return validateTopicOperationAsync(topicName, operation, null);
}

public CompletableFuture<Void> validateTopicOperationAsync(TopicName topicName,
TopicOperation operation, String subscription) {
if (pulsar().getConfiguration().isAuthenticationEnabled()
&& pulsar().getBrokerService().isAuthorizationEnabled()) {
if (!isClientAuthenticated(clientAppId())) {
throw new RestException(Status.UNAUTHORIZED, "Need to authenticate to perform the request");
}

AuthenticationDataHttps authData = clientAuthData();
authData.setSubscription(subscription);

Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
.allowTopicOperation(topicName, operation, originalPrincipal(), clientAppId(), authData);

if (!isAuthorized) {
throw new RestException(Status.UNAUTHORIZED, String.format("Unauthorized to validateTopicOperation for"
+ " operation [%s] on topic [%s]", operation.toString(), topicName));
}
return pulsar().getBrokerService().getAuthorizationService()
.allowTopicOperationAsync(topicName, operation, originalPrincipal(), clientAppId(), authData)
.thenAccept(isAuthorized -> {
if (!isAuthorized) {
throw new RestException(Status.UNAUTHORIZED, String.format(
"Unauthorized to validateTopicOperation for operation [%s] on topic [%s]",
operation.toString(), topicName));
}
});
} else {
return CompletableFuture.completedFuture(null);
}
}

Expand Down

0 comments on commit f9aae1f

Please sign in to comment.