From 6a89b0509e4266dc12f9b06bbb742e951ee22d89 Mon Sep 17 00:00:00 2001 From: danielsinai Date: Fri, 23 Jul 2021 12:27:12 -0400 Subject: [PATCH 1/7] Added isDispatchRateLimiter to PrecisePublishRateLimiter --- .../broker/service/PrecisPublishLimiter.java | 4 ++-- .../service/PublishRateLimiterDisable.java | 2 +- .../pulsar/common/util/RateLimiter.java | 24 +++++++++++++------ 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java index 4db6bf230865e..e981518886eac 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java @@ -79,10 +79,10 @@ public void update(PublishRate maxPublishRate) { this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0); if (this.publishMaxMessageRate > 0) { topicPublishRateLimiterOnMessage = - new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction); + new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction, true); } if (this.publishMaxByteRate > 0) { - topicPublishRateLimiterOnByte = new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS); + topicPublishRateLimiterOnByte = new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS, true); } } else { this.publishMaxMessageRate = 0; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java index 0ff3866a8a891..c72f6ba82b05d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java @@ -59,7 +59,7 @@ public void update(PublishRate maxPublishRate) { @Override public boolean tryAcquire(int numbers, long bytes) { // No-op - return false; + return true; } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java index bd31853062774..b429f85508b05 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java @@ -60,15 +60,25 @@ public class RateLimiter implements AutoCloseable{ // permitUpdate helps to update permit-rate at runtime private Supplier permitUpdater; private RateLimitFunction rateLimitFunction; - private boolean isDispatchRateLimiter; + private boolean isDispatchOrPrecisePublishRateLimiter; public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) { this(null, permits, rateTime, timeUnit, null); } + public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, boolean isDispatchOrPrecisePublishRateLimiter) { + this(null, permits, rateTime, timeUnit, null, isDispatchOrPrecisePublishRateLimiter); + } + public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, RateLimitFunction autoReadResetFunction) { - this(null, permits, rateTime, timeUnit, null); + this(null, permits, rateTime, timeUnit, null, false); + this.rateLimitFunction = autoReadResetFunction; + } + + public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, + RateLimitFunction autoReadResetFunction, boolean isDispatchOrPrecisePublishRateLimiter) { + this(null, permits, rateTime, timeUnit, null, isDispatchOrPrecisePublishRateLimiter); this.rateLimitFunction = autoReadResetFunction; } @@ -78,7 +88,7 @@ public RateLimiter(final ScheduledExecutorService service, final long permits, f } public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime, - final TimeUnit timeUnit, Supplier permitUpdater, boolean isDispatchRateLimiter) { + final TimeUnit timeUnit, Supplier permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter) { checkArgument(permits > 0, "rate must be > 0"); checkArgument(rateTime > 0, "Renew permit time must be > 0"); @@ -86,7 +96,7 @@ public RateLimiter(final ScheduledExecutorService service, final long permits, f this.timeUnit = timeUnit; this.permits = permits; this.permitUpdater = permitUpdater; - this.isDispatchRateLimiter = isDispatchRateLimiter; + this.isDispatchOrPrecisePublishRateLimiter = isDispatchOrPrecisePublishRateLimiter; if (service != null) { this.executorService = service; @@ -180,7 +190,7 @@ public synchronized boolean tryAcquire(long acquirePermit) { } boolean canAcquire = acquirePermit < 0 || acquiredPermits < this.permits; - if (isDispatchRateLimiter) { + if (isDispatchOrPrecisePublishRateLimiter) { // for dispatch rate limiter just add acquirePermit acquiredPermits += acquirePermit; } else { @@ -257,14 +267,14 @@ protected ScheduledFuture createTask() { } synchronized void renew() { - acquiredPermits = isDispatchRateLimiter ? Math.max(0, acquiredPermits - permits) : 0; + acquiredPermits = isDispatchOrPrecisePublishRateLimiter ? Math.max(0, acquiredPermits - permits) : 0; if (permitUpdater != null) { long newPermitRate = permitUpdater.get(); if (newPermitRate > 0) { setRate(newPermitRate); } } - if (rateLimitFunction != null) { + if (rateLimitFunction != null && this.getAvailablePermits() > 0) { rateLimitFunction.apply(); } notifyAll(); From 46b0ab86b751ead0539f8af06ba3a4108419c242 Mon Sep 17 00:00:00 2001 From: danielsinai Date: Sat, 24 Jul 2021 17:28:18 -0400 Subject: [PATCH 2/7] added comments for renew rateLimitFunction apply logic --- .../src/main/java/org/apache/pulsar/common/util/RateLimiter.java | 1 + 1 file changed, 1 insertion(+) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java index b429f85508b05..aec4bba7d1418 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java @@ -274,6 +274,7 @@ synchronized void renew() { setRate(newPermitRate); } } + // release the back-pressure by applying the rateLimitFunction only when there are available permits if (rateLimitFunction != null && this.getAvailablePermits() > 0) { rateLimitFunction.apply(); } From 0bfafe4231965d5f37caff612cde0fb2c7f98ce6 Mon Sep 17 00:00:00 2001 From: danielsinai Date: Sun, 25 Jul 2021 13:04:17 -0400 Subject: [PATCH 3/7] added throttling at the when permits exceeds in the first time --- .../apache/pulsar/broker/service/PublishRateLimiterTest.java | 5 ++++- .../main/java/org/apache/pulsar/common/util/RateLimiter.java | 4 ++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java index b820c606b9d7f..f6ed44aae0dbd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java @@ -106,7 +106,10 @@ public void testPrecisePublishRateLimiterAcquire() throws Exception { assertFalse(precisPublishLimiter.tryAcquire(10, 101)); Thread.sleep(1100); + // tryAcquire exceeded exactly + assertFalse(precisPublishLimiter.tryAcquire(10, 100)); + // tryAcquire not exceeded - assertTrue(precisPublishLimiter.tryAcquire(10, 100)); + assertFalse(precisPublishLimiter.tryAcquire(9, 99)); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java index aec4bba7d1418..edaef1719ac94 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java @@ -193,6 +193,10 @@ public synchronized boolean tryAcquire(long acquirePermit) { if (isDispatchOrPrecisePublishRateLimiter) { // for dispatch rate limiter just add acquirePermit acquiredPermits += acquirePermit; + + // we want to back-pressure from the current state of the rateLimiter therefore we should check if there + // are any available premits again + canAcquire = acquirePermit < 0 || acquiredPermits < this.permits; } else { // acquired-permits can't be larger than the rate if (acquirePermit > this.permits) { From ce4482e598bec7620609f2b4a3ccc736d13b94d0 Mon Sep 17 00:00:00 2001 From: danielsinai Date: Sun, 25 Jul 2021 13:09:53 -0400 Subject: [PATCH 4/7] changed the time duration to reduce tests dependency --- .../apache/pulsar/broker/service/PublishRateLimiterTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java index f6ed44aae0dbd..af0602cdfa012 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java @@ -104,12 +104,13 @@ public void testPrecisePublishRateLimiterAcquire() throws Exception { // tryAcquire msgSizeInBytes exceeded assertFalse(precisPublishLimiter.tryAcquire(10, 101)); - Thread.sleep(1100); + Thread.sleep(2100); // tryAcquire exceeded exactly assertFalse(precisPublishLimiter.tryAcquire(10, 100)); + Thread.sleep(2100); // tryAcquire not exceeded - assertFalse(precisPublishLimiter.tryAcquire(9, 99)); + assertTrue(precisPublishLimiter.tryAcquire(9, 99)); } } From e1f93510b0fa4cfa08e7977fce1142f2b8a90be7 Mon Sep 17 00:00:00 2001 From: danielsinai Date: Mon, 26 Jul 2021 07:02:35 -0400 Subject: [PATCH 5/7] using reflection to solve private access to variables and not rely on Thread.sleep --- .../service/PublishRateLimiterTest.java | 68 +++++++++++++++++-- 1 file changed, 61 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java index af0602cdfa012..acb5e571adb22 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java @@ -20,10 +20,18 @@ import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; +import org.apache.pulsar.common.util.RateLimitFunction; +import org.apache.pulsar.common.util.RateLimiter; +import org.apache.pulsar.utils.StatsOutputStream; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.util.HashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; @@ -37,14 +45,48 @@ public class PublishRateLimiterTest { private PrecisPublishLimiter precisPublishLimiter; private PublishRateLimiterImpl publishRateLimiter; - + private RateLimiter topicPublishRateLimiterOnMessage; + private RateLimiter topicPublishRateLimiterOnByte; + private Method renewTopicPublishRateLimiterOnMessageMethod; + private Method renewTopicPublishRateLimiterOnByteMethod; + private ScheduledFuture onMessageRenewTask; + private ScheduledFuture onByteRenewTask; @BeforeMethod public void setup() throws Exception { policies.publishMaxMessageRate = new HashMap<>(); policies.publishMaxMessageRate.put(CLUSTER_NAME, publishRate); - precisPublishLimiter = new PrecisPublishLimiter(policies, CLUSTER_NAME, - () -> System.out.print("Refresh permit")); + + Class precisPublishLimiterClass = Class.forName("org.apache.pulsar.broker.service.PrecisPublishLimiter"); + Constructor constructor = precisPublishLimiterClass.getConstructor(Policies.class, String.class, RateLimitFunction.class); + + Field topicPublishRateLimiterOnMessageField = precisPublishLimiterClass.getDeclaredField("topicPublishRateLimiterOnMessage"); + Field topicPublishRateLimiterOnByteField = precisPublishLimiterClass.getDeclaredField("topicPublishRateLimiterOnByte"); + topicPublishRateLimiterOnMessageField.setAccessible(true); + topicPublishRateLimiterOnByteField.setAccessible(true); + + precisPublishLimiter = (PrecisPublishLimiter) constructor.newInstance(policies, CLUSTER_NAME, (RateLimitFunction) () -> System.out.print("Refresh permit")); + topicPublishRateLimiterOnMessage = (RateLimiter)topicPublishRateLimiterOnMessageField.get(precisPublishLimiter); + topicPublishRateLimiterOnByte = (RateLimiter)topicPublishRateLimiterOnByteField.get(precisPublishLimiter); + + renewTopicPublishRateLimiterOnMessageMethod = topicPublishRateLimiterOnMessage.getClass().getDeclaredMethod("renew", null); + renewTopicPublishRateLimiterOnByteMethod = topicPublishRateLimiterOnByte.getClass().getDeclaredMethod("renew", null); + renewTopicPublishRateLimiterOnMessageMethod.setAccessible(true); + renewTopicPublishRateLimiterOnByteMethod.setAccessible(true); + + // running tryAcquire in order to lazyInit the renewTask + assertTrue(precisPublishLimiter.tryAcquire(1, 10)); + + Field onMessageRenewTaskField = topicPublishRateLimiterOnMessage.getClass().getDeclaredField("renewTask"); + Field onByteRenewTaskField = topicPublishRateLimiterOnByte.getClass().getDeclaredField("renewTask"); + onMessageRenewTaskField.setAccessible(true); + onByteRenewTaskField.setAccessible(true); + onMessageRenewTask = (ScheduledFuture) onMessageRenewTaskField.get(topicPublishRateLimiterOnMessage); + onByteRenewTask = (ScheduledFuture) onByteRenewTaskField.get(topicPublishRateLimiterOnByte); + + onMessageRenewTask.cancel(false); + onByteRenewTask.cancel(false); + publishRateLimiter = new PublishRateLimiterImpl(policies, CLUSTER_NAME); } @@ -94,21 +136,33 @@ public void testPrecisePublishRateLimiterUpdate() { @Test public void testPrecisePublishRateLimiterAcquire() throws Exception { + // renewing the permits from previous tests + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); + // tryAcquire not exceeded assertTrue(precisPublishLimiter.tryAcquire(1, 10)); - Thread.sleep(1100); + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); // tryAcquire numOfMessages exceeded assertFalse(precisPublishLimiter.tryAcquire(11, 100)); - Thread.sleep(1100); + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); // tryAcquire msgSizeInBytes exceeded assertFalse(precisPublishLimiter.tryAcquire(10, 101)); - Thread.sleep(2100); + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); // tryAcquire exceeded exactly assertFalse(precisPublishLimiter.tryAcquire(10, 100)); - Thread.sleep(2100); + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); // tryAcquire not exceeded assertTrue(precisPublishLimiter.tryAcquire(9, 99)); From c0b316d25930729753d431efb445a6cfbca5f875 Mon Sep 17 00:00:00 2001 From: danielsinai Date: Mon, 26 Jul 2021 07:12:01 -0400 Subject: [PATCH 6/7] removed assertTrue from setup --- .../apache/pulsar/broker/service/PublishRateLimiterTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java index acb5e571adb22..ee3a6eaaeda00 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java @@ -75,7 +75,7 @@ public void setup() throws Exception { renewTopicPublishRateLimiterOnByteMethod.setAccessible(true); // running tryAcquire in order to lazyInit the renewTask - assertTrue(precisPublishLimiter.tryAcquire(1, 10)); + precisPublishLimiter.tryAcquire(1, 10); Field onMessageRenewTaskField = topicPublishRateLimiterOnMessage.getClass().getDeclaredField("renewTask"); Field onByteRenewTaskField = topicPublishRateLimiterOnByte.getClass().getDeclaredField("renewTask"); From 740381b46850654bf4fc312fd7c6f1f941995835 Mon Sep 17 00:00:00 2001 From: danielsinai Date: Wed, 28 Jul 2021 01:51:20 -0400 Subject: [PATCH 7/7] moved the setup to the tests in order to no affect other tests with the reflection --- .../service/PublishRateLimiterTest.java | 65 +++++++++---------- 1 file changed, 29 insertions(+), 36 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java index ee3a6eaaeda00..9131d5173433a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java @@ -20,6 +20,7 @@ import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; +import org.apache.pulsar.common.stats.Rate; import org.apache.pulsar.common.util.RateLimitFunction; import org.apache.pulsar.common.util.RateLimiter; import org.apache.pulsar.utils.StatsOutputStream; @@ -45,48 +46,13 @@ public class PublishRateLimiterTest { private PrecisPublishLimiter precisPublishLimiter; private PublishRateLimiterImpl publishRateLimiter; - private RateLimiter topicPublishRateLimiterOnMessage; - private RateLimiter topicPublishRateLimiterOnByte; - private Method renewTopicPublishRateLimiterOnMessageMethod; - private Method renewTopicPublishRateLimiterOnByteMethod; - private ScheduledFuture onMessageRenewTask; - private ScheduledFuture onByteRenewTask; @BeforeMethod public void setup() throws Exception { policies.publishMaxMessageRate = new HashMap<>(); policies.publishMaxMessageRate.put(CLUSTER_NAME, publishRate); - Class precisPublishLimiterClass = Class.forName("org.apache.pulsar.broker.service.PrecisPublishLimiter"); - Constructor constructor = precisPublishLimiterClass.getConstructor(Policies.class, String.class, RateLimitFunction.class); - - Field topicPublishRateLimiterOnMessageField = precisPublishLimiterClass.getDeclaredField("topicPublishRateLimiterOnMessage"); - Field topicPublishRateLimiterOnByteField = precisPublishLimiterClass.getDeclaredField("topicPublishRateLimiterOnByte"); - topicPublishRateLimiterOnMessageField.setAccessible(true); - topicPublishRateLimiterOnByteField.setAccessible(true); - - precisPublishLimiter = (PrecisPublishLimiter) constructor.newInstance(policies, CLUSTER_NAME, (RateLimitFunction) () -> System.out.print("Refresh permit")); - topicPublishRateLimiterOnMessage = (RateLimiter)topicPublishRateLimiterOnMessageField.get(precisPublishLimiter); - topicPublishRateLimiterOnByte = (RateLimiter)topicPublishRateLimiterOnByteField.get(precisPublishLimiter); - - renewTopicPublishRateLimiterOnMessageMethod = topicPublishRateLimiterOnMessage.getClass().getDeclaredMethod("renew", null); - renewTopicPublishRateLimiterOnByteMethod = topicPublishRateLimiterOnByte.getClass().getDeclaredMethod("renew", null); - renewTopicPublishRateLimiterOnMessageMethod.setAccessible(true); - renewTopicPublishRateLimiterOnByteMethod.setAccessible(true); - - // running tryAcquire in order to lazyInit the renewTask - precisPublishLimiter.tryAcquire(1, 10); - - Field onMessageRenewTaskField = topicPublishRateLimiterOnMessage.getClass().getDeclaredField("renewTask"); - Field onByteRenewTaskField = topicPublishRateLimiterOnByte.getClass().getDeclaredField("renewTask"); - onMessageRenewTaskField.setAccessible(true); - onByteRenewTaskField.setAccessible(true); - onMessageRenewTask = (ScheduledFuture) onMessageRenewTaskField.get(topicPublishRateLimiterOnMessage); - onByteRenewTask = (ScheduledFuture) onByteRenewTaskField.get(topicPublishRateLimiterOnByte); - - onMessageRenewTask.cancel(false); - onByteRenewTask.cancel(false); - + precisPublishLimiter = new PrecisPublishLimiter(policies, CLUSTER_NAME, () -> System.out.print("Refresh permit")); publishRateLimiter = new PublishRateLimiterImpl(policies, CLUSTER_NAME); } @@ -136,6 +102,33 @@ public void testPrecisePublishRateLimiterUpdate() { @Test public void testPrecisePublishRateLimiterAcquire() throws Exception { + Class precisPublishLimiterClass = Class.forName("org.apache.pulsar.broker.service.PrecisPublishLimiter"); + Field topicPublishRateLimiterOnMessageField = precisPublishLimiterClass.getDeclaredField("topicPublishRateLimiterOnMessage"); + Field topicPublishRateLimiterOnByteField = precisPublishLimiterClass.getDeclaredField("topicPublishRateLimiterOnByte"); + topicPublishRateLimiterOnMessageField.setAccessible(true); + topicPublishRateLimiterOnByteField.setAccessible(true); + + RateLimiter topicPublishRateLimiterOnMessage = (RateLimiter)topicPublishRateLimiterOnMessageField.get(precisPublishLimiter); + RateLimiter topicPublishRateLimiterOnByte = (RateLimiter)topicPublishRateLimiterOnByteField.get(precisPublishLimiter); + + Method renewTopicPublishRateLimiterOnMessageMethod = topicPublishRateLimiterOnMessage.getClass().getDeclaredMethod("renew", null); + Method renewTopicPublishRateLimiterOnByteMethod = topicPublishRateLimiterOnByte.getClass().getDeclaredMethod("renew", null); + renewTopicPublishRateLimiterOnMessageMethod.setAccessible(true); + renewTopicPublishRateLimiterOnByteMethod.setAccessible(true); + + // running tryAcquire in order to lazyInit the renewTask + precisPublishLimiter.tryAcquire(1, 10); + + Field onMessageRenewTaskField = topicPublishRateLimiterOnMessage.getClass().getDeclaredField("renewTask"); + Field onByteRenewTaskField = topicPublishRateLimiterOnByte.getClass().getDeclaredField("renewTask"); + onMessageRenewTaskField.setAccessible(true); + onByteRenewTaskField.setAccessible(true); + ScheduledFuture onMessageRenewTask = (ScheduledFuture) onMessageRenewTaskField.get(topicPublishRateLimiterOnMessage); + ScheduledFuture onByteRenewTask = (ScheduledFuture) onByteRenewTaskField.get(topicPublishRateLimiterOnByte); + + onMessageRenewTask.cancel(false); + onByteRenewTask.cancel(false); + // renewing the permits from previous tests renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte);