diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index ba619533e5d16..6e601af90fd13 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -40,6 +40,7 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService; +import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.admin.internal.TopicsImpl; @@ -789,7 +790,9 @@ private CompletableFuture provisionPartitionedTopicPath(AsyncResponse asyn protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable throwable) { if (throwable instanceof WebApplicationException) { - asyncResponse.resume((WebApplicationException) throwable); + asyncResponse.resume(throwable); + } else if (throwable instanceof BrokerServiceException.NotAllowedException) { + asyncResponse.resume(new RestException(Status.CONFLICT, throwable)); } else { asyncResponse.resume(new RestException(throwable)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 1c8aa8a6072e6..25e8ac1c5c06c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1106,26 +1106,25 @@ private void resumeAsyncResponse(AsyncResponse asyncResponse, Set subscr } private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) { - try { - validateTopicOwnership(topicName, authoritative); - validateTopicOperation(topicName, TopicOperation.GET_SUBSCRIPTIONS); - - Topic topic = getTopicReference(topicName); - final List subscriptions = Lists.newArrayList(); - topic.getSubscriptions().forEach((subName, sub) -> subscriptions.add(subName)); - asyncResponse.resume(subscriptions); - } catch (WebApplicationException wae) { - if (log.isDebugEnabled()) { - log.debug("[{}] Failed to get subscriptions for non-partitioned topic {}," - + " redirecting to other brokers.", - clientAppId(), topicName, wae); - } - resumeAsyncResponseExceptionally(asyncResponse, wae); - return; - } catch (Exception e) { - log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, e); - resumeAsyncResponseExceptionally(asyncResponse, e); - } + validateTopicOwnershipAsync(topicName, authoritative) + .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_SUBSCRIPTIONS)) + .thenCompose(__ -> getTopicReferenceAsync(topicName)) + .thenAccept(topic -> asyncResponse.resume(Lists.newArrayList(topic.getSubscriptions().keys()))) + .exceptionally(ex -> { + Throwable cause = ex.getCause(); + if (cause instanceof WebApplicationException + && ((WebApplicationException) cause).getResponse().getStatus() + == Status.TEMPORARY_REDIRECT.getStatusCode()) { + if (log.isDebugEnabled()) { + log.debug("[{}] Failed to get subscriptions for non-partitioned topic {}," + + " redirecting to other brokers.", clientAppId(), topicName, cause); + } + } else { + log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, cause); + } + resumeAsyncResponseExceptionally(asyncResponse, cause); + return null; + }); } protected TopicStats internalGetStats(boolean authoritative, boolean getPreciseBacklog, @@ -3537,13 +3536,9 @@ private Topic getTopicReference(TopicName topicName) { private CompletableFuture getTopicReferenceAsync(TopicName topicName) { return pulsar().getBrokerService().getTopicIfExists(topicName.toString()) - .thenCompose(optTopic -> { - if (optTopic.isPresent()) { - return CompletableFuture.completedFuture(optTopic.get()); - } else { - return topicNotFoundReasonAsync(topicName); - } - }); + .thenCompose(optTopic -> optTopic + .map(CompletableFuture::completedFuture) + .orElseGet(() -> topicNotFoundReasonAsync(topicName))); } private RestException topicNotFoundReason(TopicName topicName) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index d44a812ecaafe..ac2e4da8e6dde 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -1080,22 +1080,42 @@ public void validateTopicOperation(TopicName topicName, TopicOperation operation } public void validateTopicOperation(TopicName topicName, TopicOperation operation, String subscription) { + try { + validateTopicOperationAsync(topicName, operation, subscription).get(); + } catch (InterruptedException | ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof WebApplicationException){ + throw (WebApplicationException) cause; + } else { + throw new RestException(cause); + } + } + } + + public CompletableFuture validateTopicOperationAsync(TopicName topicName, TopicOperation operation) { + return validateTopicOperationAsync(topicName, operation, null); + } + + public CompletableFuture validateTopicOperationAsync(TopicName topicName, + TopicOperation operation, String subscription) { if (pulsar().getConfiguration().isAuthenticationEnabled() && pulsar().getBrokerService().isAuthorizationEnabled()) { if (!isClientAuthenticated(clientAppId())) { throw new RestException(Status.UNAUTHORIZED, "Need to authenticate to perform the request"); } - AuthenticationDataHttps authData = clientAuthData(); authData.setSubscription(subscription); - - Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService() - .allowTopicOperation(topicName, operation, originalPrincipal(), clientAppId(), authData); - - if (!isAuthorized) { - throw new RestException(Status.UNAUTHORIZED, String.format("Unauthorized to validateTopicOperation for" - + " operation [%s] on topic [%s]", operation.toString(), topicName)); - } + return pulsar().getBrokerService().getAuthorizationService() + .allowTopicOperationAsync(topicName, operation, originalPrincipal(), clientAppId(), authData) + .thenAccept(isAuthorized -> { + if (!isAuthorized) { + throw new RestException(Status.UNAUTHORIZED, String.format( + "Unauthorized to validateTopicOperation for operation [%s] on topic [%s]", + operation.toString(), topicName)); + } + }); + } else { + return CompletableFuture.completedFuture(null); } }