From bc887d8d35a8055f79d31a8c08f4d234c7352b77 Mon Sep 17 00:00:00 2001 From: Daniel Sinai <51213812+danielsinai@users.noreply.github.com> Date: Wed, 28 Jul 2021 10:32:03 +0300 Subject: [PATCH] [issue #13351] Solving precise rate limiting does not takes effect (#11446) ![image](https://user-images.githubusercontent.com/51213812/126812923-91bb827c-246d-451d-8f25-343bb2c1dca0.png) befoe this PR precise publish rate limiting wasn't taking effect at all In order to solve the current problems, there are 2 modifications 1. Using IsDispatchRateLimiting in precise publish rate limiter as well (in order to starve the producer) 2. Checking if there are available permits before resetting the read from the connection again Already covered by current tests. (cherry picked from commit 00ad07d7fdad5dadc378235a2f5e7edd354d8ff7) --- .../broker/service/PrecisPublishLimiter.java | 5 +- .../service/PublishRateLimiterDisable.java | 2 +- .../service/PublishRateLimiterTest.java | 65 +++++++++++++++++-- .../pulsar/common/util/RateLimiter.java | 29 +++++++-- 4 files changed, 84 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java index a555104b828e0..e306757f21fe5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PrecisPublishLimiter.java @@ -79,10 +79,11 @@ public void update(PublishRate maxPublishRate) { this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0); this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0); if (this.publishMaxMessageRate > 0) { - topicPublishRateLimiterOnMessage = new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction); + topicPublishRateLimiterOnMessage = + new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction, true); } if (this.publishMaxByteRate > 0) { - topicPublishRateLimiterOnByte = new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS); + topicPublishRateLimiterOnByte = new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS, true); } } else { this.publishMaxMessageRate = 0; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java index 0ff3866a8a891..c72f6ba82b05d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterDisable.java @@ -59,7 +59,7 @@ public void update(PublishRate maxPublishRate) { @Override public boolean tryAcquire(int numbers, long bytes) { // No-op - return false; + return true; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java index fd88bec8e781f..f2709d926edef 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java @@ -20,10 +20,19 @@ import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.PublishRate; +import org.apache.pulsar.common.stats.Rate; +import org.apache.pulsar.common.util.RateLimitFunction; +import org.apache.pulsar.common.util.RateLimiter; +import org.apache.pulsar.utils.StatsOutputStream; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.util.HashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; @@ -38,13 +47,12 @@ public class PublishRateLimiterTest { private PrecisPublishLimiter precisPublishLimiter; private PublishRateLimiterImpl publishRateLimiter; - @BeforeMethod public void setup() throws Exception { policies.publishMaxMessageRate = new HashMap<>(); policies.publishMaxMessageRate.put(CLUSTER_NAME, publishRate); - precisPublishLimiter = new PrecisPublishLimiter(policies, CLUSTER_NAME, - () -> System.out.print("Refresh permit")); + + precisPublishLimiter = new PrecisPublishLimiter(policies, CLUSTER_NAME, () -> System.out.print("Refresh permit")); publishRateLimiter = new PublishRateLimiterImpl(policies, CLUSTER_NAME); } @@ -94,19 +102,62 @@ public void testPrecisePublishRateLimiterUpdate() throws Exception { @Test public void testPrecisePublishRateLimiterAcquire() throws Exception { + Class precisPublishLimiterClass = Class.forName("org.apache.pulsar.broker.service.PrecisPublishLimiter"); + Field topicPublishRateLimiterOnMessageField = precisPublishLimiterClass.getDeclaredField("topicPublishRateLimiterOnMessage"); + Field topicPublishRateLimiterOnByteField = precisPublishLimiterClass.getDeclaredField("topicPublishRateLimiterOnByte"); + topicPublishRateLimiterOnMessageField.setAccessible(true); + topicPublishRateLimiterOnByteField.setAccessible(true); + + RateLimiter topicPublishRateLimiterOnMessage = (RateLimiter)topicPublishRateLimiterOnMessageField.get(precisPublishLimiter); + RateLimiter topicPublishRateLimiterOnByte = (RateLimiter)topicPublishRateLimiterOnByteField.get(precisPublishLimiter); + + Method renewTopicPublishRateLimiterOnMessageMethod = topicPublishRateLimiterOnMessage.getClass().getDeclaredMethod("renew", null); + Method renewTopicPublishRateLimiterOnByteMethod = topicPublishRateLimiterOnByte.getClass().getDeclaredMethod("renew", null); + renewTopicPublishRateLimiterOnMessageMethod.setAccessible(true); + renewTopicPublishRateLimiterOnByteMethod.setAccessible(true); + + // running tryAcquire in order to lazyInit the renewTask + precisPublishLimiter.tryAcquire(1, 10); + + Field onMessageRenewTaskField = topicPublishRateLimiterOnMessage.getClass().getDeclaredField("renewTask"); + Field onByteRenewTaskField = topicPublishRateLimiterOnByte.getClass().getDeclaredField("renewTask"); + onMessageRenewTaskField.setAccessible(true); + onByteRenewTaskField.setAccessible(true); + ScheduledFuture onMessageRenewTask = (ScheduledFuture) onMessageRenewTaskField.get(topicPublishRateLimiterOnMessage); + ScheduledFuture onByteRenewTask = (ScheduledFuture) onByteRenewTaskField.get(topicPublishRateLimiterOnByte); + + onMessageRenewTask.cancel(false); + onByteRenewTask.cancel(false); + + // renewing the permits from previous tests + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); + // tryAcquire not exceeded assertTrue(precisPublishLimiter.tryAcquire(1, 10)); - Thread.sleep(1100); + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); // tryAcquire numOfMessages exceeded assertFalse(precisPublishLimiter.tryAcquire(11, 100)); - Thread.sleep(1100); + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); // tryAcquire msgSizeInBytes exceeded assertFalse(precisPublishLimiter.tryAcquire(10, 101)); - Thread.sleep(1100); + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); + + // tryAcquire exceeded exactly + assertFalse(precisPublishLimiter.tryAcquire(10, 100)); + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); + renewTopicPublishRateLimiterOnMessageMethod.invoke(topicPublishRateLimiterOnMessage); + renewTopicPublishRateLimiterOnByteMethod.invoke(topicPublishRateLimiterOnByte); // tryAcquire not exceeded - assertTrue(precisPublishLimiter.tryAcquire(10, 100)); + assertTrue(precisPublishLimiter.tryAcquire(9, 99)); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java index 0ec98aca83128..7e37f4adc0960 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RateLimiter.java @@ -62,15 +62,25 @@ public class RateLimiter implements AutoCloseable{ // permitUpdate helps to update permit-rate at runtime private Supplier permitUpdater; private RateLimitFunction rateLimitFunction; - private boolean isDispatchRateLimiter; + private boolean isDispatchOrPrecisePublishRateLimiter; public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) { this(null, permits, rateTime, timeUnit, null); } + public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, boolean isDispatchOrPrecisePublishRateLimiter) { + this(null, permits, rateTime, timeUnit, null, isDispatchOrPrecisePublishRateLimiter); + } + public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, RateLimitFunction autoReadResetFunction) { - this(null, permits, rateTime, timeUnit, null); + this(null, permits, rateTime, timeUnit, null, false); + this.rateLimitFunction = autoReadResetFunction; + } + + public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, + RateLimitFunction autoReadResetFunction, boolean isDispatchOrPrecisePublishRateLimiter) { + this(null, permits, rateTime, timeUnit, null, isDispatchOrPrecisePublishRateLimiter); this.rateLimitFunction = autoReadResetFunction; } @@ -80,7 +90,7 @@ public RateLimiter(final ScheduledExecutorService service, final long permits, f } public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime, - final TimeUnit timeUnit, Supplier permitUpdater, boolean isDispatchRateLimiter) { + final TimeUnit timeUnit, Supplier permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter) { checkArgument(permits > 0, "rate must be > 0"); checkArgument(rateTime > 0, "Renew permit time must be > 0"); @@ -88,7 +98,7 @@ public RateLimiter(final ScheduledExecutorService service, final long permits, f this.timeUnit = timeUnit; this.permits = permits; this.permitUpdater = permitUpdater; - this.isDispatchRateLimiter = isDispatchRateLimiter; + this.isDispatchOrPrecisePublishRateLimiter = isDispatchOrPrecisePublishRateLimiter; if (service != null) { this.executorService = service; @@ -182,9 +192,13 @@ public synchronized boolean tryAcquire(long acquirePermit) { } boolean canAcquire = acquirePermit < 0 || acquiredPermits < this.permits; - if (isDispatchRateLimiter) { + if (isDispatchOrPrecisePublishRateLimiter) { // for dispatch rate limiter just add acquirePermit acquiredPermits += acquirePermit; + + // we want to back-pressure from the current state of the rateLimiter therefore we should check if there + // are any available premits again + canAcquire = acquirePermit < 0 || acquiredPermits < this.permits; } else { // acquired-permits can't be larger than the rate if (acquirePermit > this.permits) { @@ -259,14 +273,15 @@ protected ScheduledFuture createTask() { } synchronized void renew() { - acquiredPermits = isDispatchRateLimiter ? Math.max(0, acquiredPermits - permits) : 0; + acquiredPermits = isDispatchOrPrecisePublishRateLimiter ? Math.max(0, acquiredPermits - permits) : 0; if (permitUpdater != null) { long newPermitRate = permitUpdater.get(); if (newPermitRate > 0) { setRate(newPermitRate); } } - if (rateLimitFunction != null) { + // release the back-pressure by applying the rateLimitFunction only when there are available permits + if (rateLimitFunction != null && this.getAvailablePermits() > 0) { rateLimitFunction.apply(); } notifyAll();