Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue 8599] Fix DispatchRateLimiter does not take effect #8611

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -342,7 +342,7 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
if (msgRate > 0) {
if (this.dispatchRateLimiterOnMessage == null) {
this.dispatchRateLimiterOnMessage = new RateLimiter(brokerService.pulsar().getExecutor(), msgRate,
ratePeriod, TimeUnit.SECONDS, permitUpdaterMsg);
ratePeriod, TimeUnit.SECONDS, permitUpdaterMsg, true);
} else {
this.dispatchRateLimiterOnMessage.setRate(msgRate, dispatchRate.ratePeriodInSecond,
TimeUnit.SECONDS, permitUpdaterMsg);
Expand All @@ -362,7 +362,7 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
if (byteRate > 0) {
if (this.dispatchRateLimiterOnByte == null) {
this.dispatchRateLimiterOnByte = new RateLimiter(brokerService.pulsar().getExecutor(), byteRate,
ratePeriod, TimeUnit.SECONDS, permitUpdaterByte);
ratePeriod, TimeUnit.SECONDS, permitUpdaterByte, true);
} else {
this.dispatchRateLimiterOnByte.setRate(byteRate, dispatchRate.ratePeriodInSecond,
TimeUnit.SECONDS, permitUpdaterByte);
Expand Down
Expand Up @@ -211,8 +211,8 @@ public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(Subscriptio
* verify rate-limiting should throttle message-dispatching based on byte-rate
*
* <pre>
* 1. dispatch-byte-rate = 100 bytes/sec
* 2. send 30 msgs : each with 10 byte
* 1. dispatch-byte-rate = 1000 bytes/sec
* 2. send 30 msgs : each with 100 byte
* 3. it should take up to 2 second to receive all messages
* </pre>
*
Expand All @@ -227,7 +227,7 @@ public void testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionT
final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll");
final String subName = "my-subscriber-name-" + subscription;

final int byteRate = 100;
final int byteRate = 1000;
DispatchRate dispatchRate = new DispatchRate(-1, byteRate, 1);
admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
admin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate);
Expand Down
Expand Up @@ -60,6 +60,7 @@ public class RateLimiter implements AutoCloseable{
// permitUpdate helps to update permit-rate at runtime
private Supplier<Long> permitUpdater;
private RateLimitFunction rateLimitFunction;
private boolean isDispatchRateLimiter;

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) {
this(null, permits, rateTime, timeUnit, null);
Expand All @@ -72,14 +73,20 @@ public RateLimiter(final long permits, final long rateTime, final TimeUnit timeU
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit, Supplier<Long> permitUpdater) {
final TimeUnit timeUnit, Supplier<Long> permitUpdater) {
this(service, permits, rateTime, timeUnit, permitUpdater, false);
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit, Supplier<Long> permitUpdater, boolean isDispatchRateLimiter) {
checkArgument(permits > 0, "rate must be > 0");
checkArgument(rateTime > 0, "Renew permit time must be > 0");

this.rateTime = rateTime;
this.timeUnit = timeUnit;
this.permits = permits;
this.permitUpdater = permitUpdater;
this.isDispatchRateLimiter = isDispatchRateLimiter;

if (service != null) {
this.executorService = service;
Expand Down Expand Up @@ -172,15 +179,22 @@ public synchronized boolean tryAcquire(long acquirePermit) {
renewTask = createTask();
}

// acquired-permits can't be larger than the rate
if (acquirePermit > this.permits) {
acquiredPermits = this.permits;
return false;
}
boolean canAcquire = acquirePermit < 0 || acquiredPermits < this.permits;
if (canAcquire) {
if (isDispatchRateLimiter) {
// for dispatch rate limiter just add acquirePermit
acquiredPermits += acquirePermit;
} else {
// acquired-permits can't be larger than the rate
if (acquirePermit > this.permits) {
acquiredPermits = this.permits;
return false;
}

if (canAcquire) {
acquiredPermits += acquirePermit;
}
}

return canAcquire;
}

Expand Down Expand Up @@ -243,7 +257,7 @@ protected ScheduledFuture<?> createTask() {
}

synchronized void renew() {
acquiredPermits = 0;
acquiredPermits = isDispatchRateLimiter ? Math.max(0, acquiredPermits - permits) : 0;
if (permitUpdater != null) {
long newPermitRate = permitUpdater.get();
if (newPermitRate > 0) {
Expand Down
Expand Up @@ -164,6 +164,27 @@ public void testResetRate() throws Exception {
rate.close();
}

@Test
public void testDispatchRate() throws Exception {
final long rateTimeMSec = 1000;
final int permits = 100;
RateLimiter rate = new RateLimiter(null, permits, rateTimeMSec, TimeUnit.MILLISECONDS, null, true);
rate.tryAcquire(100);
rate.tryAcquire(100);
rate.tryAcquire(100);
assertEquals(rate.getAvailablePermits(), 0);

Thread.sleep(rateTimeMSec * 2);
// check after two rate-time: acquiredPermits is 100
assertEquals(rate.getAvailablePermits(), 0);

Thread.sleep(rateTimeMSec);
// check after three rate-time: acquiredPermits is 0
assertEquals(rate.getAvailablePermits() > 0, true);

rate.close();
}

@Test
public void testRateLimiterWithPermitUpdater() throws Exception {
long permits = 10;
Expand Down