From a9826bd35f323eef3eac598209ed27c0f1c7645a Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 26 Apr 2021 16:05:30 +0300 Subject: [PATCH] [Broker] Fix issue in set-publish-rate when using preciseTopicPublishRateLimiterEnable=true - disabling the limit didn't work after setting a limit because of a bug in PrecisPublishLimiter --- .../pulsar/broker/service/AbstractTopic.java | 2 +- .../broker/service/PrecisPublishLimiter.java | 87 ++++++++++++++----- .../nonpersistent/NonPersistentTopic.java | 7 ++ .../service/persistent/PersistentTopic.java | 7 ++ .../persistent/SubscribeRateLimiter.java | 2 +- .../service/PrecisPublishLimiterTest.java | 57 ++++++++++++ .../pulsar/common/util/RateLimiter.java | 14 ++- .../instance/stats/FunctionStatsManager.java | 14 +-- .../instance/stats/SinkStatsManager.java | 4 +- .../instance/stats/SourceStatsManager.java | 4 +- 10 files changed, 160 insertions(+), 38 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisPublishLimiterTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index cf07578c236556..4a2f1f42049c42 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -910,7 +910,7 @@ protected void updatePublishDispatcher(PublishRate publishRate) { // create new rateLimiter if rate-limiter is disabled if (preciseTopicPublishRateLimitingEnable) { this.topicPublishRateLimiter = new PrecisPublishLimiter(publishRate, - () -> this.enableCnxAutoRead()); + () -> this.enableCnxAutoRead(), brokerService.pulsar().getExecutor()); } else { this.topicPublishRateLimiter = new PublishRateLimiterImpl(publishRate); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java index 4db6bf230865ef..597407bbe826c2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java @@ -18,39 +18,48 @@ */ package org.apache.pulsar.broker.service; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; import org.apache.pulsar.common.util.RateLimitFunction; import org.apache.pulsar.common.util.RateLimiter; -public class PrecisPublishLimiter implements PublishRateLimiter { +public class PrecisPublishLimiter implements PublishRateLimiter, AutoCloseable { protected volatile int publishMaxMessageRate = 0; protected volatile long publishMaxByteRate = 0; protected volatile boolean publishThrottlingEnabled = false; // precise mode for publish rate limiter - private RateLimiter topicPublishRateLimiterOnMessage; - private RateLimiter topicPublishRateLimiterOnByte; + private volatile RateLimiter topicPublishRateLimiterOnMessage; + private volatile RateLimiter topicPublishRateLimiterOnByte; private final RateLimitFunction rateLimitFunction; + private final ScheduledExecutorService scheduledExecutorService; public PrecisPublishLimiter(Policies policies, String clusterName, RateLimitFunction rateLimitFunction) { this.rateLimitFunction = rateLimitFunction; update(policies, clusterName); + this.scheduledExecutorService = null; } public PrecisPublishLimiter(PublishRate publishRate, RateLimitFunction rateLimitFunction) { + this(publishRate, rateLimitFunction, null); + } + + public PrecisPublishLimiter(PublishRate publishRate, RateLimitFunction rateLimitFunction, + ScheduledExecutorService scheduledExecutorService) { this.rateLimitFunction = rateLimitFunction; update(publishRate); + this.scheduledExecutorService = scheduledExecutorService; } @Override public void checkPublishRate() { - // No-op + // No-op } @Override public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) { - // No-op + // No-op } @Override @@ -71,26 +80,31 @@ public void update(Policies policies, String clusterName) { : null; this.update(maxPublishRate); } + public void update(PublishRate maxPublishRate) { - if (maxPublishRate != null - && (maxPublishRate.publishThrottlingRateInMsg > 0 || maxPublishRate.publishThrottlingRateInByte > 0)) { - this.publishThrottlingEnabled = true; - this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0); - this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0); - if (this.publishMaxMessageRate > 0) { - topicPublishRateLimiterOnMessage = - new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction); - } - if (this.publishMaxByteRate > 0) { - topicPublishRateLimiterOnByte = new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS); + replaceLimiters(() -> { + if (maxPublishRate != null + && (maxPublishRate.publishThrottlingRateInMsg > 0 + || maxPublishRate.publishThrottlingRateInByte > 0)) { + this.publishThrottlingEnabled = true; + this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0); + this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0); + if (this.publishMaxMessageRate > 0) { + topicPublishRateLimiterOnMessage = + new RateLimiter(scheduledExecutorService, publishMaxMessageRate, 1, + TimeUnit.SECONDS, rateLimitFunction); + } + if (this.publishMaxByteRate > 0) { + // TODO: is it intentional that rateLimitFunction isn't passed to topicPublishRateLimiterOnByte? + topicPublishRateLimiterOnByte = new RateLimiter(scheduledExecutorService, publishMaxByteRate, + 1, TimeUnit.SECONDS); + } + } else { + this.publishMaxMessageRate = 0; + this.publishMaxByteRate = 0; + this.publishThrottlingEnabled = false; } - } else { - this.publishMaxMessageRate = 0; - this.publishMaxByteRate = 0; - this.publishThrottlingEnabled = false; - topicPublishRateLimiterOnMessage = null; - topicPublishRateLimiterOnByte = null; - } + }); } @Override @@ -98,4 +112,31 @@ public boolean tryAcquire(int numbers, long bytes) { return (topicPublishRateLimiterOnMessage == null || topicPublishRateLimiterOnMessage.tryAcquire(numbers)) && (topicPublishRateLimiterOnByte == null || topicPublishRateLimiterOnByte.tryAcquire(bytes)); } + + @Override + public void close() throws Exception { + replaceLimiters(null); + } + + private void replaceLimiters(Runnable updater) { + RateLimiter previousTopicPublishRateLimiterOnMessage = topicPublishRateLimiterOnMessage; + topicPublishRateLimiterOnMessage = null; + RateLimiter previousTopicPublishRateLimiterOnByte = topicPublishRateLimiterOnByte; + topicPublishRateLimiterOnByte = null; + try { + if (updater != null) { + updater.run(); + } + } finally { + // Close previous limiters to prevent resource leakages. + // Delay closing of previous limiters after new ones are in place so that updating the limiter + // doesn't cause unavailability. + if (previousTopicPublishRateLimiterOnMessage != null) { + previousTopicPublishRateLimiterOnMessage.close(); + } + if (previousTopicPublishRateLimiterOnByte != null) { + previousTopicPublishRateLimiterOnByte.close(); + } + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 54a35b5d627bd5..a6a8961eb74510 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -425,6 +425,13 @@ public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); producers.values().forEach(producer -> futures.add(producer.disconnect())); + if (topicPublishRateLimiter instanceof AutoCloseable) { + try { + ((AutoCloseable) topicPublishRateLimiter).close(); + } catch (Exception e) { + log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e); + } + } subscriptions.forEach((s, sub) -> futures.add(sub.disconnect())); CompletableFuture clientCloseFuture = diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 972e1a8de49cca..336654ec9d2ee7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1129,6 +1129,13 @@ public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect futures.add(transactionBuffer.closeAsync()); replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); producers.values().forEach(producer -> futures.add(producer.disconnect())); + if (topicPublishRateLimiter instanceof AutoCloseable) { + try { + ((AutoCloseable) topicPublishRateLimiter).close(); + } catch (Exception e) { + log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e); + } + } subscriptions.forEach((s, sub) -> futures.add(sub.disconnect())); CompletableFuture clientCloseFuture = closeWithoutWaitingClientDisconnect diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java index ae26b458018613..0a85c511470a1c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/SubscribeRateLimiter.java @@ -140,7 +140,7 @@ private synchronized void updateSubscribeRate(ConsumerIdentifier consumerIdentif if (this.subscribeRateLimiter.get(consumerIdentifier) == null) { this.subscribeRateLimiter.put(consumerIdentifier, new RateLimiter(brokerService.pulsar().getExecutor(), ratePerConsumer, - ratePeriod, TimeUnit.SECONDS, null)); + ratePeriod, TimeUnit.SECONDS)); } else { this.subscribeRateLimiter.get(consumerIdentifier) .setRate(ratePerConsumer, ratePeriod, TimeUnit.SECONDS, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisPublishLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisPublishLimiterTest.java new file mode 100644 index 00000000000000..a3f69925d19170 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PrecisPublishLimiterTest.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import org.apache.pulsar.common.policies.data.PublishRate; +import org.testng.annotations.Test; + +public class PrecisPublishLimiterTest { + + @Test + void shouldResetMsgLimitAfterUpdate() { + PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(), () -> { + }); + precisPublishLimiter.update(new PublishRate(1, 1)); + assertFalse(precisPublishLimiter.tryAcquire(100, 100)); + precisPublishLimiter.update(new PublishRate(-1, 100)); + assertTrue(precisPublishLimiter.tryAcquire(100, 100)); + } + + @Test + void shouldResetBytesLimitAfterUpdate() { + PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(), () -> { + }); + precisPublishLimiter.update(new PublishRate(1, 1)); + assertFalse(precisPublishLimiter.tryAcquire(100, 100)); + precisPublishLimiter.update(new PublishRate(100, -1)); + assertTrue(precisPublishLimiter.tryAcquire(100, 100)); + } + + @Test + void shouldCloseResources() throws Exception { + for (int i = 0; i < 20000; i++) { + PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(100, 100), () -> { + }); + precisPublishLimiter.tryAcquire(100, 100); + precisPublishLimiter.close(); + } + } +} \ No newline at end of file diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java index 6a8afd2bc1036b..259215786a8e7c 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java @@ -62,15 +62,25 @@ public class RateLimiter implements AutoCloseable{ private RateLimitFunction rateLimitFunction; public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) { - this(null, permits, rateTime, timeUnit, null); + this(null, permits, rateTime, timeUnit); } public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, RateLimitFunction autoReadResetFunction) { - this(null, permits, rateTime, timeUnit, null); + this(null, permits, rateTime, timeUnit, autoReadResetFunction); + } + + public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime, + final TimeUnit timeUnit, RateLimitFunction autoReadResetFunction) { + this(service, permits, rateTime, timeUnit); this.rateLimitFunction = autoReadResetFunction; } + public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime, + final TimeUnit timeUnit) { + this(service, permits, rateTime, timeUnit, (Supplier) null); + } + public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime, final TimeUnit timeUnit, Supplier permitUpdater) { checkArgument(permits > 0, "rate must be > 0"); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java index 08ea9ea1e52c42..b0083714ba52f2 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java @@ -68,13 +68,13 @@ public class FunctionStatsManager extends ComponentStatsManager{ final Counter statTotalSysExceptions; final Counter statTotalUserExceptions; - + final Summary statProcessLatency; final Gauge statlastInvocation; final Counter statTotalRecordsReceived; - + // windowed metrics final Counter statTotalProcessedSuccessfully1min; @@ -82,7 +82,7 @@ public class FunctionStatsManager extends ComponentStatsManager{ final Counter statTotalSysExceptions1min; final Counter statTotalUserExceptions1min; - + final Summary statProcessLatency1min; final Counter statTotalRecordsReceived1min; @@ -262,8 +262,8 @@ public FunctionStatsManager(FunctionCollectorRegistry collectorRegistry, .help("Exception from sink.") .create()); - userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); - sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); + userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); + sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); } public void addUserException(Throwable ex) { @@ -371,7 +371,7 @@ public double getTotalSysExceptions() { public double getTotalUserExceptions() { return _statTotalUserExceptions.get(); } - + @Override public double getLastInvocation() { return _statlastInvocation.get(); @@ -417,7 +417,7 @@ public double getTotalSysExceptions1min() { public double getTotalUserExceptions1min() { return _statTotalUserExceptions1min.get(); } - + @Override public double getAvgProcessLatency1min() { return _statProcessLatency1min.get().count <= 0.0 diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java index 46999cd7010c5a..e89546a3d2b806 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java @@ -196,8 +196,8 @@ public SinkStatsManager(FunctionCollectorRegistry collectorRegistry, String[] me .help("Exception from sink.") .create()); - sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); - sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); + sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); + sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); } @Override diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java index e79e0b5bffca1b..e340b37a74bcc6 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java @@ -195,8 +195,8 @@ public SourceStatsManager(FunctionCollectorRegistry collectorRegistry, String[] .help("Exception from source.") .create()); - sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); - sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null); + sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); + sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES); } @Override