Skip to content

Commit

Permalink
[issue #13351] Solving precise rate limiting does not takes effect (#…
Browse files Browse the repository at this point in the history
…11446)

![image](https://user-images.githubusercontent.com/51213812/126812923-91bb827c-246d-451d-8f25-343bb2c1dca0.png)

befoe this PR precise publish rate limiting wasn't taking effect at all
### Modifications

In order to solve the current problems, there are 2 modifications

1. Using IsDispatchRateLimiting in precise publish rate limiter as well (in order to starve the producer)
2. Checking if there are available permits before resetting the read from the connection again

### Verifying this change

Already covered by current tests.

(cherry picked from commit 00ad07d)
  • Loading branch information
danielsinai authored and codelipenghui committed Jul 30, 2021
1 parent 38d3389 commit 06c6adf
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 17 deletions.
Expand Up @@ -79,10 +79,10 @@ public void update(PublishRate maxPublishRate) {
this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
if (this.publishMaxMessageRate > 0) {
topicPublishRateLimiterOnMessage =
new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction);
new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction, true);
}
if (this.publishMaxByteRate > 0) {
topicPublishRateLimiterOnByte = new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS);
topicPublishRateLimiterOnByte = new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS, true);
}
} else {
this.publishMaxMessageRate = 0;
Expand Down
Expand Up @@ -59,7 +59,7 @@ public void update(PublishRate maxPublishRate) {
@Override
public boolean tryAcquire(int numbers, long bytes) {
// No-op
return false;
return true;
}

}
Expand Up @@ -20,10 +20,19 @@

import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.util.RateLimitFunction;
import org.apache.pulsar.common.util.RateLimiter;
import org.apache.pulsar.utils.StatsOutputStream;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;

import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
Expand All @@ -38,13 +47,12 @@ public class PublishRateLimiterTest {
private PrecisPublishLimiter precisPublishLimiter;
private PublishRateLimiterImpl publishRateLimiter;


@BeforeMethod
public void setup() throws Exception {
policies.publishMaxMessageRate = new HashMap<>();
policies.publishMaxMessageRate.put(CLUSTER_NAME, publishRate);
precisPublishLimiter = new PrecisPublishLimiter(policies, CLUSTER_NAME,
() -> System.out.print("Refresh permit"));

precisPublishLimiter = new PrecisPublishLimiter(policies, CLUSTER_NAME, () -> System.out.print("Refresh permit"));
publishRateLimiter = new PublishRateLimiterImpl(policies, CLUSTER_NAME);
}

Expand Down Expand Up @@ -94,19 +102,62 @@ public void testPrecisePublishRateLimiterUpdate() {

@Test
public void testPrecisePublishRateLimiterAcquire() throws Exception {
Class precisPublishLimiterClass = Class.forName("org.apache.pulsar.broker.service.PrecisPublishLimiter");
Field topicPublishRateLimiterOnMessageField = precisPublishLimiterClass.getDeclaredField("topicPublishRateLimiterOnMessage");
Field topicPublishRateLimiterOnByteField = precisPublishLimiterClass.getDeclaredField("topicPublishRateLimiterOnByte");
topicPublishRateLimiterOnMessageField.setAccessible(true);
topicPublishRateLimiterOnByteField.setAccessible(true);

RateLimiter topicPublishRateLimiterOnMessage = (RateLimiter)topicPublishRateLimiterOnMessageField.get(precisPublishLimiter);
RateLimiter topicPublishRateLimiterOnByte = (RateLimiter)topicPublishRateLimiterOnByteField.get(precisPublishLimiter);

Method renewTopicPublishRateLimiterOnMessageMethod = topicPublishRateLimiterOnMessage.getClass().getDeclaredMethod("renew", null);
Method renewTopicPublishRateLimiterOnByteMethod = topicPublishRateLimiterOnByte.getClass().getDeclaredMethod("renew", null);
renewTopicPublishRateLimiterOnMessageMethod.setAccessible(true);
renewTopicPublishRateLimiterOnByteMethod.setAccessible(true);

// running tryAcquire in order to lazyInit the renewTask
precisPublishLimiter.tryAcquire(1, 10);

Field onMessageRenewTaskField = topicPublishRateLimiterOnMessage.getClass().getDeclaredField("renewTask");
Field onByteRenewTaskField = topicPublishRateLimiterOnByte.getClass().getDeclaredField("renewTask");
onMessageRenewTaskField.setAccessible(true);
onByteRenewTaskField.setAccessible(true);
ScheduledFuture<?> onMessageRenewTask = (ScheduledFuture<?>) onMessageRenewTaskField.get(topicPublishRateLimiterOnMessage);
ScheduledFuture<?> onByteRenewTask = (ScheduledFuture<?>) onByteRenewTaskField.get(topicPublishRateLimiterOnByte);

onMessageRenewTask.cancel(false);
onByteRenewTask.cancel(false);

// renewing the permits from previous tests
renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage);
renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte);

// tryAcquire not exceeded
assertTrue(precisPublishLimiter.tryAcquire(1, 10));
Thread.sleep(1100);
renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage);
renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte);

