diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java index 4db6bf230865e..e981518886eac 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java @@ -79,10 +79,10 @@ public void update(PublishRate maxPublishRate) { this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0); if (this.publishMaxMessageRate > 0) { topicPublishRateLimiterOnMessage = - new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction); + new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction, true); } if (this.publishMaxByteRate > 0) { - topicPublishRateLimiterOnByte = new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS); + topicPublishRateLimiterOnByte = new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS, true); } } else { this.publishMaxMessageRate = 0; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java index 0ff3866a8a891..c72f6ba82b05d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java @@ -59,7 +59,7 @@ public void update(PublishRate maxPublishRate) { @Override public boolean tryAcquire(int numbers, long bytes) { // No-op - return false; + return true; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java index b820c606b9d7f..9131d5173433a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java @@ -20,10 +20,19 @@ import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; +import org.apache.pulsar.common.stats.Rate; +import org.apache.pulsar.common.util.RateLimitFunction; +import org.apache.pulsar.common.util.RateLimiter; +import org.apache.pulsar.utils.StatsOutputStream; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.util.HashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; @@ -38,13 +47,12 @@ public class PublishRateLimiterTest { private PrecisPublishLimiter precisPublishLimiter; private PublishRateLimiterImpl publishRateLimiter; - @BeforeMethod public void setup() throws Exception { policies.publishMaxMessageRate = new HashMap<>(); policies.publishMaxMessageRate.put(CLUSTER_NAME, publishRate); - precisPublishLimiter = new PrecisPublishLimiter(policies, CLUSTER_NAME, - () -> System.out.print("Refresh permit")); + + precisPublishLimiter = new PrecisPublishLimiter(policies, CLUSTER_NAME, () -> System.out.print("Refresh permit")); publishRateLimiter = new PublishRateLimiterImpl(policies, CLUSTER_NAME); } @@ -94,19 +102,62 @@ public void testPrecisePublishRateLimiterUpdate() { @Test public void testPrecisePublishRateLimiterAcquire() throws Exception { + Class precisPublishLimiterClass = Class.forName("org.apache.pulsar.broker.service.PrecisPublishLimiter"); + Field topicPublishRateLimiterOnMessageField = precisPublishLimiterClass.getDeclaredField("topicPublishRateLimiterOnMessage"); + Field topicPublishRateLimiterOnByteField = precisPublishLimiterClass.getDeclaredField("topicPublishRateLimiterOnByte"); + topicPublishRateLimiterOnMessageField.setAccessible(true); + topicPublishRateLimiterOnByteField.setAccessible(true); + + RateLimiter topicPublishRateLimiterOnMessage = (RateLimiter)topicPublishRateLimiterOnMessageField.get(precisPublishLimiter); + RateLimiter topicPublishRateLimiterOnByte = (RateLimiter)topicPublishRateLimiterOnByteField.get(precisPublishLimiter); + + Method renewTopicPublishRateLimiterOnMessageMethod = topicPublishRateLimiterOnMessage.getClass().getDeclaredMethod("renew", null); + Method renewTopicPublishRateLimiterOnByteMethod = topicPublishRateLimiterOnByte.getClass().getDeclaredMethod("renew", null); + renewTopicPublishRateLimiterOnMessageMethod.setAccessible(true); + renewTopicPublishRateLimiterOnByteMethod.setAccessible(true); + + // running tryAcquire in order to lazyInit the renewTask + precisPublishLimiter.tryAcquire(1, 10); + + Field onMessageRenewTaskField = topicPublishRateLimiterOnMessage.getClass().getDeclaredField("renewTask"); + Field onByteRenewTaskField = topicPublishRateLimiterOnByte.getClass().getDeclaredField("renewTask"); + onMessageRenewTaskField.setAccessible(true); + onByteRenewTaskField.setAccessible(true); + ScheduledFuture onMessageRenewTask = (ScheduledFuture) onMessageRenewTaskField.get(topicPublishRateLimiterOnMessage); + ScheduledFuture onByteRenewTask = (ScheduledFuture) onByteRenewTaskField.get(topicPublishRateLimiterOnByte); + + onMessageRenewTask.cancel(false); + onByteRenewTask.cancel(false); + + // renewing the permits from previous tests + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); + // tryAcquire not exceeded assertTrue(precisPublishLimiter.tryAcquire(1, 10)); - Thread.sleep(1100); + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); // tryAcquire numOfMessages exceeded assertFalse(precisPublishLimiter.tryAcquire(11, 100)); - Thread.sleep(1100); + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); // tryAcquire msgSizeInBytes exceeded assertFalse(precisPublishLimiter.tryAcquire(10, 101)); - Thread.sleep(1100); + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); + + // tryAcquire exceeded exactly + assertFalse(precisPublishLimiter.tryAcquire(10, 100)); + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); // tryAcquire not exceeded - assertTrue(precisPublishLimiter.tryAcquire(10, 100)); + assertTrue(precisPublishLimiter.tryAcquire(9, 99)); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java index bd31853062774..edaef1719ac94 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java @@ -60,15 +60,25 @@ public class RateLimiter implements AutoCloseable{ // permitUpdate helps to update permit-rate at runtime private Supplier permitUpdater; private RateLimitFunction rateLimitFunction; - private boolean isDispatchRateLimiter; + private boolean isDispatchOrPrecisePublishRateLimiter; public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) { this(null, permits, rateTime, timeUnit, null); } + public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, boolean isDispatchOrPrecisePublishRateLimiter) { + this(null, permits, rateTime, timeUnit, null, isDispatchOrPrecisePublishRateLimiter); + } + public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, RateLimitFunction autoReadResetFunction) { - this(null, permits, rateTime, timeUnit, null); + this(null, permits, rateTime, timeUnit, null, false); + this.rateLimitFunction = autoReadResetFunction; + } + + public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, + RateLimitFunction autoReadResetFunction, boolean isDispatchOrPrecisePublishRateLimiter) { + this(null, permits, rateTime, timeUnit, null, isDispatchOrPrecisePublishRateLimiter); this.rateLimitFunction = autoReadResetFunction; } @@ -78,7 +88,7 @@ public RateLimiter(final ScheduledExecutorService service, final long permits, f } public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime, - final TimeUnit timeUnit, Supplier permitUpdater, boolean isDispatchRateLimiter) { + final TimeUnit timeUnit, Supplier permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter) { checkArgument(permits > 0, "rate must be > 0"); checkArgument(rateTime > 0, "Renew permit time must be > 0"); @@ -86,7 +96,7 @@ public RateLimiter(final ScheduledExecutorService service, final long permits, f this.timeUnit = timeUnit; this.permits = permits; this.permitUpdater = permitUpdater; - this.isDispatchRateLimiter = isDispatchRateLimiter; + this.isDispatchOrPrecisePublishRateLimiter = isDispatchOrPrecisePublishRateLimiter; if (service != null) { this.executorService = service; @@ -180,9 +190,13 @@ public synchronized boolean tryAcquire(long acquirePermit) { } boolean canAcquire = acquirePermit < 0 || acquiredPermits < this.permits; - if (isDispatchRateLimiter) { + if (isDispatchOrPrecisePublishRateLimiter) { // for dispatch rate limiter just add acquirePermit acquiredPermits += acquirePermit; + + // we want to back-pressure from the current state of the rateLimiter therefore we should check if there + // are any available premits again + canAcquire = acquirePermit < 0 || acquiredPermits < this.permits; } else { // acquired-permits can't be larger than the rate if (acquirePermit > this.permits) { @@ -257,14 +271,15 @@ protected ScheduledFuture createTask() { } synchronized void renew() { - acquiredPermits = isDispatchRateLimiter ? Math.max(0, acquiredPermits - permits) : 0; + acquiredPermits = isDispatchOrPrecisePublishRateLimiter ? Math.max(0, acquiredPermits - permits) : 0; if (permitUpdater != null) { long newPermitRate = permitUpdater.get(); if (newPermitRate > 0) { setRate(newPermitRate); } } - if (rateLimitFunction != null) { + // release the back-pressure by applying the rateLimitFunction only when there are available permits + if (rateLimitFunction != null && this.getAvailablePermits() > 0) { rateLimitFunction.apply(); } notifyAll();