diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java index 6addacf97ddfa..a3333cd4d4f8b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java @@ -342,7 +342,7 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) { if (msgRate > 0) { if (this.dispatchRateLimiterOnMessage == null) { this.dispatchRateLimiterOnMessage = new RateLimiter(brokerService.pulsar().getExecutor(), msgRate, - ratePeriod, TimeUnit.SECONDS, permitUpdaterMsg); + ratePeriod, TimeUnit.SECONDS, permitUpdaterMsg, true); } else { this.dispatchRateLimiterOnMessage.setRate(msgRate, dispatchRate.ratePeriodInSecond, TimeUnit.SECONDS, permitUpdaterMsg); @@ -362,7 +362,7 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) { if (byteRate > 0) { if (this.dispatchRateLimiterOnByte == null) { this.dispatchRateLimiterOnByte = new RateLimiter(brokerService.pulsar().getExecutor(), byteRate, - ratePeriod, TimeUnit.SECONDS, permitUpdaterByte); + ratePeriod, TimeUnit.SECONDS, permitUpdaterByte, true); } else { this.dispatchRateLimiterOnByte.setRate(byteRate, dispatchRate.ratePeriodInSecond, TimeUnit.SECONDS, permitUpdaterByte); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java index f6c563dcb9e76..8a7e067bea246 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java @@ -211,8 +211,8 @@ public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(Subscriptio * verify rate-limiting should throttle message-dispatching based on byte-rate * *
-     *  1. dispatch-byte-rate = 100 bytes/sec
-     *  2. send 30 msgs : each with 10 byte
+     *  1. dispatch-byte-rate = 1000 bytes/sec
+     *  2. send 30 msgs : each with 100 byte
      *  3. it should take up to 2 second to receive all messages
      * 
* @@ -227,7 +227,7 @@ public void testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionT final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll"); final String subName = "my-subscriber-name-" + subscription; - final int byteRate = 100; + final int byteRate = 1000; DispatchRate dispatchRate = new DispatchRate(-1, byteRate, 1); admin.namespaces().createNamespace(namespace, Sets.newHashSet("test")); admin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java index 6a8afd2bc1036..bd31853062774 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java @@ -60,6 +60,7 @@ public class RateLimiter implements AutoCloseable{ // permitUpdate helps to update permit-rate at runtime private Supplier permitUpdater; private RateLimitFunction rateLimitFunction; + private boolean isDispatchRateLimiter; public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) { this(null, permits, rateTime, timeUnit, null); @@ -72,7 +73,12 @@ public RateLimiter(final long permits, final long rateTime, final TimeUnit timeU } public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime, - final TimeUnit timeUnit, Supplier permitUpdater) { + final TimeUnit timeUnit, Supplier permitUpdater) { + this(service, permits, rateTime, timeUnit, permitUpdater, false); + } + + public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime, + final TimeUnit timeUnit, Supplier permitUpdater, boolean isDispatchRateLimiter) { checkArgument(permits > 0, "rate must be > 0"); checkArgument(rateTime > 0, "Renew permit time must be > 0"); @@ -80,6 +86,7 @@ public RateLimiter(final ScheduledExecutorService service, final long permits, f this.timeUnit = timeUnit; this.permits = permits; this.permitUpdater = permitUpdater; + this.isDispatchRateLimiter = isDispatchRateLimiter; if (service != null) { this.executorService = service; @@ -172,15 +179,22 @@ public synchronized boolean tryAcquire(long acquirePermit) { renewTask = createTask(); } - // acquired-permits can't be larger than the rate - if (acquirePermit > this.permits) { - acquiredPermits = this.permits; - return false; - } boolean canAcquire = acquirePermit < 0 || acquiredPermits < this.permits; - if (canAcquire) { + if (isDispatchRateLimiter) { + // for dispatch rate limiter just add acquirePermit acquiredPermits += acquirePermit; + } else { + // acquired-permits can't be larger than the rate + if (acquirePermit > this.permits) { + acquiredPermits = this.permits; + return false; + } + + if (canAcquire) { + acquiredPermits += acquirePermit; + } } + return canAcquire; } @@ -243,7 +257,7 @@ protected ScheduledFuture createTask() { } synchronized void renew() { - acquiredPermits = 0; + acquiredPermits = isDispatchRateLimiter ? Math.max(0, acquiredPermits - permits) : 0; if (permitUpdater != null) { long newPermitRate = permitUpdater.get(); if (newPermitRate > 0) { diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java index 891609dbee7cb..61336f4210b8b 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java @@ -164,6 +164,27 @@ public void testResetRate() throws Exception { rate.close(); } + @Test + public void testDispatchRate() throws Exception { + final long rateTimeMSec = 1000; + final int permits = 100; + RateLimiter rate = new RateLimiter(null, permits, rateTimeMSec, TimeUnit.MILLISECONDS, null, true); + rate.tryAcquire(100); + rate.tryAcquire(100); + rate.tryAcquire(100); + assertEquals(rate.getAvailablePermits(), 0); + + Thread.sleep(rateTimeMSec * 2); + // check after two rate-time: acquiredPermits is 100 + assertEquals(rate.getAvailablePermits(), 0); + + Thread.sleep(rateTimeMSec); + // check after three rate-time: acquiredPermits is 0 + assertEquals(rate.getAvailablePermits() > 0, true); + + rate.close(); + } + @Test public void testRateLimiterWithPermitUpdater() throws Exception { long permits = 10;