Skip to content

Commit

Permalink
[issue #13351] Solving precise rate limiting does not takes effect (#…
Browse files Browse the repository at this point in the history
…11446)

![image](https://user-images.githubusercontent.com/51213812/126812923-91bb827c-246d-451d-8f25-343bb2c1dca0.png)

befoe this PR precise publish rate limiting wasn't taking effect at all

In order to solve the current problems, there are 2 modifications

1. Using IsDispatchRateLimiting in precise publish rate limiter as well (in order to starve the producer)
2. Checking if there are available permits before resetting the read from the connection again

Already covered by current tests.

(cherry picked from commit 00ad07d)
  • Loading branch information
danielsinai authored and michaeljmarshall committed Dec 10, 2021
1 parent 0df22ad commit bc887d8
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 17 deletions.
Expand Up @@ -79,10 +79,11 @@ public void update(PublishRate maxPublishRate) {
this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0);
this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
if (this.publishMaxMessageRate > 0) {
topicPublishRateLimiterOnMessage = new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction);
topicPublishRateLimiterOnMessage =
new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction, true);
}
if (this.publishMaxByteRate > 0) {
topicPublishRateLimiterOnByte = new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS);
topicPublishRateLimiterOnByte = new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS, true);
}
} else {
this.publishMaxMessageRate = 0;
Expand Down
Expand Up @@ -59,7 +59,7 @@ public void update(PublishRate maxPublishRate) {
@Override
public boolean tryAcquire(int numbers, long bytes) {
// No-op
return false;
return true;
}

}
Expand Up @@ -20,10 +20,19 @@

import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.util.RateLimitFunction;
import org.apache.pulsar.common.util.RateLimiter;
import org.apache.pulsar.utils.StatsOutputStream;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;

import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
Expand All @@ -38,13 +47,12 @@ public class PublishRateLimiterTest {
private PrecisPublishLimiter precisPublishLimiter;
private PublishRateLimiterImpl publishRateLimiter;


@BeforeMethod
public void setup() throws Exception {
policies.publishMaxMessageRate = new HashMap<>();
policies.publishMaxMessageRate.put(CLUSTER_NAME, publishRate);
precisPublishLimiter = new PrecisPublishLimiter(policies, CLUSTER_NAME,
() -> System.out.print("Refresh permit"));

precisPublishLimiter = new PrecisPublishLimiter(policies, CLUSTER_NAME, () -> System.out.print("Refresh permit"));
publishRateLimiter = new PublishRateLimiterImpl(policies, CLUSTER_NAME);
}

Expand Down Expand Up @@ -94,19 +102,62 @@ public void testPrecisePublishRateLimiterUpdate() throws Exception {

@Test
public void testPrecisePublishRateLimiterAcquire() throws Exception {
Class precisPublishLimiterClass = Class.forName("org.apache.pulsar.broker.service.PrecisPublishLimiter");
Field topicPublishRateLimiterOnMessageField = precisPublishLimiterClass.getDeclaredField("topicPublishRateLimiterOnMessage");
Field topicPublishRateLimiterOnByteField = precisPublishLimiterClass.getDeclaredField("topicPublishRateLimiterOnByte");
topicPublishRateLimiterOnMessageField.setAccessible(true);
topicPublishRateLimiterOnByteField.setAccessible(true);

RateLimiter topicPublishRateLimiterOnMessage = (RateLimiter)topicPublishRateLimiterOnMessageField.get(precisPublishLimiter);
RateLimiter topicPublishRateLimiterOnByte = (RateLimiter)topicPublishRateLimiterOnByteField.get(precisPublishLimiter);

Method renewTopicPublishRateLimiterOnMessageMethod = topicPublishRateLimiterOnMessage.getClass().getDeclaredMethod("renew", null);
Method renewTopicPublishRateLimiterOnByteMethod = topicPublishRateLimiterOnByte.getClass().getDeclaredMethod("renew", null);
renewTopicPublishRateLimiterOnMessageMethod.setAccessible(true);
renewTopicPublishRateLimiterOnByteMethod.setAccessible(true);

// running tryAcquire in order to lazyInit the renewTask
precisPublishLimiter.tryAcquire(1, 10);

Field onMessageRenewTaskField = topicPublishRateLimiterOnMessage.getClass().getDeclaredField("renewTask");
Field onByteRenewTaskField = topicPublishRateLimiterOnByte.getClass().getDeclaredField("renewTask");
onMessageRenewTaskField.setAccessible(true);
onByteRenewTaskField.setAccessible(true);
ScheduledFuture<?> onMessageRenewTask = (ScheduledFuture<?>) onMessageRenewTaskField.get(topicPublishRateLimiterOnMessage);
ScheduledFuture<?> onByteRenewTask = (ScheduledFuture<?>) onByteRenewTaskField.get(topicPublishRateLimiterOnByte);

onMessageRenewTask.cancel(false);
onByteRenewTask.cancel(false);

// renewing the permits from previous tests
renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage);
renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte);

// tryAcquire not exceeded
assertTrue(precisPublishLimiter.tryAcquire(1, 10));
Thread.sleep(1100);
renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage);
renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte);

