From 0fae02616026ed0753d4006d93597b62072b8626 Mon Sep 17 00:00:00 2001 From: Ron Farkash Date: Thu, 29 Jul 2021 18:22:27 +0300 Subject: [PATCH] [Issue #11351] Parallel Precise Publish Rate Limiting Fix (#11372) ## Master Issue: ### Motivation Hello, as far as I'm concerned it is well known that precise publish rate limiting does not function well. I believe my PR fixes problem number 3 stated in the issue above. @danielsinai: "3. Rate limit function passed only to the msg/s rate limiter (and that's in order to avoid calling it twice)" It was passed to message rate limiter only due to the fact that there was no implementation of a way to throttle the connection whenever only **one of the limiters was exceeded**. This PR will allow both message rate & byte rate to co-exist, limit and enable socket reading only when necessary. ### Modifications - _tryAcquire_ function in **PublishRateLimiterDisable** will return true. If publish rate was null, this function would get called and return false, thus throttling the client for no reason. If the publish rate is null, it means it was not set by anyone so there's no reason to throttle any connection. ```java public boolean tryAcquire(int numbers, long bytes) { return true; } ``` - **RateLimiter** _permits_ and _acquiredPermits_ were changed to volatile. ```java private volatile long permits; private volatile long acquiredPermits; ``` in order to allow reading access from multiple threads at the same time. also the removal of _synchronized_ keyword from _getAvailablePermits()_ function. ```java public long getAvailablePermits() { return Math.max(0, this.permits - this.acquiredPermits); } ``` **This is required, since a thread dead lock will happen if not.** - Created ~a HashMap to manage the byte and message rate limiters, and~ a function _releaseThrottle()_ to handle the auto read enable. If one of the rate limiters has no available permits we will not re-enable the auto read from the socket. --- .../broker/service/PrecisPublishLimiter.java | 16 +++++++++++++--- .../apache/pulsar/common/util/RateLimiter.java | 6 +++--- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java index e981518886eac7..60fbcf029e820e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java @@ -62,7 +62,14 @@ public boolean resetPublishCount() { public boolean isPublishRateExceeded() { return false; } - + // If all rate limiters are not exceeded, re-enable auto read from socket. + private void tryReleaseConnectionThrottle() { + if ((topicPublishRateLimiterOnMessage != null && topicPublishRateLimiterOnMessage.getAvailablePermits() <= 0) + || (topicPublishRateLimiterOnByte != null && topicPublishRateLimiterOnByte.getAvailablePermits() <= 0)) { + return; + } + this.rateLimitFunction.apply(); + } @Override public void update(Policies policies, String clusterName) { @@ -79,10 +86,13 @@ public void update(PublishRate maxPublishRate) { this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0); if (this.publishMaxMessageRate > 0) { topicPublishRateLimiterOnMessage = - new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction, true); + new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, + this::tryReleaseConnectionThrottle, true); } if (this.publishMaxByteRate > 0) { - topicPublishRateLimiterOnByte = new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS, true); + topicPublishRateLimiterOnByte = + new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS, + this::tryReleaseConnectionThrottle, true); } } else { this.publishMaxMessageRate = 0; diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java index edaef1719ac941..cb88a95117b382 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java @@ -54,8 +54,8 @@ public class RateLimiter implements AutoCloseable{ private TimeUnit timeUnit; private final boolean externalExecutor; private ScheduledFuture renewTask; - private long permits; - private long acquiredPermits; + private volatile long permits; + private volatile long acquiredPermits; private boolean isClosed; // permitUpdate helps to update permit-rate at runtime private Supplier permitUpdater; @@ -217,7 +217,7 @@ public synchronized boolean tryAcquire(long acquirePermit) { * * @return returns 0 if permits is not available */ - public synchronized long getAvailablePermits() { + public long getAvailablePermits() { return Math.max(0, this.permits - this.acquiredPermits); }