Skip to content

Commit

Permalink
Remove RateLimiter constructors and replace with builder usage (#11599)
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari committed Aug 9, 2021
1 parent a6e66dd commit fcd3336
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 65 deletions.
Expand Up @@ -96,13 +96,22 @@ public void update(ResourceGroup resourceGroup) {
this.publishMaxByteRate = Math.max(resourceGroup.getPublishRateInBytes(), 0);
if (this.publishMaxMessageRate > 0) {
// TODO: pass the executor
publishRateLimiterOnMessage =
new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, this::apply);
publishRateLimiterOnMessage = RateLimiter.builder()
.permits(publishMaxMessageRate)
.rateTime(1L)
.timeUnit(TimeUnit.SECONDS)
.rateLimitFunction(this::apply)
.build();
}
if (this.publishMaxByteRate > 0) {
// TODO: pass the executor
publishRateLimiterOnByte =
new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS, this::apply);
RateLimiter.builder()
.permits(publishMaxByteRate)
.rateTime(1L)
.timeUnit(TimeUnit.SECONDS)
.rateLimitFunction(this::apply)
.build();
}
} else {
this.publishMaxMessageRate = 0;
Expand Down
Expand Up @@ -358,8 +358,15 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
// update msg-rateLimiter
if (msgRate > 0) {
if (this.dispatchRateLimiterOnMessage == null) {
this.dispatchRateLimiterOnMessage = new RateLimiter(brokerService.pulsar().getExecutor(), msgRate,
ratePeriod, TimeUnit.SECONDS, permitUpdaterMsg, true);
this.dispatchRateLimiterOnMessage =
RateLimiter.builder()
.scheduledExecutorService(brokerService.pulsar().getExecutor())
.permits(msgRate)
.rateTime(ratePeriod)
.timeUnit(TimeUnit.SECONDS)
.permitUpdater(permitUpdaterMsg)
.isDispatchOrPrecisePublishRateLimiter(true)
.build();
} else {
this.dispatchRateLimiterOnMessage.setRate(msgRate, dispatchRate.getRatePeriodInSecond(),
TimeUnit.SECONDS, permitUpdaterMsg);
Expand All @@ -378,8 +385,15 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
// update byte-rateLimiter
if (byteRate > 0) {
if (this.dispatchRateLimiterOnByte == null) {
this.dispatchRateLimiterOnByte = new RateLimiter(brokerService.pulsar().getExecutor(), byteRate,
ratePeriod, TimeUnit.SECONDS, permitUpdaterByte, true);
this.dispatchRateLimiterOnByte =
RateLimiter.builder()
.scheduledExecutorService(brokerService.pulsar().getExecutor())
.permits(byteRate)
.rateTime(ratePeriod)
.timeUnit(TimeUnit.SECONDS)
.permitUpdater(permitUpdaterByte)
.isDispatchOrPrecisePublishRateLimiter(true)
.build();
} else {
this.dispatchRateLimiterOnByte.setRate(byteRate, dispatchRate.getRatePeriodInSecond(),
TimeUnit.SECONDS, permitUpdaterByte);
Expand Down
Expand Up @@ -139,8 +139,12 @@ private synchronized void updateSubscribeRate(ConsumerIdentifier consumerIdentif
if (ratePerConsumer > 0) {
if (this.subscribeRateLimiter.get(consumerIdentifier) == null) {
this.subscribeRateLimiter.put(consumerIdentifier,
new RateLimiter(brokerService.pulsar().getExecutor(), ratePerConsumer,
ratePeriod, TimeUnit.SECONDS));
RateLimiter.builder()
.scheduledExecutorService(brokerService.pulsar().getExecutor())
.permits(ratePerConsumer)
.rateTime(ratePeriod)
.timeUnit(TimeUnit.SECONDS)
.build());
} else {
this.subscribeRateLimiter.get(consumerIdentifier)
.setRate(ratePerConsumer, ratePeriod, TimeUnit.SECONDS,
Expand Down
Expand Up @@ -62,42 +62,6 @@ public class RateLimiter implements AutoCloseable{
private RateLimitFunction rateLimitFunction;
private boolean isDispatchOrPrecisePublishRateLimiter;

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) {
this(null, permits, rateTime, timeUnit);
}

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, boolean isDispatchOrPrecisePublishRateLimiter) {
this(null, permits, rateTime, timeUnit, null, isDispatchOrPrecisePublishRateLimiter);
}

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit,
RateLimitFunction autoReadResetFunction) {
this(null, permits, rateTime, timeUnit, null, false);
this.rateLimitFunction = autoReadResetFunction;
}

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit,
RateLimitFunction autoReadResetFunction, boolean isDispatchOrPrecisePublishRateLimiter) {
this(null, permits, rateTime, timeUnit, null, isDispatchOrPrecisePublishRateLimiter);
this.rateLimitFunction = autoReadResetFunction;
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit) {
this(service, permits, rateTime, timeUnit, (Supplier<Long>) null);
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit, Supplier<Long> permitUpdater) {
this(service, permits, rateTime, timeUnit, permitUpdater, false);
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit, Supplier<Long> permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter) {
this(service, permits, rateTime, timeUnit, permitUpdater, isDispatchOrPrecisePublishRateLimiter,
null);
}

@Builder
RateLimiter(final ScheduledExecutorService scheduledExecutorService, final long permits, final long rateTime,
final TimeUnit timeUnit, Supplier<Long> permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter,
Expand Down
Expand Up @@ -33,14 +33,14 @@ public class RateLimiterTest {
@Test
public void testInvalidRenewTime() {
try {
new RateLimiter(0, 100, TimeUnit.SECONDS);
RateLimiter.builder().permits(0).rateTime(100).timeUnit(TimeUnit.SECONDS).build();
fail("should have thrown exception: invalid rate, must be > 0");
} catch (IllegalArgumentException ie) {
// Ok
}

try {
new RateLimiter(10, 0, TimeUnit.SECONDS);
RateLimiter.builder().permits(10).rateTime(0).timeUnit(TimeUnit.SECONDS).build();
fail("should have thrown exception: invalid rateTime, must be > 0");
} catch (IllegalArgumentException ie) {
// Ok
Expand All @@ -49,7 +49,7 @@ public void testInvalidRenewTime() {

@Test
public void testClose() throws Exception {
RateLimiter rate = new RateLimiter(1, 1000, TimeUnit.MILLISECONDS);
RateLimiter rate = RateLimiter.builder().permits(1).rateTime(1000).timeUnit(TimeUnit.MILLISECONDS).build();
assertFalse(rate.isClosed());
rate.close();
assertTrue(rate.isClosed());
Expand All @@ -64,7 +64,8 @@ public void testClose() throws Exception {
@Test
public void testAcquireBlock() throws Exception {
final long rateTimeMSec = 1000;
RateLimiter rate = new RateLimiter(1, rateTimeMSec, TimeUnit.MILLISECONDS);
RateLimiter rate = RateLimiter.builder().permits(1).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS)
.build();
rate.acquire();
assertEquals(rate.getAvailablePermits(), 0);
long start = System.currentTimeMillis();
Expand All @@ -79,7 +80,8 @@ public void testAcquireBlock() throws Exception {
public void testAcquire() throws Exception {
final long rateTimeMSec = 1000;
final int permits = 100;
RateLimiter rate = new RateLimiter(permits, rateTimeMSec, TimeUnit.MILLISECONDS);
RateLimiter rate = RateLimiter.builder().permits(permits).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS)
.build();
long start = System.currentTimeMillis();
for (int i = 0; i < permits; i++) {
rate.acquire();
Expand All @@ -95,7 +97,8 @@ public void testMultipleAcquire() throws Exception {
final long rateTimeMSec = 1000;
final int permits = 100;
final int acquirePermits = 50;
RateLimiter rate = new RateLimiter(permits, rateTimeMSec, TimeUnit.MILLISECONDS);
RateLimiter rate = RateLimiter.builder().permits(permits).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS)
.build();
long start = System.currentTimeMillis();
for (int i = 0; i < permits / acquirePermits; i++) {
rate.acquire(acquirePermits);
Expand All @@ -109,7 +112,8 @@ public void testMultipleAcquire() throws Exception {
@Test
public void testTryAcquireNoPermits() {
final long rateTimeMSec = 1000;
RateLimiter rate = new RateLimiter(1, rateTimeMSec, TimeUnit.MILLISECONDS);
RateLimiter rate = RateLimiter.builder().permits(1).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS)
.build();
assertTrue(rate.tryAcquire());
assertFalse(rate.tryAcquire());
assertEquals(rate.getAvailablePermits(), 0);
Expand All @@ -120,7 +124,8 @@ public void testTryAcquireNoPermits() {
public void testTryAcquire() {
final long rateTimeMSec = 1000;
final int permits = 100;
RateLimiter rate = new RateLimiter(permits, rateTimeMSec, TimeUnit.MILLISECONDS);
RateLimiter rate = RateLimiter.builder().permits(permits).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS)
.build();
for (int i = 0; i < permits; i++) {
rate.tryAcquire();
}
Expand All @@ -133,7 +138,8 @@ public void testMultipleTryAcquire() {
final long rateTimeMSec = 1000;
final int permits = 100;
final int acquirePermits = 50;
RateLimiter rate = new RateLimiter(permits, rateTimeMSec, TimeUnit.MILLISECONDS);
RateLimiter rate = RateLimiter.builder().permits(permits).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS)
.build();
for (int i = 0; i < permits / acquirePermits; i++) {
rate.tryAcquire(acquirePermits);
}
Expand All @@ -145,7 +151,8 @@ public void testMultipleTryAcquire() {
public void testResetRate() throws Exception {
final long rateTimeMSec = 1000;
final int permits = 100;
RateLimiter rate = new RateLimiter(permits, rateTimeMSec, TimeUnit.MILLISECONDS);
RateLimiter rate = RateLimiter.builder().permits(permits).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS)
.build();
rate.tryAcquire(permits);
assertEquals(rate.getAvailablePermits(), 0);
// check after a rate-time: permits must be renewed
Expand All @@ -168,7 +175,9 @@ public void testResetRate() throws Exception {
public void testDispatchRate() throws Exception {
final long rateTimeMSec = 1000;
final int permits = 100;
RateLimiter rate = new RateLimiter(null, permits, rateTimeMSec, TimeUnit.MILLISECONDS, null, true);
RateLimiter rate = RateLimiter.builder().permits(permits).rateTime(rateTimeMSec).timeUnit(TimeUnit.MILLISECONDS)
.isDispatchOrPrecisePublishRateLimiter(true)
.build();
rate.tryAcquire(100);
rate.tryAcquire(100);
rate.tryAcquire(100);
Expand All @@ -191,7 +200,9 @@ public void testRateLimiterWithPermitUpdater() throws Exception {
long rateTime = 1;
long newUpdatedRateLimit = 100L;
Supplier<Long> permitUpdater = () -> newUpdatedRateLimit;
RateLimiter limiter = new RateLimiter(null, permits, 1, TimeUnit.SECONDS, permitUpdater);
RateLimiter limiter = RateLimiter.builder().permits(permits).rateTime(1).timeUnit(TimeUnit.SECONDS)
.permitUpdater(permitUpdater)
.build();
limiter.acquire();
Thread.sleep(rateTime * 3 * 1000);
assertEquals(limiter.getAvailablePermits(), newUpdatedRateLimit);
Expand All @@ -204,8 +215,10 @@ public void testRateLimiterWithFunction() {
long rateTime = 1;
int reNewTime = 3;
RateLimitFunction rateLimitFunction = atomicInteger::incrementAndGet;
RateLimiter rateLimiter = new RateLimiter(permits, rateTime, TimeUnit.SECONDS, rateLimitFunction);
for (int i = 0 ; i < reNewTime; i++) {
RateLimiter rateLimiter = RateLimiter.builder().permits(permits).rateTime(rateTime).timeUnit(TimeUnit.SECONDS)
.rateLimitFunction(rateLimitFunction)
.build();
for (int i = 0; i < reNewTime; i++) {
rateLimiter.renew();
}
assertEquals(reNewTime, atomicInteger.get());
Expand Down
Expand Up @@ -262,8 +262,18 @@ public FunctionStatsManager(FunctionCollectorRegistry collectorRegistry,
.help("Exception from sink.")
.create());

userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
userExceptionRateLimiter = RateLimiter.builder()
.scheduledExecutorService(scheduledExecutorService)
.permits(5)
.rateTime(1)
.timeUnit(TimeUnit.MINUTES)
.build();
sysExceptionRateLimiter = RateLimiter.builder()
.scheduledExecutorService(scheduledExecutorService)
.permits(5)
.rateTime(1)
.timeUnit(TimeUnit.MINUTES)
.build();
}

public void addUserException(Throwable ex) {
Expand Down
Expand Up @@ -196,8 +196,18 @@ public SinkStatsManager(FunctionCollectorRegistry collectorRegistry, String[] me
.help("Exception from sink.")
.create());

sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
sysExceptionRateLimiter = RateLimiter.builder()
.scheduledExecutorService(scheduledExecutorService)
.permits(5)
.rateTime(1)
.timeUnit(TimeUnit.MINUTES)
.build();
sinkExceptionRateLimiter = RateLimiter.builder()
.scheduledExecutorService(scheduledExecutorService)
.permits(5)
.rateTime(1)
.timeUnit(TimeUnit.MINUTES)
.build();
}

@Override
Expand Down
Expand Up @@ -195,8 +195,18 @@ public SourceStatsManager(FunctionCollectorRegistry collectorRegistry, String[]
.help("Exception from source.")
.create());

sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
sysExceptionRateLimiter = RateLimiter.builder()
.scheduledExecutorService(scheduledExecutorService)
.permits(5)
.rateTime(1)
.timeUnit(TimeUnit.MINUTES)
.build();
sourceExceptionRateLimiter = RateLimiter.builder()
.scheduledExecutorService(scheduledExecutorService)
.permits(5)
.rateTime(1)
.timeUnit(TimeUnit.MINUTES)
.build();
}

@Override
Expand Down

0 comments on commit fcd3336

Please sign in to comment.