Skip to content

Commit

Permalink
Make PublishRateLimiter extend AutoCloseable
Browse files Browse the repository at this point in the history
- addresses review comment
  • Loading branch information
lhotari committed May 17, 2021
1 parent bfa4914 commit ac41481
Show file tree
Hide file tree
Showing 6 changed files with 15 additions and 6 deletions.
Expand Up @@ -25,7 +25,7 @@
import org.apache.pulsar.common.util.RateLimitFunction;
import org.apache.pulsar.common.util.RateLimiter;

public class PrecisPublishLimiter implements PublishRateLimiter, AutoCloseable {
public class PrecisPublishLimiter implements PublishRateLimiter {
protected volatile int publishMaxMessageRate = 0;
protected volatile long publishMaxByteRate = 0;
protected volatile boolean publishThrottlingEnabled = false;
Expand Down
Expand Up @@ -21,7 +21,7 @@
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;

public interface PublishRateLimiter {
public interface PublishRateLimiter extends AutoCloseable {

PublishRateLimiter DISABLED_RATE_LIMITER = PublishRateLimiterDisable.DISABLED_RATE_LIMITER;

Expand Down
Expand Up @@ -62,4 +62,8 @@ public boolean tryAcquire(int numbers, long bytes) {
return false;
}

@Override
public void close() throws Exception {
// No-op
}
}
Expand Up @@ -108,4 +108,9 @@ public void update(PublishRate maxPublishRate) {
public boolean tryAcquire(int numbers, long bytes) {
return false;
}

@Override
public void close() throws Exception {
// no-op
}
}
Expand Up @@ -438,9 +438,9 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect

replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
if (topicPublishRateLimiter instanceof AutoCloseable) {
if (topicPublishRateLimiter != null) {
try {
((AutoCloseable) topicPublishRateLimiter).close();
topicPublishRateLimiter.close();
} catch (Exception e) {
log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e);
}
Expand Down
Expand Up @@ -1155,9 +1155,9 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
futures.add(transactionBuffer.closeAsync());
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
if (topicPublishRateLimiter instanceof AutoCloseable) {
if (topicPublishRateLimiter != null) {
try {
((AutoCloseable) topicPublishRateLimiter).close();
topicPublishRateLimiter.close();
} catch (Exception e) {
log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e);
}
Expand Down

0 comments on commit ac41481

Please sign in to comment.