From 0e3cb55bdc657c90ff3ca3a2ac36a4038ec95790 Mon Sep 17 00:00:00 2001 From: wangjialing Date: Tue, 24 Nov 2020 18:48:50 +0800 Subject: [PATCH 1/3] Increase byte size per sent in throttling test --- .../api/SubscriptionMessageDispatchThrottlingTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java index f6c563dcb9e76..8a7e067bea246 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java @@ -211,8 +211,8 @@ public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(Subscriptio * verify rate-limiting should throttle message-dispatching based on byte-rate * *
-     *  1. dispatch-byte-rate = 100 bytes/sec
-     *  2. send 30 msgs : each with 10 byte
+     *  1. dispatch-byte-rate = 1000 bytes/sec
+     *  2. send 30 msgs : each with 100 byte
      *  3. it should take up to 2 second to receive all messages
      * 
* @@ -227,7 +227,7 @@ public void testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionT final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll"); final String subName = "my-subscriber-name-" + subscription; - final int byteRate = 100; + final int byteRate = 1000; DispatchRate dispatchRate = new DispatchRate(-1, byteRate, 1); admin.namespaces().createNamespace(namespace, Sets.newHashSet("test")); admin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate); From cf5cd0c68fa4ebf066b7b4383b42d356d67ee819 Mon Sep 17 00:00:00 2001 From: wangjialing Date: Tue, 24 Nov 2020 22:45:52 +0800 Subject: [PATCH 2/3] Fix DispatchRateLimiter does not take effect --- .../persistent/DispatchRateLimiter.java | 4 +-- .../pulsar/common/util/RateLimiter.java | 30 ++++++++++++++----- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java index 6addacf97ddfa..a3333cd4d4f8b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java @@ -342,7 +342,7 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) { if (msgRate > 0) { if (this.dispatchRateLimiterOnMessage == null) { this.dispatchRateLimiterOnMessage = new RateLimiter(brokerService.pulsar().getExecutor(), msgRate, - ratePeriod, TimeUnit.SECONDS, permitUpdaterMsg); + ratePeriod, TimeUnit.SECONDS, permitUpdaterMsg, true); } else { this.dispatchRateLimiterOnMessage.setRate(msgRate, dispatchRate.ratePeriodInSecond, TimeUnit.SECONDS, permitUpdaterMsg); @@ -362,7 +362,7 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) { if (byteRate > 0) { if (this.dispatchRateLimiterOnByte == null) { this.dispatchRateLimiterOnByte = new RateLimiter(brokerService.pulsar().getExecutor(), byteRate, - ratePeriod, TimeUnit.SECONDS, permitUpdaterByte); + ratePeriod, TimeUnit.SECONDS, permitUpdaterByte, true); } else { this.dispatchRateLimiterOnByte.setRate(byteRate, dispatchRate.ratePeriodInSecond, TimeUnit.SECONDS, permitUpdaterByte); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java index 6a8afd2bc1036..8d0309d3401b4 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java @@ -60,6 +60,7 @@ public class RateLimiter implements AutoCloseable{ // permitUpdate helps to update permit-rate at runtime private Supplier permitUpdater; private RateLimitFunction rateLimitFunction; + private boolean isDispatchRateLimiter; public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) { this(null, permits, rateTime, timeUnit, null); @@ -72,7 +73,12 @@ public RateLimiter(final long permits, final long rateTime, final TimeUnit timeU } public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime, - final TimeUnit timeUnit, Supplier permitUpdater) { + final TimeUnit timeUnit, Supplier permitUpdater) { + this(service, permits, rateTime, timeUnit, null, false); + } + + public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime, + final TimeUnit timeUnit, Supplier permitUpdater, boolean isDispatchRateLimiter) { checkArgument(permits > 0, "rate must be > 0"); checkArgument(rateTime > 0, "Renew permit time must be > 0"); @@ -80,6 +86,7 @@ public RateLimiter(final ScheduledExecutorService service, final long permits, f this.timeUnit = timeUnit; this.permits = permits; this.permitUpdater = permitUpdater; + this.isDispatchRateLimiter = isDispatchRateLimiter; if (service != null) { this.executorService = service; @@ -172,15 +179,22 @@ public synchronized boolean tryAcquire(long acquirePermit) { renewTask = createTask(); } - // acquired-permits can't be larger than the rate - if (acquirePermit > this.permits) { - acquiredPermits = this.permits; - return false; - } boolean canAcquire = acquirePermit < 0 || acquiredPermits < this.permits; - if (canAcquire) { + if (isDispatchRateLimiter) { + // for dispatch rate limiter just add acquirePermit acquiredPermits += acquirePermit; + } else { + // acquired-permits can't be larger than the rate + if (acquirePermit > this.permits) { + acquiredPermits = this.permits; + return false; + } + + if (canAcquire) { + acquiredPermits += acquirePermit; + } } + return canAcquire; } @@ -243,7 +257,7 @@ protected ScheduledFuture createTask() { } synchronized void renew() { - acquiredPermits = 0; + acquiredPermits = isDispatchRateLimiter ? Math.max(0, acquiredPermits - permits) : 0; if (permitUpdater != null) { long newPermitRate = permitUpdater.get(); if (newPermitRate > 0) { From d10c45b30ef4651e051819f0bd83a3f51249f2b2 Mon Sep 17 00:00:00 2001 From: wangjialing Date: Tue, 24 Nov 2020 22:57:38 +0800 Subject: [PATCH 3/3] Add test for DispatchRateLimiter --- .../pulsar/common/util/RateLimiter.java | 2 +- .../pulsar/common/util/RateLimiterTest.java | 21 +++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java index 8d0309d3401b4..bd31853062774 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java @@ -74,7 +74,7 @@ public RateLimiter(final long permits, final long rateTime, final TimeUnit timeU public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime, final TimeUnit timeUnit, Supplier permitUpdater) { - this(service, permits, rateTime, timeUnit, null, false); + this(service, permits, rateTime, timeUnit, permitUpdater, false); } public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime, diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java index 891609dbee7cb..61336f4210b8b 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/RateLimiterTest.java @@ -164,6 +164,27 @@ public void testResetRate() throws Exception { rate.close(); } + @Test + public void testDispatchRate() throws Exception { + final long rateTimeMSec = 1000; + final int permits = 100; + RateLimiter rate = new RateLimiter(null, permits, rateTimeMSec, TimeUnit.MILLISECONDS, null, true); + rate.tryAcquire(100); + rate.tryAcquire(100); + rate.tryAcquire(100); + assertEquals(rate.getAvailablePermits(), 0); + + Thread.sleep(rateTimeMSec * 2); + // check after two rate-time: acquiredPermits is 100 + assertEquals(rate.getAvailablePermits(), 0); + + Thread.sleep(rateTimeMSec); + // check after three rate-time: acquiredPermits is 0 + assertEquals(rate.getAvailablePermits() > 0, true); + + rate.close(); + } + @Test public void testRateLimiterWithPermitUpdater() throws Exception { long permits = 10;