From bdc62ecac2a96b5ff657b5e70c306df81e3fe93d Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 3 Aug 2021 19:14:26 +0300 Subject: [PATCH] [Broker] Fix set-publish-rate when using preciseTopicPublishRateLimiterEnable=true (#10384) When using `preciseTopicPublishRateLimiterEnable=true` (introduced by #7078) setting for rate limiting, there are various issues: - updating the limits doesn't set either boundary when changing the limits from a bounded limit to unbounded. - each topic will create a scheduler thread for each limiter instance - each topic will never release the scheduler thread when the topic gets unloaded / closed - updating the limits didn't close the scheduler thread related to the replaced limiter instance - Fix updating of the limits by cleaning up the previous limiter instances before creating new limiter instances - Use `brokerService.pulsar().getExecutor()` as the scheduler for the rate limiter instances - Add resource cleanup hooks for topic closing (unload) The existing code has a difference in passing the `rateLimitFunction`: https://github.com/apache/pulsar/blob/69a173a82c89893f54dbe5b6f422249f66ea5418/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java#L80-L86 It's passed to the `topicPublishRateLimiterOnMessage`, but not to `topicPublishRateLimiterOnByte` . It is unclear whether this is intentional. The `rateLimitFunction` is `() -> this.enableCnxAutoRead()` https://github.com/apache/pulsar/blob/69a173a82c89893f54dbe5b6f422249f66ea5418/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java#L913 (This also raises a question whether rate limiting works consistently when multiple topics share the same connection.) (cherry picked from commit ded806fd52f6e2f182fa02052cbd82c2a6755098) --- .../pulsar/broker/service/AbstractTopic.java | 3 +- .../broker/service/PrecisPublishLimiter.java | 114 +++++++++++++----- .../broker/service/PublishRateLimiter.java | 2 +- .../service/PublishRateLimiterDisable.java | 4 + .../service/PublishRateLimiterImpl.java | 5 + .../nonpersistent/NonPersistentTopic.java | 7 ++ .../service/persistent/PersistentTopic.java | 7 ++ .../persistent/SubscribeRateLimiter.java | 5 +- .../service/PrecisPublishLimiterTest.java | 57 +++++++++ .../pulsar/common/util/RateLimiter.java | 29 ++++- .../instance/stats/FunctionStatsManager.java | 14 +-- .../instance/stats/SinkStatsManager.java | 4 +- .../instance/stats/SourceStatsManager.java | 4 +- 13 files changed, 204 insertions(+), 51 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisPublishLimiterTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index d568ac206c52a..2cbd863b92525 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -594,7 +594,8 @@ protected void updatePublishDispatcher(PublishRate publishRate) { || this.topicPublishRateLimiter == PublishRateLimiter.DISABLED_RATE_LIMITER) { // create new rateLimiter if rate-limiter is disabled if (preciseTopicPublishRateLimitingEnable) { - this.topicPublishRateLimiter = new PrecisPublishLimiter(publishRate, ()-> this.enableCnxAutoRead()); + this.topicPublishRateLimiter = new PrecisPublishLimiter(publishRate, + () -> this.enableCnxAutoRead(), brokerService.pulsar().getExecutor()); } else { this.topicPublishRateLimiter = new PublishRateLimiterImpl(publishRate); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java index 016cf03c4fb77..e61597e2d139f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java @@ -18,40 +18,46 @@ */ package org.apache.pulsar.broker.service; +import java.util.concurrent.ScheduledExecutorService; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.util.RateLimitFunction; import org.apache.pulsar.common.util.RateLimiter; -import java.util.concurrent.TimeUnit; - public class PrecisPublishLimiter implements PublishRateLimiter { protected volatile int publishMaxMessageRate = 0; protected volatile long publishMaxByteRate = 0; - protected volatile boolean publishThrottlingEnabled = false; // precise mode for publish rate limiter - private RateLimiter topicPublishRateLimiterOnMessage; - private RateLimiter topicPublishRateLimiterOnByte; + private volatile RateLimiter topicPublishRateLimiterOnMessage; + private volatile RateLimiter topicPublishRateLimiterOnByte; private final RateLimitFunction rateLimitFunction; + private final ScheduledExecutorService scheduledExecutorService; public PrecisPublishLimiter(Policies policies, String clusterName, RateLimitFunction rateLimitFunction) { this.rateLimitFunction = rateLimitFunction; update(policies, clusterName); + this.scheduledExecutorService = null; } public PrecisPublishLimiter(PublishRate publishRate, RateLimitFunction rateLimitFunction) { + this(publishRate, rateLimitFunction, null); + } + + public PrecisPublishLimiter(PublishRate publishRate, RateLimitFunction rateLimitFunction, + ScheduledExecutorService scheduledExecutorService) { this.rateLimitFunction = rateLimitFunction; update(publishRate); + this.scheduledExecutorService = scheduledExecutorService; } @Override public void checkPublishRate() { - // No-op + // No-op } @Override public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) { - // No-op + // No-op } @Override @@ -63,10 +69,15 @@ public boolean resetPublishCount() { public boolean isPublishRateExceeded() { return false; } + // If all rate limiters are not exceeded, re-enable auto read from socket. private void tryReleaseConnectionThrottle() { - if ((topicPublishRateLimiterOnMessage != null && topicPublishRateLimiterOnMessage.getAvailablePermits() <= 0) - || (topicPublishRateLimiterOnByte != null && topicPublishRateLimiterOnByte.getAvailablePermits() <= 0)) { + RateLimiter currentTopicPublishRateLimiterOnMessage = topicPublishRateLimiterOnMessage; + RateLimiter currentTopicPublishRateLimiterOnByte = topicPublishRateLimiterOnByte; + if ((currentTopicPublishRateLimiterOnMessage != null + && currentTopicPublishRateLimiterOnMessage.getAvailablePermits() <= 0) + || (currentTopicPublishRateLimiterOnByte != null + && currentTopicPublishRateLimiterOnByte.getAvailablePermits() <= 0)) { return; } this.rateLimitFunction.apply(); @@ -79,34 +90,73 @@ public void update(Policies policies, String clusterName) { : null; this.update(maxPublishRate); } + public void update(PublishRate maxPublishRate) { - if (maxPublishRate != null - && (maxPublishRate.publishThrottlingRateInMsg > 0 || maxPublishRate.publishThrottlingRateInByte > 0)) { - this.publishThrottlingEnabled = true; - this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0); - this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0); - if (this.publishMaxMessageRate > 0) { - topicPublishRateLimiterOnMessage = - new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, - this::tryReleaseConnectionThrottle, true); - } - if (this.publishMaxByteRate > 0) { - topicPublishRateLimiterOnByte = - new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS, - this::tryReleaseConnectionThrottle, true); + replaceLimiters(() -> { + if (maxPublishRate != null + && (maxPublishRate.publishThrottlingRateInMsg > 0 + || maxPublishRate.publishThrottlingRateInByte > 0)) { + this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0); + this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0); + if (this.publishMaxMessageRate > 0) { + topicPublishRateLimiterOnMessage = + RateLimiter.builder() + .scheduledExecutorService(scheduledExecutorService) + .permits(publishMaxMessageRate) + .rateLimitFunction(this::tryReleaseConnectionThrottle) + .isDispatchOrPrecisePublishRateLimiter(true) + .build(); + } + if (this.publishMaxByteRate > 0) { + topicPublishRateLimiterOnByte = RateLimiter.builder() + .scheduledExecutorService(scheduledExecutorService) + .permits(publishMaxByteRate) + .rateLimitFunction(this::tryReleaseConnectionThrottle) + .isDispatchOrPrecisePublishRateLimiter(true) + .build(); + } + } else { + this.publishMaxMessageRate = 0; + this.publishMaxByteRate = 0; } - } else { - this.publishMaxMessageRate = 0; - this.publishMaxByteRate = 0; - this.publishThrottlingEnabled = false; - topicPublishRateLimiterOnMessage = null; - topicPublishRateLimiterOnByte = null; - } + }); } @Override public boolean tryAcquire(int numbers, long bytes) { - return (topicPublishRateLimiterOnMessage == null || topicPublishRateLimiterOnMessage.tryAcquire(numbers)) && - (topicPublishRateLimiterOnByte == null || topicPublishRateLimiterOnByte.tryAcquire(bytes)); + RateLimiter currentTopicPublishRateLimiterOnMessage = topicPublishRateLimiterOnMessage; + RateLimiter currentTopicPublishRateLimiterOnByte = topicPublishRateLimiterOnByte; + return (currentTopicPublishRateLimiterOnMessage == null + || currentTopicPublishRateLimiterOnMessage.tryAcquire(numbers)) + && (currentTopicPublishRateLimiterOnByte == null + || currentTopicPublishRateLimiterOnByte.tryAcquire(bytes)); + } + + @Override + public void close() throws Exception { + rateLimitFunction.apply(); + replaceLimiters(null); + } + + private void replaceLimiters(Runnable updater) { + RateLimiter previousTopicPublishRateLimiterOnMessage = topicPublishRateLimiterOnMessage; + topicPublishRateLimiterOnMessage = null; + RateLimiter previousTopicPublishRateLimiterOnByte = topicPublishRateLimiterOnByte; + topicPublishRateLimiterOnByte = null; + try { + if (updater != null) { + updater.run(); + } + } finally { + // Close previous limiters to prevent resource leakages. + // Delay closing of previous limiters after new ones are in place so that updating the limiter + // doesn't cause unavailability. + if (previousTopicPublishRateLimiterOnMessage != null) { + previousTopicPublishRateLimiterOnMessage.close(); + } + if (previousTopicPublishRateLimiterOnByte != null) { + previousTopicPublishRateLimiterOnByte.close(); + } + } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java index b121cdab77b14..8b9de81b56188 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiter.java @@ -21,7 +21,7 @@ import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; -public interface PublishRateLimiter { +public interface PublishRateLimiter extends AutoCloseable { PublishRateLimiter DISABLED_RATE_LIMITER = PublishRateLimiterDisable.DISABLED_RATE_LIMITER; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java index c72f6ba82b05d..cf18192e4ac6b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java @@ -62,4 +62,8 @@ public boolean tryAcquire(int numbers, long bytes) { return true; } + @Override + public void close() throws Exception { + // No-op + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java index 846b075f22a0f..b7e17e91637d4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java @@ -109,4 +109,9 @@ public void update(PublishRate maxPublishRate) { public boolean tryAcquire(int numbers, long bytes) { return false; } + + @Override + public void close() throws Exception { + // no-op + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 8d49d20553c74..86aa9fc5f953d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -457,6 +457,13 @@ public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); producers.values().forEach(producer -> futures.add(producer.disconnect())); + if (topicPublishRateLimiter != null) { + try { + topicPublishRateLimiter.close(); + } catch (Exception e) { + log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e); + } + } subscriptions.forEach((s, sub) -> futures.add(sub.disconnect())); CompletableFuture clientCloseFuture = closeWithoutWaitingClientDisconnect ? CompletableFuture.completedFuture(null) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index bdc3fda8127f7..c850aa8d2812a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1055,6 +1055,13 @@ public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); producers.values().forEach(producer -> futures.add(producer.disconnect())); + if (topicPublishRateLimiter != null) { + try { + topicPublishRateLimiter.close(); + } catch (Exception e) { + log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e); + } + } subscriptions.forEach((s, sub) -> futures.add(sub.disconnect())); CompletableFuture clientCloseFuture = closeWithoutWaitingClientDisconnect ? CompletableFuture.completedFuture(null) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java index 30f13fc9b108e..32fe834e0883a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java @@ -134,8 +134,9 @@ private synchronized void updateSubscribeRate(ConsumerIdentifier consumerIdentif // update subscribe-rateLimiter if (ratePerConsumer > 0) { if (this.subscribeRateLimiter.get(consumerIdentifier) == null) { - this.subscribeRateLimiter.put(consumerIdentifier, new RateLimiter(brokerService.pulsar().getExecutor(), ratePerConsumer, - ratePeriod, TimeUnit.SECONDS, null)); + this.subscribeRateLimiter.put(consumerIdentifier, + new RateLimiter(brokerService.pulsar().getExecutor(), ratePerConsumer, + ratePeriod, TimeUnit.SECONDS)); } else { this.subscribeRateLimiter.get(consumerIdentifier).setRate(ratePerConsumer, ratePeriod, TimeUnit.SECONDS, null); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisPublishLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisPublishLimiterTest.java new file mode 100644 index 0000000000000..61804e7255d8e --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisPublishLimiterTest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import org.apache.pulsar.common.policies.data.PublishRate; +import org.testng.annotations.Test; + +public class PrecisPublishLimiterTest { + + @Test + void shouldResetMsgLimitAfterUpdate() { + PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(), () -> { + }); + precisPublishLimiter.update(new PublishRate(1, 1)); + assertFalse(precisPublishLimiter.tryAcquire(99, 99)); + precisPublishLimiter.update(new PublishRate(-1, 100)); + assertTrue(precisPublishLimiter.tryAcquire(99, 99)); + } + + @Test + void shouldResetBytesLimitAfterUpdate() { + PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(), () -> { + }); + precisPublishLimiter.update(new PublishRate(1, 1)); + assertFalse(precisPublishLimiter.tryAcquire(99, 99)); + precisPublishLimiter.update(new PublishRate(100, -1)); + assertTrue(precisPublishLimiter.tryAcquire(99, 99)); + } + + @Test + void shouldCloseResources() throws Exception { + for (int i = 0; i < 20000; i++) { + PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(100, 100), () -> { + }); + precisPublishLimiter.tryAcquire(99, 99); + precisPublishLimiter.close(); + } + } +} \ No newline at end of file diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java index a83517897e0df..0c22edf2d5c6f 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java @@ -27,6 +27,7 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import lombok.Builder; /** * A Rate Limiter that distributes permits at a configurable rate. Each {@link #acquire()} blocks if necessary until a @@ -50,7 +51,6 @@ * */ public class RateLimiter implements AutoCloseable{ - private final ScheduledExecutorService executorService; private long rateTime; private TimeUnit timeUnit; @@ -65,7 +65,7 @@ public class RateLimiter implements AutoCloseable{ private boolean isDispatchOrPrecisePublishRateLimiter; public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) { - this(null, permits, rateTime, timeUnit, null); + this(null, permits, rateTime, timeUnit); } public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, boolean isDispatchOrPrecisePublishRateLimiter) { @@ -84,6 +84,11 @@ public RateLimiter(final long permits, final long rateTime, final TimeUnit timeU this.rateLimitFunction = autoReadResetFunction; } + public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime, + final TimeUnit timeUnit) { + this(service, permits, rateTime, timeUnit, (Supplier) null); + } + public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime, final TimeUnit timeUnit, Supplier permitUpdater) { this(service, permits, rateTime, timeUnit, permitUpdater, false); @@ -91,6 +96,14 @@ public RateLimiter(final ScheduledExecutorService service, final long permits, f public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime, final TimeUnit timeUnit, Supplier permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter) { + this(service, permits, rateTime, timeUnit, permitUpdater, isDispatchOrPrecisePublishRateLimiter, + null); + } + + @Builder + RateLimiter(final ScheduledExecutorService scheduledExecutorService, final long permits, final long rateTime, + final TimeUnit timeUnit, Supplier permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter, + RateLimitFunction rateLimitFunction) { checkArgument(permits > 0, "rate must be > 0"); checkArgument(rateTime > 0, "Renew permit time must be > 0"); @@ -100,8 +113,8 @@ public RateLimiter(final ScheduledExecutorService service, final long permits, f this.permitUpdater = permitUpdater; this.isDispatchOrPrecisePublishRateLimiter = isDispatchOrPrecisePublishRateLimiter; - if (service != null) { - this.executorService = service; + if (scheduledExecutorService != null) { + this.executorService = scheduledExecutorService; this.externalExecutor = true; } else { final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); @@ -111,6 +124,14 @@ public RateLimiter(final ScheduledExecutorService service, final long permits, f this.externalExecutor = false; } + this.rateLimitFunction = rateLimitFunction; + + } + + // default values for Lombok generated builder class + public static class RateLimiterBuilder { + private long rateTime = 1; + private TimeUnit timeUnit = TimeUnit.SECONDS; } @Override diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java index f02b8505ebba4..7e6e379c0dda1 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java @@ -68,13 +68,13 @@ public class FunctionStatsManager extends ComponentStatsManager{ final Counter statTotalSysExceptions; final Counter statTotalUserExceptions; - + final Summary statProcessLatency; final Gauge statlastInvocation; final Counter statTotalRecordsReceived; - + // windowed metrics final Counter statTotalProcessedSuccessfully1min; @@ -82,7 +82,7 @@ public class FunctionStatsManager extends ComponentStatsManager{ final Counter statTotalSysExceptions1min; final Counter statTotalUserExceptions1min; - + final Summary statProcessLatency1min; final Counter statTotalRecordsReceived1min; @@ -232,8 +232,8 @@ public FunctionStatsManager(CollectorRegistry collectorRegistry, .help("Exception from sink.") .register(collectorRegistry); - userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); - sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); + userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); + sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); } public void addUserException(Throwable ex) { @@ -341,7 +341,7 @@ public double getTotalSysExceptions() { public double getTotalUserExceptions() { return _statTotalUserExceptions.get(); } - + @Override public double getLastInvocation() { return _statlastInvocation.get(); @@ -387,7 +387,7 @@ public double getTotalSysExceptions1min() { public double getTotalUserExceptions1min() { return _statTotalUserExceptions1min.get(); } - + @Override public double getAvgProcessLatency1min() { return _statProcessLatency1min.get().count <= 0.0 diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java index 401aa34be697c..5c28d43ac1d78 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java @@ -174,8 +174,8 @@ public SinkStatsManager(CollectorRegistry collectorRegistry, String[] metricsLab .help("Exception from sink.") .register(collectorRegistry); - sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); - sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); + sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); + sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); } @Override diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java index 9acee9536a5bf..0baaa2ebaed77 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java @@ -173,8 +173,8 @@ public SourceStatsManager(CollectorRegistry collectorRegistry, String[] metricsL .help("Exception from source.") .register(collectorRegistry); - sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); - sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); + sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); + sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); } @Override