diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 54f9e8d189de8..39b0650c2cb5b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -883,7 +883,7 @@ protected void updatePublishDispatcher(PublishRate publishRate) { // create new rateLimiter if rate-limiter is disabled if (preciseTopicPublishRateLimitingEnable) { this.topicPublishRateLimiter = new PrecisPublishLimiter(publishRate, - () -> this.enableCnxAutoRead()); + () -> this.enableCnxAutoRead(), brokerService.pulsar().getExecutor()); } else { this.topicPublishRateLimiter = new PublishRateLimiterImpl(publishRate); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java index 60fbcf029e820..e61597e2d139f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java @@ -18,7 +18,7 @@ */ package org.apache.pulsar.broker.service; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ScheduledExecutorService; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.util.RateLimitFunction; @@ -27,30 +27,37 @@ public class PrecisPublishLimiter implements PublishRateLimiter { protected volatile int publishMaxMessageRate = 0; protected volatile long publishMaxByteRate = 0; - protected volatile boolean publishThrottlingEnabled = false; // precise mode for publish rate limiter - private RateLimiter topicPublishRateLimiterOnMessage; - private RateLimiter topicPublishRateLimiterOnByte; + private volatile RateLimiter topicPublishRateLimiterOnMessage; + private volatile RateLimiter topicPublishRateLimiterOnByte; private final RateLimitFunction rateLimitFunction; + private final ScheduledExecutorService scheduledExecutorService; public PrecisPublishLimiter(Policies policies, String clusterName, RateLimitFunction rateLimitFunction) { this.rateLimitFunction = rateLimitFunction; update(policies, clusterName); + this.scheduledExecutorService = null; } public PrecisPublishLimiter(PublishRate publishRate, RateLimitFunction rateLimitFunction) { + this(publishRate, rateLimitFunction, null); + } + + public PrecisPublishLimiter(PublishRate publishRate, RateLimitFunction rateLimitFunction, + ScheduledExecutorService scheduledExecutorService) { this.rateLimitFunction = rateLimitFunction; update(publishRate); + this.scheduledExecutorService = scheduledExecutorService; } @Override public void checkPublishRate() { - // No-op + // No-op } @Override public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) { - // No-op + // No-op } @Override @@ -62,10 +69,15 @@ public boolean resetPublishCount() { public boolean isPublishRateExceeded() { return false; } + // If all rate limiters are not exceeded, re-enable auto read from socket. private void tryReleaseConnectionThrottle() { - if ((topicPublishRateLimiterOnMessage != null && topicPublishRateLimiterOnMessage.getAvailablePermits() <= 0) - || (topicPublishRateLimiterOnByte != null && topicPublishRateLimiterOnByte.getAvailablePermits() <= 0)) { + RateLimiter currentTopicPublishRateLimiterOnMessage = topicPublishRateLimiterOnMessage; + RateLimiter currentTopicPublishRateLimiterOnByte = topicPublishRateLimiterOnByte; + if ((currentTopicPublishRateLimiterOnMessage != null + && currentTopicPublishRateLimiterOnMessage.getAvailablePermits() <= 0) + || (currentTopicPublishRateLimiterOnByte != null + && currentTopicPublishRateLimiterOnByte.getAvailablePermits() <= 0)) { return; } this.rateLimitFunction.apply(); @@ -78,34 +90,73 @@ public void update(Policies policies, String clusterName) { : null; this.update(maxPublishRate); } + public void update(PublishRate maxPublishRate) { - if (maxPublishRate != null - && (maxPublishRate.publishThrottlingRateInMsg > 0 || maxPublishRate.publishThrottlingRateInByte > 0)) { - this.publishThrottlingEnabled = true; - this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0); - this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0); - if (this.publishMaxMessageRate > 0) { - topicPublishRateLimiterOnMessage = - new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, - this::tryReleaseConnectionThrottle, true); + replaceLimiters(() -> { + if (maxPublishRate != null + && (maxPublishRate.publishThrottlingRateInMsg > 0 + || maxPublishRate.publishThrottlingRateInByte > 0)) { + this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0); + this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0); + if (this.publishMaxMessageRate > 0) { + topicPublishRateLimiterOnMessage = + RateLimiter.builder() + .scheduledExecutorService(scheduledExecutorService) + .permits(publishMaxMessageRate) + .rateLimitFunction(this::tryReleaseConnectionThrottle) + .isDispatchOrPrecisePublishRateLimiter(true) + .build(); + } + if (this.publishMaxByteRate > 0) { + topicPublishRateLimiterOnByte = RateLimiter.builder() + .scheduledExecutorService(scheduledExecutorService) + .permits(publishMaxByteRate) + .rateLimitFunction(this::tryReleaseConnectionThrottle) + .isDispatchOrPrecisePublishRateLimiter(true) + .build(); + } + } else { + this.publishMaxMessageRate = 0; + this.publishMaxByteRate = 0; } - if (this.publishMaxByteRate > 0) { - topicPublishRateLimiterOnByte = - new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS, - this::tryReleaseConnectionThrottle, true); - } - } else { - this.publishMaxMessageRate = 0; - this.publishMaxByteRate = 0; - this.publishThrottlingEnabled = false; - topicPublishRateLimiterOnMessage = null; - topicPublishRateLimiterOnByte = null; - } + }); } @Override public boolean tryAcquire(int numbers, long bytes) { - return (topicPublishRateLimiterOnMessage == null || topicPublishRateLimiterOnMessage.tryAcquire(numbers)) - && (topicPublishRateLimiterOnByte == null || topicPublishRateLimiterOnByte.tryAcquire(bytes)); + RateLimiter currentTopicPublishRateLimiterOnMessage = topicPublishRateLimiterOnMessage; + RateLimiter currentTopicPublishRateLimiterOnByte = topicPublishRateLimiterOnByte; + return (currentTopicPublishRateLimiterOnMessage == null + || currentTopicPublishRateLimiterOnMessage.tryAcquire(numbers)) + && (currentTopicPublishRateLimiterOnByte == null + || currentTopicPublishRateLimiterOnByte.tryAcquire(bytes)); + } + + @Override + public void close() throws Exception { + rateLimitFunction.apply(); + replaceLimiters(null); + } + + private void replaceLimiters(Runnable updater) { + RateLimiter previousTopicPublishRateLimiterOnMessage = topicPublishRateLimiterOnMessage; + topicPublishRateLimiterOnMessage = null; + RateLimiter previousTopicPublishRateLimiterOnByte = topicPublishRateLimiterOnByte; + topicPublishRateLimiterOnByte = null; + try { + if (updater != null) { + updater.run(); + } + } finally { + // Close previous limiters to prevent resource leakages. + // Delay closing of previous limiters after new ones are in place so that updating the limiter + // doesn't cause unavailability. + if (previousTopicPublishRateLimiterOnMessage != null) { + previousTopicPublishRateLimiterOnMessage.close(); + } + if (previousTopicPublishRateLimiterOnByte != null) { + previousTopicPublishRateLimiterOnByte.close(); + } + } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java index ec23b26a047e2..397887978b2b5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java @@ -21,7 +21,7 @@ import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; -public interface PublishRateLimiter { +public interface PublishRateLimiter extends AutoCloseable { PublishRateLimiter DISABLED_RATE_LIMITER = PublishRateLimiterDisable.DISABLED_RATE_LIMITER; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java index c72f6ba82b05d..cf18192e4ac6b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java @@ -62,4 +62,8 @@ public boolean tryAcquire(int numbers, long bytes) { return true; } + @Override + public void close() throws Exception { + // No-op + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java index c1458cf487098..0e1200edc31cb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java @@ -108,4 +108,9 @@ public void update(PublishRate maxPublishRate) { public boolean tryAcquire(int numbers, long bytes) { return false; } + + @Override + public void close() throws Exception { + // no-op + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 7abe710c0932c..5224eaa25851c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -445,6 +445,13 @@ public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); producers.values().forEach(producer -> futures.add(producer.disconnect())); + if (topicPublishRateLimiter != null) { + try { + topicPublishRateLimiter.close(); + } catch (Exception e) { + log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e); + } + } subscriptions.forEach((s, sub) -> futures.add(sub.disconnect())); CompletableFuture clientCloseFuture = diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 40ed2a131c45f..ca22511bff5b6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1157,6 +1157,13 @@ public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect futures.add(transactionBuffer.closeAsync()); replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); producers.values().forEach(producer -> futures.add(producer.disconnect())); + if (topicPublishRateLimiter != null) { + try { + topicPublishRateLimiter.close(); + } catch (Exception e) { + log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e); + } + } subscriptions.forEach((s, sub) -> futures.add(sub.disconnect())); CompletableFuture clientCloseFuture = closeWithoutWaitingClientDisconnect diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java index f014717cc7e14..a13328cc85933 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java @@ -140,7 +140,7 @@ private synchronized void updateSubscribeRate(ConsumerIdentifier consumerIdentif if (this.subscribeRateLimiter.get(consumerIdentifier) == null) { this.subscribeRateLimiter.put(consumerIdentifier, new RateLimiter(brokerService.pulsar().getExecutor(), ratePerConsumer, - ratePeriod, TimeUnit.SECONDS, null)); + ratePeriod, TimeUnit.SECONDS)); } else { this.subscribeRateLimiter.get(consumerIdentifier) .setRate(ratePerConsumer, ratePeriod, TimeUnit.SECONDS, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisPublishLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisPublishLimiterTest.java new file mode 100644 index 0000000000000..61804e7255d8e --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisPublishLimiterTest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import org.apache.pulsar.common.policies.data.PublishRate; +import org.testng.annotations.Test; + +public class PrecisPublishLimiterTest { + + @Test + void shouldResetMsgLimitAfterUpdate() { + PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(), () -> { + }); + precisPublishLimiter.update(new PublishRate(1, 1)); + assertFalse(precisPublishLimiter.tryAcquire(99, 99)); + precisPublishLimiter.update(new PublishRate(-1, 100)); + assertTrue(precisPublishLimiter.tryAcquire(99, 99)); + } + + @Test + void shouldResetBytesLimitAfterUpdate() { + PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(), () -> { + }); + precisPublishLimiter.update(new PublishRate(1, 1)); + assertFalse(precisPublishLimiter.tryAcquire(99, 99)); + precisPublishLimiter.update(new PublishRate(100, -1)); + assertTrue(precisPublishLimiter.tryAcquire(99, 99)); + } + + @Test + void shouldCloseResources() throws Exception { + for (int i = 0; i < 20000; i++) { + PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(100, 100), () -> { + }); + precisPublishLimiter.tryAcquire(99, 99); + precisPublishLimiter.close(); + } + } +} \ No newline at end of file diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java index cb88a95117b38..1bb2fcd9e08d6 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java @@ -25,6 +25,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import lombok.Builder; /** * A Rate Limiter that distributes permits at a configurable rate. Each {@link #acquire()} blocks if necessary until a @@ -48,7 +49,6 @@ * */ public class RateLimiter implements AutoCloseable{ - private final ScheduledExecutorService executorService; private long rateTime; private TimeUnit timeUnit; @@ -63,7 +63,7 @@ public class RateLimiter implements AutoCloseable{ private boolean isDispatchOrPrecisePublishRateLimiter; public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) { - this(null, permits, rateTime, timeUnit, null); + this(null, permits, rateTime, timeUnit); } public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, boolean isDispatchOrPrecisePublishRateLimiter) { @@ -82,6 +82,11 @@ public RateLimiter(final long permits, final long rateTime, final TimeUnit timeU this.rateLimitFunction = autoReadResetFunction; } + public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime, + final TimeUnit timeUnit) { + this(service, permits, rateTime, timeUnit, (Supplier) null); + } + public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime, final TimeUnit timeUnit, Supplier permitUpdater) { this(service, permits, rateTime, timeUnit, permitUpdater, false); @@ -89,6 +94,14 @@ public RateLimiter(final ScheduledExecutorService service, final long permits, f public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime, final TimeUnit timeUnit, Supplier permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter) { + this(service, permits, rateTime, timeUnit, permitUpdater, isDispatchOrPrecisePublishRateLimiter, + null); + } + + @Builder + RateLimiter(final ScheduledExecutorService scheduledExecutorService, final long permits, final long rateTime, + final TimeUnit timeUnit, Supplier permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter, + RateLimitFunction rateLimitFunction) { checkArgument(permits > 0, "rate must be > 0"); checkArgument(rateTime > 0, "Renew permit time must be > 0"); @@ -98,8 +111,8 @@ public RateLimiter(final ScheduledExecutorService service, final long permits, f this.permitUpdater = permitUpdater; this.isDispatchOrPrecisePublishRateLimiter = isDispatchOrPrecisePublishRateLimiter; - if (service != null) { - this.executorService = service; + if (scheduledExecutorService != null) { + this.executorService = scheduledExecutorService; this.externalExecutor = true; } else { final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); @@ -109,6 +122,14 @@ public RateLimiter(final ScheduledExecutorService service, final long permits, f this.externalExecutor = false; } + this.rateLimitFunction = rateLimitFunction; + + } + + // default values for Lombok generated builder class + public static class RateLimiterBuilder { + private long rateTime = 1; + private TimeUnit timeUnit = TimeUnit.SECONDS; } @Override diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java index 08ea9ea1e52c4..b0083714ba52f 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java @@ -68,13 +68,13 @@ public class FunctionStatsManager extends ComponentStatsManager{ final Counter statTotalSysExceptions; final Counter statTotalUserExceptions; - + final Summary statProcessLatency; final Gauge statlastInvocation; final Counter statTotalRecordsReceived; - + // windowed metrics final Counter statTotalProcessedSuccessfully1min; @@ -82,7 +82,7 @@ public class FunctionStatsManager extends ComponentStatsManager{ final Counter statTotalSysExceptions1min; final Counter statTotalUserExceptions1min; - + final Summary statProcessLatency1min; final Counter statTotalRecordsReceived1min; @@ -262,8 +262,8 @@ public FunctionStatsManager(FunctionCollectorRegistry collectorRegistry, .help("Exception from sink.") .create()); - userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); - sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); + userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); + sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); } public void addUserException(Throwable ex) { @@ -371,7 +371,7 @@ public double getTotalSysExceptions() { public double getTotalUserExceptions() { return _statTotalUserExceptions.get(); } - + @Override public double getLastInvocation() { return _statlastInvocation.get(); @@ -417,7 +417,7 @@ public double getTotalSysExceptions1min() { public double getTotalUserExceptions1min() { return _statTotalUserExceptions1min.get(); } - + @Override public double getAvgProcessLatency1min() { return _statProcessLatency1min.get().count <= 0.0 diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java index 255ad62ce0c44..536f55a9a3919 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java @@ -196,8 +196,8 @@ public SinkStatsManager(FunctionCollectorRegistry collectorRegistry, String[] me .help("Exception from sink.") .create()); - sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); - sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); + sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); + sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); } @Override diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java index f4e1da061f723..451a8adc5e6f2 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java @@ -195,8 +195,8 @@ public SourceStatsManager(FunctionCollectorRegistry collectorRegistry, String[] .help("Exception from source.") .create()); - sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); - sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); + sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); + sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); } @Override