diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java index 8790e0ad15708..7015c747d103a 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.bookie.rackawareness; -import com.google.api.client.util.Strings; import java.net.InetAddress; import java.util.ArrayList; import java.util.Collections; @@ -35,6 +34,7 @@ import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookieAddressResolver; import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.policies.data.BookieInfo; import org.apache.pulsar.common.policies.data.BookiesRackConfiguration; import org.apache.pulsar.metadata.api.MetadataCache; @@ -161,7 +161,7 @@ private String getRack(String bookieAddress) { } if (bi != null - && !Strings.isNullOrEmpty(bi.getRack()) + && !StringUtils.isEmpty(bi.getRack()) && !bi.getRack().trim().equals("/")) { String rack = bi.getRack(); if (!rack.startsWith("/")) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index f802858a47889..c29b4a4920611 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -38,6 +38,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.web.RestException; @@ -749,7 +750,9 @@ private CompletableFuture provisionPartitionedTopicPath(AsyncResponse asyn protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable throwable) { if (throwable instanceof WebApplicationException) { - asyncResponse.resume((WebApplicationException) throwable); + asyncResponse.resume(throwable); + } else if (throwable instanceof BrokerServiceException.NotAllowedException) { + asyncResponse.resume(new RestException(Status.CONFLICT, throwable)); } else { asyncResponse.resume(new RestException(throwable)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 5e651cf892b3d..2f7a3390eac95 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1139,26 +1139,25 @@ private void resumeAsyncResponse(AsyncResponse asyncResponse, Set subscr } private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) { - try { - validateTopicOwnership(topicName, authoritative); - validateTopicOperation(topicName, TopicOperation.GET_SUBSCRIPTIONS); - - Topic topic = getTopicReference(topicName); - final List subscriptions = Lists.newArrayList(); - topic.getSubscriptions().forEach((subName, sub) -> subscriptions.add(subName)); - asyncResponse.resume(subscriptions); - } catch (WebApplicationException wae) { - if (log.isDebugEnabled()) { - log.debug("[{}] Failed to get subscriptions for non-partitioned topic {}," - + " redirecting to other brokers.", - clientAppId(), topicName, wae); - } - resumeAsyncResponseExceptionally(asyncResponse, wae); - return; - } catch (Exception e) { - log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, e); - resumeAsyncResponseExceptionally(asyncResponse, e); - } + validateTopicOwnershipAsync(topicName, authoritative) + .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_SUBSCRIPTIONS)) + .thenCompose(__ -> getTopicReferenceAsync(topicName)) + .thenAccept(topic -> asyncResponse.resume(Lists.newArrayList(topic.getSubscriptions().keys()))) + .exceptionally(ex -> { + Throwable cause = ex.getCause(); + if (cause instanceof WebApplicationException + && ((WebApplicationException) cause).getResponse().getStatus() + == Status.TEMPORARY_REDIRECT.getStatusCode()) { + if (log.isDebugEnabled()) { + log.debug("[{}] Failed to get subscriptions for non-partitioned topic {}," + + " redirecting to other brokers.", clientAppId(), topicName, cause); + } + } else { + log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, cause); + } + resumeAsyncResponseExceptionally(asyncResponse, cause); + return null; + }); } protected TopicStats internalGetStats(boolean authoritative, boolean getPreciseBacklog, @@ -3763,13 +3762,9 @@ private Topic getTopicReference(TopicName topicName) { private CompletableFuture getTopicReferenceAsync(TopicName topicName) { return pulsar().getBrokerService().getTopicIfExists(topicName.toString()) - .thenCompose(optTopic -> { - if (optTopic.isPresent()) { - return CompletableFuture.completedFuture(optTopic.get()); - } else { - return topicNotFoundReasonAsync(topicName); - } - }); + .thenCompose(optTopic -> optTopic + .map(CompletableFuture::completedFuture) + .orElseGet(() -> topicNotFoundReasonAsync(topicName))); } private RestException topicNotFoundReason(TopicName topicName) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index b243a9931e6fd..1e1f8597482d3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -1050,22 +1050,42 @@ public void validateTopicOperation(TopicName topicName, TopicOperation operation } public void validateTopicOperation(TopicName topicName, TopicOperation operation, String subscription) { + try { + validateTopicOperationAsync(topicName, operation, subscription).get(); + } catch (InterruptedException | ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof WebApplicationException){ + throw (WebApplicationException) cause; + } else { + throw new RestException(cause); + } + } + } + + public CompletableFuture validateTopicOperationAsync(TopicName topicName, TopicOperation operation) { + return validateTopicOperationAsync(topicName, operation, null); + } + + public CompletableFuture validateTopicOperationAsync(TopicName topicName, + TopicOperation operation, String subscription) { if (pulsar().getConfiguration().isAuthenticationEnabled() && pulsar().getBrokerService().isAuthorizationEnabled()) { if (!isClientAuthenticated(clientAppId())) { throw new RestException(Status.UNAUTHORIZED, "Need to authenticate to perform the request"); } - AuthenticationDataHttps authData = clientAuthData(); authData.setSubscription(subscription); - - Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService() - .allowTopicOperation(topicName, operation, originalPrincipal(), clientAppId(), authData); - - if (!isAuthorized) { - throw new RestException(Status.UNAUTHORIZED, String.format("Unauthorized to validateTopicOperation for" - + " operation [%s] on topic [%s]", operation.toString(), topicName)); - } + return pulsar().getBrokerService().getAuthorizationService() + .allowTopicOperationAsync(topicName, operation, originalPrincipal(), clientAppId(), authData) + .thenAccept(isAuthorized -> { + if (!isAuthorized) { + throw new RestException(Status.UNAUTHORIZED, String.format( + "Unauthorized to validateTopicOperation for operation [%s] on topic [%s]", + operation.toString(), topicName)); + } + }); + } else { + return CompletableFuture.completedFuture(null); } }