// tryAcquire numOfMessages exceeded
assertFalse(precisPublishLimiter.tryAcquire(11, 100));
Thread.sleep(1100);
renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage);
renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte);

// tryAcquire msgSizeInBytes exceeded
assertFalse(precisPublishLimiter.tryAcquire(10, 101));
Thread.sleep(1100);
renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage);
renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte);
renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage);
renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte);

// tryAcquire exceeded exactly
assertFalse(precisPublishLimiter.tryAcquire(10, 100));
renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage);
renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte);
renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage);
renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte);

// tryAcquire not exceeded
assertTrue(precisPublishLimiter.tryAcquire(10, 100));
assertTrue(precisPublishLimiter.tryAcquire(9, 99));
}
}
Expand Up @@ -62,15 +62,25 @@ public class RateLimiter implements AutoCloseable{
// permitUpdate helps to update permit-rate at runtime
private Supplier<Long> permitUpdater;
private RateLimitFunction rateLimitFunction;
private boolean isDispatchRateLimiter;
private boolean isDispatchOrPrecisePublishRateLimiter;

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) {
this(null, permits, rateTime, timeUnit, null);
}

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, boolean isDispatchOrPrecisePublishRateLimiter) {
this(null, permits, rateTime, timeUnit, null, isDispatchOrPrecisePublishRateLimiter);
}

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit,
RateLimitFunction autoReadResetFunction) {
this(null, permits, rateTime, timeUnit, null);
this(null, permits, rateTime, timeUnit, null, false);
this.rateLimitFunction = autoReadResetFunction;
}

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit,
RateLimitFunction autoReadResetFunction, boolean isDispatchOrPrecisePublishRateLimiter) {
this(null, permits, rateTime, timeUnit, null, isDispatchOrPrecisePublishRateLimiter);
this.rateLimitFunction = autoReadResetFunction;
}

Expand All @@ -80,15 +90,15 @@ public RateLimiter(final ScheduledExecutorService service, final long permits, f
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit, Supplier<Long> permitUpdater, boolean isDispatchRateLimiter) {
final TimeUnit timeUnit, Supplier<Long> permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter) {
checkArgument(permits > 0, "rate must be > 0");
checkArgument(rateTime > 0, "Renew permit time must be > 0");

this.rateTime = rateTime;
this.timeUnit = timeUnit;
this.permits = permits;
this.permitUpdater = permitUpdater;
this.isDispatchRateLimiter = isDispatchRateLimiter;
this.isDispatchOrPrecisePublishRateLimiter = isDispatchOrPrecisePublishRateLimiter;

if (service != null) {
this.executorService = service;
Expand Down Expand Up @@ -182,9 +192,13 @@ public synchronized boolean tryAcquire(long acquirePermit) {
}

boolean canAcquire = acquirePermit < 0 || acquiredPermits < this.permits;
if (isDispatchRateLimiter) {
if (isDispatchOrPrecisePublishRateLimiter) {
// for dispatch rate limiter just add acquirePermit
acquiredPermits += acquirePermit;

// we want to back-pressure from the current state of the rateLimiter therefore we should check if there
// are any available premits again
canAcquire = acquirePermit < 0 || acquiredPermits < this.permits;
} else {
// acquired-permits can't be larger than the rate
if (acquirePermit > this.permits) {
Expand Down Expand Up @@ -259,14 +273,15 @@ protected ScheduledFuture<?> createTask() {
}

synchronized void renew() {
acquiredPermits = isDispatchRateLimiter ? Math.max(0, acquiredPermits - permits) : 0;
acquiredPermits = isDispatchOrPrecisePublishRateLimiter ? Math.max(0, acquiredPermits - permits) : 0;
if (permitUpdater != null) {
long newPermitRate = permitUpdater.get();
if (newPermitRate > 0) {
setRate(newPermitRate);
}
}
if (rateLimitFunction != null) {
// release the back-pressure by applying the rateLimitFunction only when there are available permits
if (rateLimitFunction != null && this.getAvailablePermits() > 0) {
rateLimitFunction.apply();
}
notifyAll();
Expand Down

0 comments on commit bc887d8

Please sign in to comment.