From 06c6adf6faa83c4fbaef95ff792299c8c0744817 Mon Sep 17 00:00:00 2001 From: Daniel Sinai <51213812+danielsinai@users.noreply.github.com> Date: Wed, 28 Jul 2021 10:32:03 +0300 Subject: [PATCH] [issue #13351] Solving precise rate limiting does not takes effect (#11446) ![image](https://user-images.githubusercontent.com/51213812/126812923-91bb827c-246d-451d-8f25-343bb2c1dca0.png) befoe this PR precise publish rate limiting wasn't taking effect at all ### Modifications In order to solve the current problems, there are 2 modifications 1. Using IsDispatchRateLimiting in precise publish rate limiter as well (in order to starve the producer) 2. Checking if there are available permits before resetting the read from the connection again ### Verifying this change Already covered by current tests. (cherry picked from commit 00ad07d7fdad5dadc378235a2f5e7edd354d8ff7) --- .../broker/service/PrecisPublishLimiter.java | 4 +- .../service/PublishRateLimiterDisable.java | 2 +- .../service/PublishRateLimiterTest.java | 65 +++++++++++++++++-- .../pulsar/common/util/RateLimiter.java | 29 +++++++-- 4 files changed, 83 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java index 4db6bf230865e..e981518886eac 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java @@ -79,10 +79,10 @@ public void update(PublishRate maxPublishRate) { this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0); if (this.publishMaxMessageRate > 0) { topicPublishRateLimiterOnMessage = - new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction); + new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction, true); } if (this.publishMaxByteRate > 0) { - topicPublishRateLimiterOnByte = new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS); + topicPublishRateLimiterOnByte = new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS, true); } } else { this.publishMaxMessageRate = 0; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java index 0ff3866a8a891..c72f6ba82b05d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java @@ -59,7 +59,7 @@ public void update(PublishRate maxPublishRate) { @Override public boolean tryAcquire(int numbers, long bytes) { // No-op - return false; + return true; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java index b820c606b9d7f..9131d5173433a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java @@ -20,10 +20,19 @@ import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; +import org.apache.pulsar.common.stats.Rate; +import org.apache.pulsar.common.util.RateLimitFunction; +import org.apache.pulsar.common.util.RateLimiter; +import org.apache.pulsar.utils.StatsOutputStream; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.util.HashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; @@ -38,13 +47,12 @@ public class PublishRateLimiterTest { private PrecisPublishLimiter precisPublishLimiter; private PublishRateLimiterImpl publishRateLimiter; - @BeforeMethod public void setup() throws Exception { policies.publishMaxMessageRate = new HashMap<>(); policies.publishMaxMessageRate.put(CLUSTER_NAME, publishRate); - precisPublishLimiter = new PrecisPublishLimiter(policies, CLUSTER_NAME, - () -> System.out.print("Refresh permit")); + + precisPublishLimiter = new PrecisPublishLimiter(policies, CLUSTER_NAME, () -> System.out.print("Refresh permit")); publishRateLimiter = new PublishRateLimiterImpl(policies, CLUSTER_NAME); } @@ -94,19 +102,62 @@ public void testPrecisePublishRateLimiterUpdate() { @Test public void testPrecisePublishRateLimiterAcquire() throws Exception { + Class precisPublishLimiterClass = Class.forName("org.apache.pulsar.broker.service.PrecisPublishLimiter"); + Field topicPublishRateLimiterOnMessageField = precisPublishLimiterClass.getDeclaredField("topicPublishRateLimiterOnMessage"); + Field topicPublishRateLimiterOnByteField = precisPublishLimiterClass.getDeclaredField("topicPublishRateLimiterOnByte"); + topicPublishRateLimiterOnMessageField.setAccessible(true); + topicPublishRateLimiterOnByteField.setAccessible(true); + + RateLimiter topicPublishRateLimiterOnMessage = (RateLimiter)topicPublishRateLimiterOnMessageField.get(precisPublishLimiter); + RateLimiter topicPublishRateLimiterOnByte = (RateLimiter)topicPublishRateLimiterOnByteField.get(precisPublishLimiter); + + Method renewTopicPublishRateLimiterOnMessageMethod = topicPublishRateLimiterOnMessage.getClass().getDeclaredMethod("renew", null); + Method renewTopicPublishRateLimiterOnByteMethod = topicPublishRateLimiterOnByte.getClass().getDeclaredMethod("renew", null); + renewTopicPublishRateLimiterOnMessageMethod.setAccessible(true); + renewTopicPublishRateLimiterOnByteMethod.setAccessible(true); + + // running tryAcquire in order to lazyInit the renewTask + precisPublishLimiter.tryAcquire(1, 10); + + Field onMessageRenewTaskField = topicPublishRateLimiterOnMessage.getClass().getDeclaredField("renewTask"); + Field onByteRenewTaskField = topicPublishRateLimiterOnByte.getClass().getDeclaredField("renewTask"); + onMessageRenewTaskField.setAccessible(true); + onByteRenewTaskField.setAccessible(true); + ScheduledFuture onMessageRenewTask = (ScheduledFuture) onMessageRenewTaskField.get(topicPublishRateLimiterOnMessage); + ScheduledFuture onByteRenewTask = (ScheduledFuture) onByteRenewTaskField.get(topicPublishRateLimiterOnByte); + + onMessageRenewTask.cancel(false); + onByteRenewTask.cancel(false); + + // renewing the permits from previous tests + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); + // tryAcquire not exceeded assertTrue(precisPublishLimiter.tryAcquire(1, 10)); - Thread.sleep(1100); + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); // tryAcquire numOfMessages exceeded assertFalse(precisPublishLimiter.tryAcquire(11, 100)); - Thread.sleep(1100); + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); // tryAcquire msgSizeInBytes exceeded assertFalse(precisPublishLimiter.tryAcquire(10, 101)); - Thread.sleep(1100); + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); + + // tryAcquire exceeded exactly + assertFalse(precisPublishLimiter.tryAcquire(10, 100)); + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); // tryAcquire not exceeded - assertTrue(precisPublishLimiter.tryAcquire(10, 100)); + assertTrue(precisPublishLimiter.tryAcquire(9, 99)); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java index bd31853062774..edaef1719ac94 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java @@ -60,15 +60,25 @@ public class RateLimiter implements AutoCloseable{ // permitUpdate helps to update permit-rate at runtime private Supplier permitUpdater; private RateLimitFunction rateLimitFunction; - private boolean isDispatchRateLimiter; + private boolean isDispatchOrPrecisePublishRateLimiter; public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) { this(null, permits, rateTime, timeUnit, null); } + public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, boolean isDispatchOrPrecisePublishRateLimiter) { + this(null, permits, rateTime, timeUnit, null, isDispatchOrPrecisePublishRateLimiter); + } + public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, RateLimitFunction autoReadResetFunction) { - this(null, permits, rateTime, timeUnit, null); + this(null, permits, rateTime, timeUnit, null, false); + this.rateLimitFunction = autoReadResetFunction; + } + + public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, + RateLimitFunction autoReadResetFunction, boolean isDispatchOrPrecisePublishRateLimiter) { + this(null, permits, rateTime, timeUnit, null, isDispatchOrPrecisePublishRateLimiter); this.rateLimitFunction = autoReadResetFunction; } @@ -78,7 +88,7 @@ public RateLimiter(final ScheduledExecutorService service, final long permits, f } public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime, - final TimeUnit timeUnit, Supplier permitUpdater, boolean isDispatchRateLimiter) { + final TimeUnit timeUnit, Supplier permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter) { checkArgument(permits > 0, "rate must be > 0"); checkArgument(rateTime > 0, "Renew permit time must be > 0"); @@ -86,7 +96,7 @@ public RateLimiter(final ScheduledExecutorService service, final long permits, f this.timeUnit = timeUnit; this.permits = permits; this.permitUpdater = permitUpdater; - this.isDispatchRateLimiter = isDispatchRateLimiter; + this.isDispatchOrPrecisePublishRateLimiter = isDispatchOrPrecisePublishRateLimiter; if (service != null) { this.executorService = service; @@ -180,9 +190,13 @@ public synchronized boolean tryAcquire(long acquirePermit) { } boolean canAcquire = acquirePermit < 0 || acquiredPermits < this.permits; - if (isDispatchRateLimiter) { + if (isDispatchOrPrecisePublishRateLimiter) { // for dispatch rate limiter just add acquirePermit acquiredPermits += acquirePermit; + + // we want to back-pressure from the current state of the rateLimiter therefore we should check if there + // are any available premits again + canAcquire = acquirePermit < 0 || acquiredPermits < this.permits; } else { // acquired-permits can't be larger than the rate if (acquirePermit > this.permits) { @@ -257,14 +271,15 @@ protected ScheduledFuture createTask() { } synchronized void renew() { - acquiredPermits = isDispatchRateLimiter ? Math.max(0, acquiredPermits - permits) : 0; + acquiredPermits = isDispatchOrPrecisePublishRateLimiter ? Math.max(0, acquiredPermits - permits) : 0; if (permitUpdater != null) { long newPermitRate = permitUpdater.get(); if (newPermitRate > 0) { setRate(newPermitRate); } } - if (rateLimitFunction != null) { + // release the back-pressure by applying the rateLimitFunction only when there are available permits + if (rateLimitFunction != null && this.getAvailablePermits() > 0) { rateLimitFunction.apply(); } notifyAll();