Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix call sync method in async rest API for internalGetSubscriptionsForNonPartitionedTopic #13745

Merged
merged 12 commits into from Jan 18, 2022
Merged
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.bookie.rackawareness;

import com.google.api.client.util.Strings;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -35,6 +34,7 @@
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieAddressResolver;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.metadata.api.MetadataCache;
Expand Down Expand Up @@ -161,7 +161,7 @@ private String getRack(String bookieAddress) {
}

if (bi != null
&& !Strings.isNullOrEmpty(bi.getRack())
&& !StringUtils.isEmpty(bi.getRack())
&& !bi.getRack().trim().equals("/")) {
String rack = bi.getRack();
if (!rack.startsWith("/")) {
Expand Down
Expand Up @@ -38,6 +38,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
Expand Down Expand Up @@ -749,7 +750,9 @@ private CompletableFuture<Void> provisionPartitionedTopicPath(AsyncResponse asyn

protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable throwable) {
if (throwable instanceof WebApplicationException) {
asyncResponse.resume((WebApplicationException) throwable);
asyncResponse.resume(throwable);
} else if (throwable instanceof BrokerServiceException.NotAllowedException) {
asyncResponse.resume(new RestException(Status.CONFLICT, throwable));
} else {
asyncResponse.resume(new RestException(throwable));
}
Expand Down
Expand Up @@ -1139,26 +1139,25 @@ private void resumeAsyncResponse(AsyncResponse asyncResponse, Set<String> subscr
}

private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
try {
validateTopicOwnership(topicName, authoritative);
validateTopicOperation(topicName, TopicOperation.GET_SUBSCRIPTIONS);

Topic topic = getTopicReference(topicName);
final List<String> subscriptions = Lists.newArrayList();
topic.getSubscriptions().forEach((subName, sub) -> subscriptions.add(subName));
asyncResponse.resume(subscriptions);
} catch (WebApplicationException wae) {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed to get subscriptions for non-partitioned topic {},"
+ " redirecting to other brokers.",
clientAppId(), topicName, wae);
}
resumeAsyncResponseExceptionally(asyncResponse, wae);
return;
} catch (Exception e) {
log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, e);
resumeAsyncResponseExceptionally(asyncResponse, e);
}
validateTopicOwnershipAsync(topicName, authoritative)
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_SUBSCRIPTIONS))
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenAccept(topic -> asyncResponse.resume(Lists.newArrayList(topic.getSubscriptions().keys())))
.exceptionally(ex -> {
Throwable cause = ex.getCause();
if (cause instanceof WebApplicationException
&& ((WebApplicationException) cause).getResponse().getStatus()
== Status.TEMPORARY_REDIRECT.getStatusCode()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed to get subscriptions for non-partitioned topic {},"
+ " redirecting to other brokers.", clientAppId(), topicName, cause);
}
} else {
log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, cause);
}
resumeAsyncResponseExceptionally(asyncResponse, cause);
return null;
});
}

protected TopicStats internalGetStats(boolean authoritative, boolean getPreciseBacklog,
Expand Down Expand Up @@ -3763,13 +3762,9 @@ private Topic getTopicReference(TopicName topicName) {

private CompletableFuture<Topic> getTopicReferenceAsync(TopicName topicName) {
return pulsar().getBrokerService().getTopicIfExists(topicName.toString())
.thenCompose(optTopic -> {
if (optTopic.isPresent()) {
return CompletableFuture.completedFuture(optTopic.get());
} else {
return topicNotFoundReasonAsync(topicName);
}
});
.thenCompose(optTopic -> optTopic
.map(CompletableFuture::completedFuture)
.orElseGet(() -> topicNotFoundReasonAsync(topicName)));
}

private RestException topicNotFoundReason(TopicName topicName) {
Expand Down
Expand Up @@ -1050,22 +1050,42 @@ public void validateTopicOperation(TopicName topicName, TopicOperation operation
}

public void validateTopicOperation(TopicName topicName, TopicOperation operation, String subscription) {
try {
validateTopicOperationAsync(topicName, operation, subscription).get();
} catch (InterruptedException | ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof WebApplicationException){
throw (WebApplicationException) cause;
} else {
throw new RestException(cause);
}
}
}

public CompletableFuture<Void> validateTopicOperationAsync(TopicName topicName, TopicOperation operation) {
return validateTopicOperationAsync(topicName, operation, null);
}

public CompletableFuture<Void> validateTopicOperationAsync(TopicName topicName,
TopicOperation operation, String subscription) {
if (pulsar().getConfiguration().isAuthenticationEnabled()
&& pulsar().getBrokerService().isAuthorizationEnabled()) {
if (!isClientAuthenticated(clientAppId())) {
throw new RestException(Status.UNAUTHORIZED, "Need to authenticate to perform the request");
}

AuthenticationDataHttps authData = clientAuthData();
authData.setSubscription(subscription);

Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService()
.allowTopicOperation(topicName, operation, originalPrincipal(), clientAppId(), authData);

if (!isAuthorized) {
throw new RestException(Status.UNAUTHORIZED, String.format("Unauthorized to validateTopicOperation for"
+ " operation [%s] on topic [%s]", operation.toString(), topicName));
}
return pulsar().getBrokerService().getAuthorizationService()
.allowTopicOperationAsync(topicName, operation, originalPrincipal(), clientAppId(), authData)
.thenAccept(isAuthorized -> {
if (!isAuthorized) {
throw new RestException(Status.UNAUTHORIZED, String.format(
"Unauthorized to validateTopicOperation for operation [%s] on topic [%s]",
operation.toString(), topicName));
}
});
} else {
return CompletableFuture.completedFuture(null);
}
}

Expand Down