Skip to content

Commit

Permalink
[Issue 8599] Fix DispatchRateLimiter does not take effect (#8611)
Browse files Browse the repository at this point in the history
Fixes #8599
Fixes #4777

Pulsar current support topic level and subscription level dispatch rate limiter by using `DispatchRateLimiter`.  When consumers connected to broker and start reading entry, broker judge whether rate limit is exceeded before reading, and increasing the permits after reading finished by call tryAcquire().  When there are multi consumers using one `DispatchRateLimiter`, these consumers could start reading together and may increasing the `acquiredPermits` far more than `permits` after reading finished. As `acquiredPermits` will reset to 0 every second, all consumers could start reading in the next second and dispatch rate limiter will take no effect in such case.

This PR change the behaviour of `DispatchRateLimiter`, minus `permits` every second instead of reset `acquiredPermits` to 0, and the reading will stop for a while until `acquiredPermits` return to a value less than  `permits` .

RateLimiterTest.testDispatchRate()

(cherry picked from commit 02fc06e)
  • Loading branch information
wangjialing218 authored and michaeljmarshall committed Dec 10, 2021
1 parent 6fee20f commit 0df22ad
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 10 deletions.
Expand Up @@ -334,7 +334,7 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
if (msgRate > 0) {
if (this.dispatchRateLimiterOnMessage == null) {
this.dispatchRateLimiterOnMessage = new RateLimiter(brokerService.pulsar().getExecutor(), msgRate,
ratePeriod, TimeUnit.SECONDS, permitUpdaterMsg);
ratePeriod, TimeUnit.SECONDS, permitUpdaterMsg, true);
} else {
this.dispatchRateLimiterOnMessage.setRate(msgRate, dispatchRate.ratePeriodInSecond,
TimeUnit.SECONDS, permitUpdaterMsg);
Expand All @@ -354,7 +354,7 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
if (byteRate > 0) {
if (this.dispatchRateLimiterOnByte == null) {
this.dispatchRateLimiterOnByte = new RateLimiter(brokerService.pulsar().getExecutor(), byteRate,
ratePeriod, TimeUnit.SECONDS, permitUpdaterByte);
ratePeriod, TimeUnit.SECONDS, permitUpdaterByte, true);
} else {
this.dispatchRateLimiterOnByte.setRate(byteRate, dispatchRate.ratePeriodInSecond,
TimeUnit.SECONDS, permitUpdaterByte);
Expand Down
Expand Up @@ -62,6 +62,7 @@ public class RateLimiter implements AutoCloseable{
// permitUpdate helps to update permit-rate at runtime
private Supplier<Long> permitUpdater;
private RateLimitFunction rateLimitFunction;
private boolean isDispatchRateLimiter;

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) {
this(null, permits, rateTime, timeUnit, null);
Expand All @@ -74,14 +75,20 @@ public RateLimiter(final long permits, final long rateTime, final TimeUnit timeU
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit, Supplier<Long> permitUpdater) {
final TimeUnit timeUnit, Supplier<Long> permitUpdater) {
this(service, permits, rateTime, timeUnit, permitUpdater, false);
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit, Supplier<Long> permitUpdater, boolean isDispatchRateLimiter) {
checkArgument(permits > 0, "rate must be > 0");
checkArgument(rateTime > 0, "Renew permit time must be > 0");

this.rateTime = rateTime;
this.timeUnit = timeUnit;
this.permits = permits;
this.permitUpdater = permitUpdater;
this.isDispatchRateLimiter = isDispatchRateLimiter;

if (service != null) {
this.executorService = service;
Expand Down Expand Up @@ -174,15 +181,22 @@ public synchronized boolean tryAcquire(long acquirePermit) {
renewTask = createTask();
}

// acquired-permits can't be larger than the rate
if (acquirePermit > this.permits) {
acquiredPermits = this.permits;
return false;
}
boolean canAcquire = acquirePermit < 0 || acquiredPermits < this.permits;
if (canAcquire) {
if (isDispatchRateLimiter) {
// for dispatch rate limiter just add acquirePermit
acquiredPermits += acquirePermit;
} else {
// acquired-permits can't be larger than the rate
if (acquirePermit > this.permits) {
acquiredPermits = this.permits;
return false;
}

if (canAcquire) {
acquiredPermits += acquirePermit;
}
}

return canAcquire;
}

Expand Down Expand Up @@ -245,7 +259,7 @@ protected ScheduledFuture<?> createTask() {
}

synchronized void renew() {
acquiredPermits = 0;
acquiredPermits = isDispatchRateLimiter ? Math.max(0, acquiredPermits - permits) : 0;
if (permitUpdater != null) {
long newPermitRate = permitUpdater.get();
if (newPermitRate > 0) {
Expand Down
Expand Up @@ -164,6 +164,27 @@ public void testResetRate() throws Exception {
rate.close();
}

@Test
public void testDispatchRate() throws Exception {
final long rateTimeMSec = 1000;
final int permits = 100;
RateLimiter rate = new RateLimiter(null, permits, rateTimeMSec, TimeUnit.MILLISECONDS, null, true);
rate.tryAcquire(100);
rate.tryAcquire(100);
rate.tryAcquire(100);
assertEquals(rate.getAvailablePermits(), 0);

Thread.sleep(rateTimeMSec * 2);
// check after two rate-time: acquiredPermits is 100
assertEquals(rate.getAvailablePermits(), 0);

Thread.sleep(rateTimeMSec);
// check after three rate-time: acquiredPermits is 0
assertEquals(rate.getAvailablePermits() > 0, true);

rate.close();
}

@Test
public void testRateLimiterWithPermitUpdater() throws Exception {
long permits = 10;
Expand Down

0 comments on commit 0df22ad

Please sign in to comment.