// tryAcquire numOfMessages exceeded
assertFalse(precisPublishLimiter.tryAcquire(11, 100));
Thread.sleep(1100);
renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage);
renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte);

// tryAcquire msgSizeInBytes exceeded
assertFalse(precisPublishLimiter.tryAcquire(10, 101));
Thread.sleep(1100);
renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage);
renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte);
renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage);
renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte);

// tryAcquire exceeded exactly
assertFalse(precisPublishLimiter.tryAcquire(10, 100));
renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage);
renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte);
renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage);
renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte);

// tryAcquire not exceeded
assertTrue(precisPublishLimiter.tryAcquire(10, 100));
assertTrue(precisPublishLimiter.tryAcquire(9, 99));
}
}
Expand Up @@ -60,15 +60,25 @@ public class RateLimiter implements AutoCloseable{
// permitUpdate helps to update permit-rate at runtime
private Supplier<Long> permitUpdater;
private RateLimitFunction rateLimitFunction;
private boolean isDispatchRateLimiter;
private boolean isDispatchOrPrecisePublishRateLimiter;

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) {
this(null, permits, rateTime, timeUnit, null);
}

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, boolean isDispatchOrPrecisePublishRateLimiter) {
this(null, permits, rateTime, timeUnit, null, isDispatchOrPrecisePublishRateLimiter);
}

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit,
RateLimitFunction autoReadResetFunction) {
this(null, permits, rateTime, timeUnit, null);
this(null, permits, rateTime, timeUnit, null, false);
this.rateLimitFunction = autoReadResetFunction;
}

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit,
RateLimitFunction autoReadResetFunction, boolean isDispatchOrPrecisePublishRateLimiter) {
this(null, permits, rateTime, timeUnit, null, isDispatchOrPrecisePublishRateLimiter);
this.rateLimitFunction = autoReadResetFunction;
}

Expand All @@ -78,15 +88,15 @@ public RateLimiter(final ScheduledExecutorService service, final long permits, f
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit, Supplier<Long> permitUpdater, boolean isDispatchRateLimiter) {
final TimeUnit timeUnit, Supplier<Long> permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter) {
checkArgument(permits > 0, "rate must be > 0");
checkArgument(rateTime > 0, "Renew permit time must be > 0");

this.rateTime = rateTime;
this.timeUnit = timeUnit;
this.permits = permits;
this.permitUpdater = permitUpdater;
this.isDispatchRateLimiter = isDispatchRateLimiter;
this.isDispatchOrPrecisePublishRateLimiter = isDispatchOrPrecisePublishRateLimiter;

if (service != null) {
this.executorService = service;
Expand Down Expand Up @@ -180,9 +190,13 @@ public synchronized boolean tryAcquire(long acquirePermit) {
}

boolean canAcquire = acquirePermit < 0 || acquiredPermits < this.permits;
if (isDispatchRateLimiter) {
if (isDispatchOrPrecisePublishRateLimiter) {
// for dispatch rate limiter just add acquirePermit
acquiredPermits += acquirePermit;

// we want to back-pressure from the current state of the rateLimiter therefore we should check if there
// are any available premits again
canAcquire = acquirePermit < 0 || acquiredPermits < this.permits;
} else {
// acquired-permits can't be larger than the rate
if (acquirePermit > this.permits) {
Expand Down Expand Up @@ -257,14 +271,15 @@ protected ScheduledFuture<?> createTask() {
}

synchronized void renew() {
acquiredPermits = isDispatchRateLimiter ? Math.max(0, acquiredPermits - permits) : 0;
acquiredPermits = isDispatchOrPrecisePublishRateLimiter ? Math.max(0, acquiredPermits - permits) : 0;
if (permitUpdater != null) {
long newPermitRate = permitUpdater.get();
if (newPermitRate > 0) {
setRate(newPermitRate);
}
}
if (rateLimitFunction != null) {
// release the back-pressure by applying the rateLimitFunction only when there are available permits
if (rateLimitFunction != null && this.getAvailablePermits() > 0) {
rateLimitFunction.apply();
}
notifyAll();
Expand Down

0 comments on commit 06c6adf

Please sign in to comment.