From 7593ad545730c2eb93c814049d6ea04b42f5df2f Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Thu, 13 Jan 2022 23:03:50 +0800 Subject: [PATCH 01/12] Fix async method call sync method. --- .../pulsar/broker/admin/AdminResource.java | 2 +- .../admin/impl/PersistentTopicsBase.java | 41 ++++++++++--------- 2 files changed, 22 insertions(+), 21 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index f802858a47889..f63abf1ae67f2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -749,7 +749,7 @@ private CompletableFuture provisionPartitionedTopicPath(AsyncResponse asyn protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable throwable) { if (throwable instanceof WebApplicationException) { - asyncResponse.resume((WebApplicationException) throwable); + asyncResponse.resume(throwable); } else { asyncResponse.resume(new RestException(throwable)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 5e651cf892b3d..7c40a422bdfe6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1139,26 +1139,27 @@ private void resumeAsyncResponse(AsyncResponse asyncResponse, Set subscr } private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) { - try { - validateTopicOwnership(topicName, authoritative); - validateTopicOperation(topicName, TopicOperation.GET_SUBSCRIPTIONS); - - Topic topic = getTopicReference(topicName); - final List subscriptions = Lists.newArrayList(); - topic.getSubscriptions().forEach((subName, sub) -> subscriptions.add(subName)); - asyncResponse.resume(subscriptions); - } catch (WebApplicationException wae) { - if (log.isDebugEnabled()) { - log.debug("[{}] Failed to get subscriptions for non-partitioned topic {}," - + " redirecting to other brokers.", - clientAppId(), topicName, wae); - } - resumeAsyncResponseExceptionally(asyncResponse, wae); - return; - } catch (Exception e) { - log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, e); - resumeAsyncResponseExceptionally(asyncResponse, e); - } + validateTopicOwnershipAsync(topicName, authoritative) + .thenRun(() -> validateTopicOperation(topicName, TopicOperation.GET_SUBSCRIPTIONS)) + .thenCompose(__ -> getTopicReferenceAsync(topicName)) + .thenAccept(topic -> { + final List subscriptions = new ArrayList<>(topic.getSubscriptions().keys()); + asyncResponse.resume(subscriptions); + }).exceptionally(ex -> { + Throwable cause = ex.getCause(); + if (ex instanceof WebApplicationException) { + if (log.isDebugEnabled() && ((WebApplicationException) cause).getResponse().getStatus() + == Status.TEMPORARY_REDIRECT.getStatusCode()) { + log.debug("[{}] Failed to get subscriptions for non-partitioned topic {}," + + " redirecting to other brokers.", + clientAppId(), topicName, ex); + } + } else { + log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, ex); + } + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } protected TopicStats internalGetStats(boolean authoritative, boolean getPreciseBacklog, From 4e42b14f4febcc6f0c4b54f122d5c61f2f019809 Mon Sep 17 00:00:00 2001 From: Qiang Zhao <74767115+mattisonchao@users.noreply.github.com> Date: Fri, 14 Jan 2022 07:07:25 +0800 Subject: [PATCH 02/12] Update pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java Co-authored-by: lipenghui --- .../pulsar/broker/admin/impl/PersistentTopicsBase.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 7c40a422bdfe6..4cb5dc900afd0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1140,8 +1140,10 @@ private void resumeAsyncResponse(AsyncResponse asyncResponse, Set subscr private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) { validateTopicOwnershipAsync(topicName, authoritative) - .thenRun(() -> validateTopicOperation(topicName, TopicOperation.GET_SUBSCRIPTIONS)) - .thenCompose(__ -> getTopicReferenceAsync(topicName)) + .thenCompose(__ -> { + validateTopicOperation(topicName, TopicOperation.GET_SUBSCRIPTIONS); + return getTopicReferenceAsync(topicName); + }) .thenAccept(topic -> { final List subscriptions = new ArrayList<>(topic.getSubscriptions().keys()); asyncResponse.resume(subscriptions); From 207c88e32f5f323c5a643ff9efc630521c51770c Mon Sep 17 00:00:00 2001 From: Qiang Zhao <74767115+mattisonchao@users.noreply.github.com> Date: Fri, 14 Jan 2022 07:07:35 +0800 Subject: [PATCH 03/12] Update pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java Co-authored-by: lipenghui --- .../apache/pulsar/broker/admin/impl/PersistentTopicsBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 4cb5dc900afd0..0042fecdbcf21 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1154,7 +1154,7 @@ private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncR == Status.TEMPORARY_REDIRECT.getStatusCode()) { log.debug("[{}] Failed to get subscriptions for non-partitioned topic {}," + " redirecting to other brokers.", - clientAppId(), topicName, ex); + clientAppId(), topicName, cause); } } else { log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, ex); From 4e8850890c41c325deb5177184d617e7ebbae105 Mon Sep 17 00:00:00 2001 From: Qiang Zhao <74767115+mattisonchao@users.noreply.github.com> Date: Fri, 14 Jan 2022 07:07:40 +0800 Subject: [PATCH 04/12] Update pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java Co-authored-by: lipenghui --- .../apache/pulsar/broker/admin/impl/PersistentTopicsBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 0042fecdbcf21..7c78b7dcbe09c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1157,7 +1157,7 @@ private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncR clientAppId(), topicName, cause); } } else { - log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, ex); + log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, cause); } resumeAsyncResponseExceptionally(asyncResponse, ex); return null; From 2c8eaef88c61bea36483de0a0f7adf0792b9b721 Mon Sep 17 00:00:00 2001 From: Qiang Zhao <74767115+mattisonchao@users.noreply.github.com> Date: Fri, 14 Jan 2022 07:07:48 +0800 Subject: [PATCH 05/12] Update pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java Co-authored-by: lipenghui --- .../apache/pulsar/broker/admin/impl/PersistentTopicsBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 7c78b7dcbe09c..cfc264b054973 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1159,7 +1159,7 @@ private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncR } else { log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, cause); } - resumeAsyncResponseExceptionally(asyncResponse, ex); + resumeAsyncResponseExceptionally(asyncResponse, cause); return null; }); } From 63cde59fc6f84c9b92de57115f46b50fcd9982c5 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Fri, 14 Jan 2022 07:10:14 +0800 Subject: [PATCH 06/12] fix wrong instanceof judgement. --- .../apache/pulsar/broker/admin/impl/PersistentTopicsBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index cfc264b054973..b70a85737e93d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1149,7 +1149,7 @@ private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncR asyncResponse.resume(subscriptions); }).exceptionally(ex -> { Throwable cause = ex.getCause(); - if (ex instanceof WebApplicationException) { + if (cause instanceof WebApplicationException) { if (log.isDebugEnabled() && ((WebApplicationException) cause).getResponse().getStatus() == Status.TEMPORARY_REDIRECT.getStatusCode()) { log.debug("[{}] Failed to get subscriptions for non-partitioned topic {}," From ee5992582a0997be06e2119c84e59ad4c4a905e3 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Fri, 14 Jan 2022 11:09:48 +0800 Subject: [PATCH 07/12] Print error log when WebApplicationException is not TEMPORARY_REDIRECT --- .../admin/impl/PersistentTopicsBase.java | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index b70a85737e93d..4fa966a748aa6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1139,23 +1139,21 @@ private void resumeAsyncResponse(AsyncResponse asyncResponse, Set subscr } private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) { - validateTopicOwnershipAsync(topicName, authoritative) - .thenCompose(__ -> { + validateTopicOwnershipAsync(topicName, authoritative).thenCompose(__ -> { validateTopicOperation(topicName, TopicOperation.GET_SUBSCRIPTIONS); return getTopicReferenceAsync(topicName); - }) - .thenAccept(topic -> { + }).thenAccept(topic -> { final List subscriptions = new ArrayList<>(topic.getSubscriptions().keys()); asyncResponse.resume(subscriptions); }).exceptionally(ex -> { Throwable cause = ex.getCause(); - if (cause instanceof WebApplicationException) { - if (log.isDebugEnabled() && ((WebApplicationException) cause).getResponse().getStatus() - == Status.TEMPORARY_REDIRECT.getStatusCode()) { - log.debug("[{}] Failed to get subscriptions for non-partitioned topic {}," - + " redirecting to other brokers.", - clientAppId(), topicName, cause); - } + if (cause instanceof WebApplicationException + && ((WebApplicationException) cause).getResponse().getStatus() + == Status.TEMPORARY_REDIRECT.getStatusCode()) { + if (log.isDebugEnabled()) { + log.debug("[{}] Failed to get subscriptions for non-partitioned topic {}," + + " redirecting to other brokers.", clientAppId(), topicName, cause); + } } else { log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, cause); } From 16a91cab3fe0cea727f69ed67858ced0fc0cbe7c Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Sat, 15 Jan 2022 12:57:29 +0800 Subject: [PATCH 08/12] Add validateTopicOperationAsync method. --- .../admin/impl/PersistentTopicsBase.java | 12 +++---- .../pulsar/broker/web/PulsarWebResource.java | 34 ++++++++++++++----- 2 files changed, 30 insertions(+), 16 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 4fa966a748aa6..1b97d8a1a5814 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1139,13 +1139,11 @@ private void resumeAsyncResponse(AsyncResponse asyncResponse, Set subscr } private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) { - validateTopicOwnershipAsync(topicName, authoritative).thenCompose(__ -> { - validateTopicOperation(topicName, TopicOperation.GET_SUBSCRIPTIONS); - return getTopicReferenceAsync(topicName); - }).thenAccept(topic -> { - final List subscriptions = new ArrayList<>(topic.getSubscriptions().keys()); - asyncResponse.resume(subscriptions); - }).exceptionally(ex -> { + validateTopicOwnershipAsync(topicName, authoritative) + .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_SUBSCRIPTIONS)) + .thenCompose(__ -> getTopicReferenceAsync(topicName)) + .thenAccept(topic -> asyncResponse.resume(Lists.newArrayList(topic.getSubscriptions().keys()))) + .exceptionally(ex -> { Throwable cause = ex.getCause(); if (cause instanceof WebApplicationException && ((WebApplicationException) cause).getResponse().getStatus() diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index b243a9931e6fd..e6026b6abd286 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -1050,22 +1050,38 @@ public void validateTopicOperation(TopicName topicName, TopicOperation operation } public void validateTopicOperation(TopicName topicName, TopicOperation operation, String subscription) { + try { + validateTopicOperationAsync(topicName, operation, subscription).get(); + } catch (InterruptedException | ExecutionException e) { + log.error("Validate Topic [%s] operation [%s] got server internal error.", e); + throw new RestException(e); + } + } + + public CompletableFuture validateTopicOperationAsync(TopicName topicName, TopicOperation operation) { + return validateTopicOperationAsync(topicName, operation, null); + } + + public CompletableFuture validateTopicOperationAsync(TopicName topicName, + TopicOperation operation, String subscription) { if (pulsar().getConfiguration().isAuthenticationEnabled() && pulsar().getBrokerService().isAuthorizationEnabled()) { if (!isClientAuthenticated(clientAppId())) { throw new RestException(Status.UNAUTHORIZED, "Need to authenticate to perform the request"); } - AuthenticationDataHttps authData = clientAuthData(); authData.setSubscription(subscription); - - Boolean isAuthorized = pulsar().getBrokerService().getAuthorizationService() - .allowTopicOperation(topicName, operation, originalPrincipal(), clientAppId(), authData); - - if (!isAuthorized) { - throw new RestException(Status.UNAUTHORIZED, String.format("Unauthorized to validateTopicOperation for" - + " operation [%s] on topic [%s]", operation.toString(), topicName)); - } + return pulsar().getBrokerService().getAuthorizationService() + .allowTopicOperationAsync(topicName, operation, originalPrincipal(), clientAppId(), authData) + .thenAccept(isAuthorized -> { + if (!isAuthorized) { + throw new RestException(Status.UNAUTHORIZED, String.format( + "Unauthorized to validateTopicOperation for operation [%s] on topic [%s]", + operation.toString(), topicName)); + } + }); + } else { + return CompletableFuture.completedFuture(null); } } From 6647951541f09671e66f0205c60010ba4002f821 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Sat, 15 Jan 2022 15:13:28 +0800 Subject: [PATCH 09/12] Fixes unhanlde restexception --- .../org/apache/pulsar/broker/web/PulsarWebResource.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index e6026b6abd286..1a9aafbc95f3d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -1053,8 +1053,13 @@ public void validateTopicOperation(TopicName topicName, TopicOperation operation try { validateTopicOperationAsync(topicName, operation, subscription).get(); } catch (InterruptedException | ExecutionException e) { - log.error("Validate Topic [%s] operation [%s] got server internal error.", e); - throw new RestException(e); + Throwable cause = e.getCause(); + log.error("Validate Topic [%s] operation [%s] got server internal error.", cause); + if (cause instanceof WebApplicationException){ + throw (WebApplicationException) cause; + } else { + throw new RestException(cause); + } } } From 2b5df1bc16041fd38c5b40feda992644e245d910 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Sun, 16 Jan 2022 19:04:40 +0800 Subject: [PATCH 10/12] Fixes async method effect test issue. - Due to async method do not hanlde ``NOT_ALLOWED_EXCEPTION``. --- .../org/apache/pulsar/broker/admin/AdminResource.java | 3 +++ .../pulsar/broker/admin/impl/PersistentTopicsBase.java | 10 +++------- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index f63abf1ae67f2..c29b4a4920611 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -38,6 +38,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.systopic.SystemTopicClient; import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.web.RestException; @@ -750,6 +751,8 @@ private CompletableFuture provisionPartitionedTopicPath(AsyncResponse asyn protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable throwable) { if (throwable instanceof WebApplicationException) { asyncResponse.resume(throwable); + } else if (throwable instanceof BrokerServiceException.NotAllowedException) { + asyncResponse.resume(new RestException(Status.CONFLICT, throwable)); } else { asyncResponse.resume(new RestException(throwable)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 1b97d8a1a5814..2f7a3390eac95 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -3762,13 +3762,9 @@ private Topic getTopicReference(TopicName topicName) { private CompletableFuture getTopicReferenceAsync(TopicName topicName) { return pulsar().getBrokerService().getTopicIfExists(topicName.toString()) - .thenCompose(optTopic -> { - if (optTopic.isPresent()) { - return CompletableFuture.completedFuture(optTopic.get()); - } else { - return topicNotFoundReasonAsync(topicName); - } - }); + .thenCompose(optTopic -> optTopic + .map(CompletableFuture::completedFuture) + .orElseGet(() -> topicNotFoundReasonAsync(topicName))); } private RestException topicNotFoundReason(TopicName topicName) { From 0d7b555dcb25f1cdce2319b8772a5c5c501a63a1 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Mon, 17 Jan 2022 12:02:32 +0800 Subject: [PATCH 11/12] Fix checkstyle --- .../bookie/rackawareness/BookieRackAffinityMapping.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java index 8790e0ad15708..7015c747d103a 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.bookie.rackawareness; -import com.google.api.client.util.Strings; import java.net.InetAddress; import java.util.ArrayList; import java.util.Collections; @@ -35,6 +34,7 @@ import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookieAddressResolver; import org.apache.commons.configuration.Configuration; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.policies.data.BookieInfo; import org.apache.pulsar.common.policies.data.BookiesRackConfiguration; import org.apache.pulsar.metadata.api.MetadataCache; @@ -161,7 +161,7 @@ private String getRack(String bookieAddress) { } if (bi != null - && !Strings.isNullOrEmpty(bi.getRack()) + && !StringUtils.isEmpty(bi.getRack()) && !bi.getRack().trim().equals("/")) { String rack = bi.getRack(); if (!rack.startsWith("/")) { From 841c3effcbef6c94bd3222e88261fcc614d7d14a Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Mon, 17 Jan 2022 23:11:08 +0800 Subject: [PATCH 12/12] Delete useless log --- .../java/org/apache/pulsar/broker/web/PulsarWebResource.java | 1 - 1 file changed, 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index 1a9aafbc95f3d..1e1f8597482d3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -1054,7 +1054,6 @@ public void validateTopicOperation(TopicName topicName, TopicOperation operation validateTopicOperationAsync(topicName, operation, subscription).get(); } catch (InterruptedException | ExecutionException e) { Throwable cause = e.getCause(); - log.error("Validate Topic [%s] operation [%s] got server internal error.", cause); if (cause instanceof WebApplicationException){ throw (WebApplicationException) cause; } else {