Skip to content

Commit

Permalink
[Issue apache#11351] Parallel Precise Publish Rate Limiting Fix (apac…
Browse files Browse the repository at this point in the history
…he#11372)

## Master Issue: <apache#11351>

### Motivation
Hello, as far as I'm concerned it is well known that precise publish rate limiting does not function well. I believe my PR fixes problem number 3 stated in the issue above.

@danielsinai:
"3. Rate limit function passed only to the msg/s rate limiter (and that's in order to avoid calling it twice)"

It was passed to message rate limiter only due to the fact that there was no implementation of a way to throttle the connection whenever only **one of the limiters was exceeded**.

This PR will allow both message rate & byte rate to co-exist, limit and enable socket reading only when necessary.

### Modifications

- _tryAcquire_ function in **PublishRateLimiterDisable** will return true. If publish rate was null, this function would get called and return false, thus throttling the client for no reason. If the publish rate is null, it means it was not set by anyone so there's no reason to throttle any connection. 
```java
public boolean tryAcquire(int numbers, long bytes) {
        return true;
}
```
- **RateLimiter** _permits_ and _acquiredPermits_ were changed to volatile.
```java
 private volatile long permits;
 private volatile long acquiredPermits;
```
in order to allow reading access from multiple threads at the same time.
also the removal of _synchronized_ keyword from _getAvailablePermits()_ function.
```java
public long getAvailablePermits() {
        return Math.max(0, this.permits - this.acquiredPermits);
}
```
**This is required, since a thread dead lock will happen if not.**

- Created ~a HashMap to manage the byte and message rate limiters, and~ a function _releaseThrottle()_ to handle the auto read enable.
If one of the rate limiters has no available permits we will not re-enable the auto read from the socket.
  • Loading branch information
ronfarkash authored and ciaocloud committed Oct 16, 2021
1 parent 899c07c commit 0fae026
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 6 deletions.
Expand Up @@ -62,7 +62,14 @@ public boolean resetPublishCount() {
public boolean isPublishRateExceeded() {
return false;
}

// If all rate limiters are not exceeded, re-enable auto read from socket.
private void tryReleaseConnectionThrottle() {
if ((topicPublishRateLimiterOnMessage != null && topicPublishRateLimiterOnMessage.getAvailablePermits() <= 0)
|| (topicPublishRateLimiterOnByte != null && topicPublishRateLimiterOnByte.getAvailablePermits() <= 0)) {
return;
}
this.rateLimitFunction.apply();
}

@Override
public void update(Policies policies, String clusterName) {
Expand All @@ -79,10 +86,13 @@ public void update(PublishRate maxPublishRate) {
this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
if (this.publishMaxMessageRate > 0) {
topicPublishRateLimiterOnMessage =
new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction, true);
new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS,
this::tryReleaseConnectionThrottle, true);
}
if (this.publishMaxByteRate > 0) {
topicPublishRateLimiterOnByte = new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS, true);
topicPublishRateLimiterOnByte =
new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS,
this::tryReleaseConnectionThrottle, true);
}
} else {
this.publishMaxMessageRate = 0;
Expand Down
Expand Up @@ -54,8 +54,8 @@ public class RateLimiter implements AutoCloseable{
private TimeUnit timeUnit;
private final boolean externalExecutor;
private ScheduledFuture<?> renewTask;
private long permits;
private long acquiredPermits;
private volatile long permits;
private volatile long acquiredPermits;
private boolean isClosed;
// permitUpdate helps to update permit-rate at runtime
private Supplier<Long> permitUpdater;
Expand Down Expand Up @@ -217,7 +217,7 @@ public synchronized boolean tryAcquire(long acquirePermit) {
*
* @return returns 0 if permits is not available
*/
public synchronized long getAvailablePermits() {
public long getAvailablePermits() {
return Math.max(0, this.permits - this.acquiredPermits);
}

Expand Down

0 comments on commit 0fae026

Please sign in to comment.