Skip to content

Commit

Permalink
Remove RateLimiter constructors and replace with builder usage (#11599)
Browse files Browse the repository at this point in the history
(cherry picked from commit fcd3336)
  • Loading branch information
lhotari authored and michaeljmarshall committed Dec 11, 2021
1 parent 949a1eb commit 0c48d23
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 62 deletions.
Expand Up @@ -333,8 +333,15 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
// update msg-rateLimiter
if (msgRate > 0) {
if (this.dispatchRateLimiterOnMessage == null) {
this.dispatchRateLimiterOnMessage = new RateLimiter(brokerService.pulsar().getExecutor(), msgRate,
ratePeriod, TimeUnit.SECONDS, permitUpdaterMsg, true);
this.dispatchRateLimiterOnMessage =
RateLimiter.builder()
.scheduledExecutorService(brokerService.pulsar().getExecutor())
.permits(msgRate)
.rateTime(ratePeriod)
.timeUnit(TimeUnit.SECONDS)
.permitUpdater(permitUpdaterMsg)
.isDispatchOrPrecisePublishRateLimiter(true)
.build();
} else {
this.dispatchRateLimiterOnMessage.setRate(msgRate, dispatchRate.ratePeriodInSecond,
TimeUnit.SECONDS, permitUpdaterMsg);
Expand All @@ -353,8 +360,15 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
// update byte-rateLimiter
if (byteRate > 0) {
if (this.dispatchRateLimiterOnByte == null) {
this.dispatchRateLimiterOnByte = new RateLimiter(brokerService.pulsar().getExecutor(), byteRate,
ratePeriod, TimeUnit.SECONDS, permitUpdaterByte, true);
this.dispatchRateLimiterOnByte =
RateLimiter.builder()
.scheduledExecutorService(brokerService.pulsar().getExecutor())
.permits(byteRate)
.rateTime(ratePeriod)
.timeUnit(TimeUnit.SECONDS)
.permitUpdater(permitUpdaterByte)
.isDispatchOrPrecisePublishRateLimiter(true)
.build();
} else {
this.dispatchRateLimiterOnByte.setRate(byteRate, dispatchRate.ratePeriodInSecond,
TimeUnit.SECONDS, permitUpdaterByte);
Expand Down
Expand Up @@ -135,8 +135,12 @@ private synchronized void updateSubscribeRate(ConsumerIdentifier consumerIdentif
if (ratePerConsumer > 0) {
if (this.subscribeRateLimiter.get(consumerIdentifier) == null) {
this.subscribeRateLimiter.put(consumerIdentifier,
new RateLimiter(brokerService.pulsar().getExecutor(), ratePerConsumer,
ratePeriod, TimeUnit.SECONDS));
RateLimiter.builder()
.scheduledExecutorService(brokerService.pulsar().getExecutor())
.permits(ratePerConsumer)
.rateTime(ratePeriod)
.timeUnit(TimeUnit.SECONDS)
.build());
} else {
this.subscribeRateLimiter.get(consumerIdentifier).setRate(ratePerConsumer, ratePeriod, TimeUnit.SECONDS,
null);
Expand Down
Expand Up @@ -64,42 +64,6 @@ public class RateLimiter implements AutoCloseable{
private RateLimitFunction rateLimitFunction;
private boolean isDispatchOrPrecisePublishRateLimiter;

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) {
this(null, permits, rateTime, timeUnit);
}

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, boolean isDispatchOrPrecisePublishRateLimiter) {
this(null, permits, rateTime, timeUnit, null, isDispatchOrPrecisePublishRateLimiter);
}

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit,
RateLimitFunction autoReadResetFunction) {
this(null, permits, rateTime, timeUnit, null, false);
this.rateLimitFunction = autoReadResetFunction;
}

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit,
RateLimitFunction autoReadResetFunction, boolean isDispatchOrPrecisePublishRateLimiter) {
this(null, permits, rateTime, timeUnit, null, isDispatchOrPrecisePublishRateLimiter);
this.rateLimitFunction = autoReadResetFunction;
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit) {
this(service, permits, rateTime, timeUnit, (Supplier<Long>) null);
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit, Supplier<Long> permitUpdater) {
this(service, permits, rateTime, timeUnit, permitUpdater, false);
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit, Supplier<Long> permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter) {
this(service, permits, rateTime, timeUnit, permitUpdater, isDispatchOrPrecisePublishRateLimiter,
null);
}

@Builder
RateLimiter(final ScheduledExecutorService scheduledExecutorService, final long permits, final long rateTime,
final TimeUnit timeUnit, Supplier<Long> permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter,
Expand Down
Expand Up @@ -33,14 +33,14 @@ public class RateLimiterTest {
@Test
public void testInvalidRenewTime() {
try {
new RateLimiter(0, 100, TimeUnit.SECONDS);
RateLimiter.builder().permits(0).rateTime(100).timeUnit(TimeUnit.SECONDS).build();
fail("should have thrown exception: invalid rate, must be > 0");
} catch (IllegalArgumentException ie) {
// Ok
}

try {
new RateLimiter(10, 0, TimeUnit.SECONDS);
RateLimiter.builder().permits(10).rateTime(0).timeUnit(TimeUnit.SECONDS).build();
fail("should have thrown exception: invalid rateTime, must be > 0");
} catch (IllegalArgumentException ie) {
// Ok
Expand All @@ -49,7 +49,7 @@ public void testInvalidRenewTime() {

@Test
public void testClose() throws Exception {
RateLimiter rate = new RateLimiter(1, 1000, TimeUnit.MILLISECONDS);
RateLimiter rate = RateLimiter.builder().permits(1).rateTime(1000).timeUnit(TimeUnit.MILLISECONDS).build();
assertFalse(rate.isClosed());
rate.close();
assertTrue(rate.isClosed());
Expand All @@ -64,7 +64,8 @@ public void testClose() throws Exception {
@Test
public void testAcquireBlock() throws Exception {
final long rateTimeMSec = 1000;
RateLimiter rate = new RateLimiter(1, rateTimeMSec, TimeUnit.MILLISECONDS);
RateLimiter rate = RateLimiter.builder().permits(1).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS)
.build();
rate.acquire();
assertEquals(rate.getAvailablePermits(), 0);
long start = System.currentTimeMillis();
Expand All @@ -79,7 +80,8 @@ public void testAcquireBlock() throws Exception {
public void testAcquire() throws Exception {
final long rateTimeMSec = 1000;
final int permits = 100;
RateLimiter rate = new RateLimiter(permits, rateTimeMSec, TimeUnit.MILLISECONDS);
RateLimiter rate = RateLimiter.builder().permits(permits).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS)
.build();
long start = System.currentTimeMillis();
for (int i = 0; i < permits; i++) {
rate.acquire();
Expand All @@ -95,7 +97,8 @@ public void testMultipleAcquire() throws Exception {
final long rateTimeMSec = 1000;
final int permits = 100;
final int acquirePermits = 50;
RateLimiter rate = new RateLimiter(permits, rateTimeMSec, TimeUnit.MILLISECONDS);
RateLimiter rate = RateLimiter.builder().permits(permits).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS)
.build();
long start = System.currentTimeMillis();
for (int i = 0; i < permits / acquirePermits; i++) {
rate.acquire(acquirePermits);
Expand All @@ -109,7 +112,8 @@ public void testMultipleAcquire() throws Exception {
@Test
public void testTryAcquireNoPermits() {
final long rateTimeMSec = 1000;
RateLimiter rate = new RateLimiter(1, rateTimeMSec, TimeUnit.MILLISECONDS);
RateLimiter rate = RateLimiter.builder().permits(1).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS)
.build();
assertTrue(rate.tryAcquire());
assertFalse(rate.tryAcquire());
assertEquals(rate.getAvailablePermits(), 0);
Expand All @@ -120,7 +124,8 @@ public void testTryAcquireNoPermits() {
public void testTryAcquire() {
final long rateTimeMSec = 1000;
final int permits = 100;
RateLimiter rate = new RateLimiter(permits, rateTimeMSec, TimeUnit.MILLISECONDS);
RateLimiter rate = RateLimiter.builder().permits(permits).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS)
.build();
for (int i = 0; i < permits; i++) {
rate.tryAcquire();
}
Expand All @@ -133,7 +138,8 @@ public void testMultipleTryAcquire() {
final long rateTimeMSec = 1000;
final int permits = 100;
final int acquirePermits = 50;
RateLimiter rate = new RateLimiter(permits, rateTimeMSec, TimeUnit.MILLISECONDS);
RateLimiter rate = RateLimiter.builder().permits(permits).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS)
.build();
for (int i = 0; i < permits / acquirePermits; i++) {
rate.tryAcquire(acquirePermits);
}
Expand All @@ -145,7 +151,8 @@ public void testMultipleTryAcquire() {
public void testResetRate() throws Exception {
final long rateTimeMSec = 1000;
final int permits = 100;
RateLimiter rate = new RateLimiter(permits, rateTimeMSec, TimeUnit.MILLISECONDS);
RateLimiter rate = RateLimiter.builder().permits(permits).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS)
.build();
rate.tryAcquire(permits);
assertEquals(rate.getAvailablePermits(), 0);
// check after a rate-time: permits must be renewed
Expand All @@ -168,7 +175,9 @@ public void testResetRate() throws Exception {
public void testDispatchRate() throws Exception {
final long rateTimeMSec = 1000;
final int permits = 100;
RateLimiter rate = new RateLimiter(null, permits, rateTimeMSec, TimeUnit.MILLISECONDS, null, true);
RateLimiter rate = RateLimiter.builder().permits(permits).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS)
.isDispatchOrPrecisePublishRateLimiter(true)
.build();
rate.tryAcquire(100);
rate.tryAcquire(100);
rate.tryAcquire(100);
Expand All @@ -191,7 +200,9 @@ public void testRateLimiterWithPermitUpdater() throws Exception {
long rateTime = 1;
long newUpdatedRateLimit = 100L;
Supplier<Long> permitUpdater = () -> newUpdatedRateLimit;
RateLimiter limiter = new RateLimiter(null, permits, 1, TimeUnit.SECONDS, permitUpdater);
RateLimiter limiter = RateLimiter.builder().permits(permits).rateTime(1).timeUnit(TimeUnit.SECONDS)
.permitUpdater(permitUpdater)
.build();
limiter.acquire();
Thread.sleep(rateTime * 3 * 1000);
assertEquals(limiter.getAvailablePermits(), newUpdatedRateLimit);
Expand All @@ -204,8 +215,10 @@ public void testRateLimiterWithFunction() {
long rateTime = 1;
int reNewTime = 3;
RateLimitFunction rateLimitFunction = atomicInteger::incrementAndGet;
RateLimiter rateLimiter = new RateLimiter(permits, rateTime, TimeUnit.SECONDS, rateLimitFunction);
for (int i = 0 ; i < reNewTime; i++) {
RateLimiter rateLimiter = RateLimiter.builder().permits(permits).rateTime(rateTime).timeUnit(TimeUnit.SECONDS)
.rateLimitFunction(rateLimitFunction)
.build();
for (int i = 0; i < reNewTime; i++) {
rateLimiter.renew();
}
assertEquals(reNewTime, atomicInteger.get());
Expand Down
Expand Up @@ -232,8 +232,18 @@ public FunctionStatsManager(CollectorRegistry collectorRegistry,
.help("Exception from sink.")
.register(collectorRegistry);

userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
userExceptionRateLimiter = RateLimiter.builder()
.scheduledExecutorService(scheduledExecutorService)
.permits(5)
.rateTime(1)
.timeUnit(TimeUnit.MINUTES)
.build();
sysExceptionRateLimiter = RateLimiter.builder()
.scheduledExecutorService(scheduledExecutorService)
.permits(5)
.rateTime(1)
.timeUnit(TimeUnit.MINUTES)
.build();
}

public void addUserException(Throwable ex) {
Expand Down
Expand Up @@ -174,8 +174,18 @@ public SinkStatsManager(CollectorRegistry collectorRegistry, String[] metricsLab
.help("Exception from sink.")
.register(collectorRegistry);

sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
sysExceptionRateLimiter = RateLimiter.builder()
.scheduledExecutorService(scheduledExecutorService)
.permits(5)
.rateTime(1)
.timeUnit(TimeUnit.MINUTES)
.build();
sinkExceptionRateLimiter = RateLimiter.builder()
.scheduledExecutorService(scheduledExecutorService)
.permits(5)
.rateTime(1)
.timeUnit(TimeUnit.MINUTES)
.build();
}

@Override
Expand Down
Expand Up @@ -173,8 +173,18 @@ public SourceStatsManager(CollectorRegistry collectorRegistry, String[] metricsL
.help("Exception from source.")
.register(collectorRegistry);

sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
sysExceptionRateLimiter = RateLimiter.builder()
.scheduledExecutorService(scheduledExecutorService)
.permits(5)
.rateTime(1)
.timeUnit(TimeUnit.MINUTES)
.build();
sourceExceptionRateLimiter = RateLimiter.builder()
.scheduledExecutorService(scheduledExecutorService)
.permits(5)
.rateTime(1)
.timeUnit(TimeUnit.MINUTES)
.build();
}

@Override
Expand Down

0 comments on commit 0c48d23

Please sign in to comment.