Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[issue #13351] Solving precise rate limiting does not takes effect #11446

Merged
merged 7 commits into from Jul 28, 2021
Expand Up @@ -79,10 +79,10 @@ public void update(PublishRate maxPublishRate) {
this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
if (this.publishMaxMessageRate > 0) {
topicPublishRateLimiterOnMessage =
new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction);
new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction, true);
}
if (this.publishMaxByteRate > 0) {
topicPublishRateLimiterOnByte = new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS);
topicPublishRateLimiterOnByte = new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS, true);
}
} else {
this.publishMaxMessageRate = 0;
Expand Down
Expand Up @@ -59,7 +59,7 @@ public void update(PublishRate maxPublishRate) {
@Override
public boolean tryAcquire(int numbers, long bytes) {
// No-op
return false;
return true;
}

}
Expand Up @@ -60,15 +60,25 @@ public class RateLimiter implements AutoCloseable{
// permitUpdate helps to update permit-rate at runtime
private Supplier<Long> permitUpdater;
private RateLimitFunction rateLimitFunction;
private boolean isDispatchRateLimiter;
private boolean isDispatchOrPrecisePublishRateLimiter;

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) {
this(null, permits, rateTime, timeUnit, null);
}

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, boolean isDispatchOrPrecisePublishRateLimiter) {
this(null, permits, rateTime, timeUnit, null, isDispatchOrPrecisePublishRateLimiter);
}

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit,
RateLimitFunction autoReadResetFunction) {
this(null, permits, rateTime, timeUnit, null);
this(null, permits, rateTime, timeUnit, null, false);
this.rateLimitFunction = autoReadResetFunction;
}

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit,
RateLimitFunction autoReadResetFunction, boolean isDispatchOrPrecisePublishRateLimiter) {
this(null, permits, rateTime, timeUnit, null, isDispatchOrPrecisePublishRateLimiter);
this.rateLimitFunction = autoReadResetFunction;
}

Expand All @@ -78,15 +88,15 @@ public RateLimiter(final ScheduledExecutorService service, final long permits, f
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit, Supplier<Long> permitUpdater, boolean isDispatchRateLimiter) {
final TimeUnit timeUnit, Supplier<Long> permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter) {
checkArgument(permits > 0, "rate must be > 0");
checkArgument(rateTime > 0, "Renew permit time must be > 0");

this.rateTime = rateTime;
this.timeUnit = timeUnit;
this.permits = permits;
this.permitUpdater = permitUpdater;
this.isDispatchRateLimiter = isDispatchRateLimiter;
this.isDispatchOrPrecisePublishRateLimiter = isDispatchOrPrecisePublishRateLimiter;

if (service != null) {
this.executorService = service;
Expand Down Expand Up @@ -180,7 +190,7 @@ public synchronized boolean tryAcquire(long acquirePermit) {
}

boolean canAcquire = acquirePermit < 0 || acquiredPermits < this.permits;
if (isDispatchRateLimiter) {
if (isDispatchOrPrecisePublishRateLimiter) {
// for dispatch rate limiter just add acquirePermit
acquiredPermits += acquirePermit;
} else {
Expand Down Expand Up @@ -257,14 +267,15 @@ protected ScheduledFuture<?> createTask() {
}

synchronized void renew() {
acquiredPermits = isDispatchRateLimiter ? Math.max(0, acquiredPermits - permits) : 0;
acquiredPermits = isDispatchOrPrecisePublishRateLimiter ? Math.max(0, acquiredPermits - permits) : 0;
if (permitUpdater != null) {
long newPermitRate = permitUpdater.get();
if (newPermitRate > 0) {
setRate(newPermitRate);
}
}
if (rateLimitFunction != null) {
// release the back-pressure by applying the rateLimitFunction only when there are available permits
if (rateLimitFunction != null && this.getAvailablePermits() > 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to make sense. Please add some comments here explaining the logic.

Copy link
Member

@lhotari lhotari Jul 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd assume that the condition would need to handle the case where isDispatchOrPrecisePublishRateLimiter is false.
It seems that rateLimitFunction.apply would never be called in that case.
Did you think about that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea Ill add a comment.

And actually i didn't give it much thought but when I'm thinking about it the rateLimitFunction is a callback that lets the outer scope access to the renew function, I don't think that we should use this property without knowing exactly what expected to be happening.

I believe that checking whether there are available permits is the reasonable condition here because we would want to let the outer scope a way to run something when there are any available permits, and it doesn't really depend on the class property.

If it sets to false we can assume the user of the rateLimiter wants the state to be reset every time window otherwise he probably want back-pressuring something.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this is necessary
Changed existing behavior

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is necessary, without it every renew call the function will release the throttle.

Not having it defeats the whole purpose of throttling a connection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is that still relevant?
And may you review the test refactor please?

rateLimitFunction.apply();
}
notifyAll();
Expand Down