From fd3ccd5f74412a55cf644127fe3e24fb7bac78ac Mon Sep 17 00:00:00 2001 From: technoboy Date: Tue, 10 May 2022 21:47:02 +0800 Subject: [PATCH] Override close method to avoid throwing exception. --- .../org/apache/pulsar/broker/service/AbstractTopic.java | 4 +--- .../apache/pulsar/broker/service/PrecisPublishLimiter.java | 2 +- .../apache/pulsar/broker/service/PublishRateLimiter.java | 5 +++++ .../pulsar/broker/service/PublishRateLimiterDisable.java | 2 +- .../pulsar/broker/service/PublishRateLimiterImpl.java | 2 +- .../broker/service/nonpersistent/NonPersistentTopic.java | 6 +----- .../pulsar/broker/service/persistent/PersistentTopic.java | 6 +----- 7 files changed, 11 insertions(+), 16 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 2c0d884100841..448cc6653a3bd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -1051,9 +1051,7 @@ public void updateResourceGroupLimiter(Optional optPolicies) { } // attach the resource-group level rate limiters, if set - String rgName = policies.resource_group_name != null - ? policies.resource_group_name - : null; + String rgName = policies.resource_group_name; if (rgName != null) { final ResourceGroup resourceGroup = brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java index e61597e2d139f..67cc46d95fada 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java @@ -133,7 +133,7 @@ public boolean tryAcquire(int numbers, long bytes) { } @Override - public void close() throws Exception { + public void close() { rateLimitFunction.apply(); replaceLimiters(null); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java index 397887978b2b5..931f35cfa1bd4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java @@ -71,4 +71,9 @@ public interface PublishRateLimiter extends AutoCloseable { * @param bytes */ boolean tryAcquire(int numbers, long bytes); + + /** + * Close the limiter. + */ + void close(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java index 81c4b82317f83..72c8132128e19 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java @@ -63,7 +63,7 @@ public boolean tryAcquire(int numbers, long bytes) { } @Override - public void close() throws Exception { + public void close() { // No-op } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java index 0e1200edc31cb..f1646684b82cb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java @@ -110,7 +110,7 @@ public boolean tryAcquire(int numbers, long bytes) { } @Override - public void close() throws Exception { + public void close() { // no-op } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index b7422bca96275..045ad47968db0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -491,11 +491,7 @@ public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); producers.values().forEach(producer -> futures.add(producer.disconnect())); if (topicPublishRateLimiter != null) { - try { - topicPublishRateLimiter.close(); - } catch (Exception e) { - log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e); - } + topicPublishRateLimiter.close(); } subscriptions.forEach((s, sub) -> futures.add(sub.disconnect())); if (this.resourceGroupPublishLimiter != null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 27e3f1b034530..73ff5e292ae44 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1281,11 +1281,7 @@ public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); producers.values().forEach(producer -> futures.add(producer.disconnect())); if (topicPublishRateLimiter != null) { - try { - topicPublishRateLimiter.close(); - } catch (Exception e) { - log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e); - } + topicPublishRateLimiter.close(); } subscriptions.forEach((s, sub) -> futures.add(sub.disconnect())); if (this.resourceGroupPublishLimiter != null) {