Skip to content

Commit

Permalink
[cleanup][broker] Override close method to avoid caching exception (#…
Browse files Browse the repository at this point in the history
…15529)

(cherry picked from commit 526979a)
  • Loading branch information
Technoboy- authored and mattisonchao committed May 25, 2022
1 parent d3796e2 commit 7856268
Show file tree
Hide file tree
Showing 7 changed files with 11 additions and 16 deletions.
Expand Up @@ -834,9 +834,7 @@ private void updatePublishDispatcher(Optional<Policies> optPolicies) {
}

// attach the resource-group level rate limiters, if set
String rgName = policies != null && policies.resource_group_name != null
? policies.resource_group_name
: null;
String rgName = policies.resource_group_name;
if (rgName != null) {
final ResourceGroup resourceGroup =
brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName);
Expand Down
Expand Up @@ -133,7 +133,7 @@ public boolean tryAcquire(int numbers, long bytes) {
}

@Override
public void close() throws Exception {
public void close() {
rateLimitFunction.apply();
replaceLimiters(null);
}
Expand Down
Expand Up @@ -71,4 +71,9 @@ public interface PublishRateLimiter extends AutoCloseable {
* @param bytes
*/
boolean tryAcquire(int numbers, long bytes);

/**
* Close the limiter.
*/
void close();
}
Expand Up @@ -63,7 +63,7 @@ public boolean tryAcquire(int numbers, long bytes) {
}

@Override
public void close() throws Exception {
public void close() {
// No-op
}
}
Expand Up @@ -110,7 +110,7 @@ public boolean tryAcquire(int numbers, long bytes) {
}

@Override
public void close() throws Exception {
public void close() {
// no-op
}
}
Expand Up @@ -462,11 +462,7 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
if (topicPublishRateLimiter != null) {
try {
topicPublishRateLimiter.close();
} catch (Exception e) {
log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e);
}
topicPublishRateLimiter.close();
}
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
if (this.resourceGroupPublishLimiter != null) {
Expand Down
Expand Up @@ -1270,11 +1270,7 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
if (topicPublishRateLimiter != null) {
try {
topicPublishRateLimiter.close();
} catch (Exception e) {
log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e);
}
topicPublishRateLimiter.close();
}
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
if (this.resourceGroupPublishLimiter != null) {
Expand Down

0 comments on commit 7856268

Please sign in to comment.