Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Broker] Fix set-publish-rate when using preciseTopicPublishRateLimiterEnable=true #10384

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -929,7 +929,7 @@ protected void updatePublishDispatcher(PublishRate publishRate) {
// create new rateLimiter if rate-limiter is disabled
if (preciseTopicPublishRateLimitingEnable) {
this.topicPublishRateLimiter = new PrecisPublishLimiter(publishRate,
() -> this.enableCnxAutoRead());
() -> this.enableCnxAutoRead(), brokerService.pulsar().getExecutor());
} else {
this.topicPublishRateLimiter = new PublishRateLimiterImpl(publishRate);
}
Expand Down
Expand Up @@ -18,7 +18,7 @@
*/
package org.apache.pulsar.broker.service;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.util.RateLimitFunction;
Expand All @@ -27,30 +27,37 @@
public class PrecisPublishLimiter implements PublishRateLimiter {
protected volatile int publishMaxMessageRate = 0;
protected volatile long publishMaxByteRate = 0;
protected volatile boolean publishThrottlingEnabled = false;
// precise mode for publish rate limiter
private RateLimiter topicPublishRateLimiterOnMessage;
private RateLimiter topicPublishRateLimiterOnByte;
private volatile RateLimiter topicPublishRateLimiterOnMessage;
private volatile RateLimiter topicPublishRateLimiterOnByte;
private final RateLimitFunction rateLimitFunction;
private final ScheduledExecutorService scheduledExecutorService;

public PrecisPublishLimiter(Policies policies, String clusterName, RateLimitFunction rateLimitFunction) {
this.rateLimitFunction = rateLimitFunction;
update(policies, clusterName);
this.scheduledExecutorService = null;
}

public PrecisPublishLimiter(PublishRate publishRate, RateLimitFunction rateLimitFunction) {
this(publishRate, rateLimitFunction, null);
}

public PrecisPublishLimiter(PublishRate publishRate, RateLimitFunction rateLimitFunction,
ScheduledExecutorService scheduledExecutorService) {
this.rateLimitFunction = rateLimitFunction;
update(publishRate);
this.scheduledExecutorService = scheduledExecutorService;
}

@Override
public void checkPublishRate() {
// No-op
// No-op
}

@Override
public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
// No-op
// No-op
}

@Override
Expand All @@ -62,10 +69,15 @@ public boolean resetPublishCount() {
public boolean isPublishRateExceeded() {
return false;
}

// If all rate limiters are not exceeded, re-enable auto read from socket.
private void tryReleaseConnectionThrottle() {
if ((topicPublishRateLimiterOnMessage != null && topicPublishRateLimiterOnMessage.getAvailablePermits() <= 0)
|| (topicPublishRateLimiterOnByte != null && topicPublishRateLimiterOnByte.getAvailablePermits() <= 0)) {
RateLimiter currentTopicPublishRateLimiterOnMessage = topicPublishRateLimiterOnMessage;
RateLimiter currentTopicPublishRateLimiterOnByte = topicPublishRateLimiterOnByte;
if ((currentTopicPublishRateLimiterOnMessage != null
&& currentTopicPublishRateLimiterOnMessage.getAvailablePermits() <= 0)
|| (currentTopicPublishRateLimiterOnByte != null
&& currentTopicPublishRateLimiterOnByte.getAvailablePermits() <= 0)) {
return;
}
this.rateLimitFunction.apply();
Expand All @@ -78,34 +90,73 @@ public void update(Policies policies, String clusterName) {
: null;
this.update(maxPublishRate);
}

public void update(PublishRate maxPublishRate) {
if (maxPublishRate != null
eolivelli marked this conversation as resolved.
Show resolved Hide resolved
&& (maxPublishRate.publishThrottlingRateInMsg > 0 || maxPublishRate.publishThrottlingRateInByte > 0)) {
this.publishThrottlingEnabled = true;
this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0);
this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
if (this.publishMaxMessageRate > 0) {
topicPublishRateLimiterOnMessage =
new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS,
this::tryReleaseConnectionThrottle, true);
replaceLimiters(() -> {
if (maxPublishRate != null
&& (maxPublishRate.publishThrottlingRateInMsg > 0
|| maxPublishRate.publishThrottlingRateInByte > 0)) {
this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0);
this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
if (this.publishMaxMessageRate > 0) {
topicPublishRateLimiterOnMessage =
RateLimiter.builder()
.scheduledExecutorService(scheduledExecutorService)
.permits(publishMaxMessageRate)
.rateLimitFunction(this::tryReleaseConnectionThrottle)
.isDispatchOrPrecisePublishRateLimiter(true)
.build();
}
if (this.publishMaxByteRate > 0) {
topicPublishRateLimiterOnByte = RateLimiter.builder()
.scheduledExecutorService(scheduledExecutorService)
.permits(publishMaxByteRate)
.rateLimitFunction(this::tryReleaseConnectionThrottle)
.isDispatchOrPrecisePublishRateLimiter(true)
.build();
}
} else {
this.publishMaxMessageRate = 0;
this.publishMaxByteRate = 0;
}
if (this.publishMaxByteRate > 0) {
topicPublishRateLimiterOnByte =
new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS,
this::tryReleaseConnectionThrottle, true);
}
} else {
this.publishMaxMessageRate = 0;
this.publishMaxByteRate = 0;
this.publishThrottlingEnabled = false;
topicPublishRateLimiterOnMessage = null;
topicPublishRateLimiterOnByte = null;
}
});
}

@Override
public boolean tryAcquire(int numbers, long bytes) {
return (topicPublishRateLimiterOnMessage == null || topicPublishRateLimiterOnMessage.tryAcquire(numbers))
&& (topicPublishRateLimiterOnByte == null || topicPublishRateLimiterOnByte.tryAcquire(bytes));
RateLimiter currentTopicPublishRateLimiterOnMessage = topicPublishRateLimiterOnMessage;
RateLimiter currentTopicPublishRateLimiterOnByte = topicPublishRateLimiterOnByte;
return (currentTopicPublishRateLimiterOnMessage == null
|| currentTopicPublishRateLimiterOnMessage.tryAcquire(numbers))
&& (currentTopicPublishRateLimiterOnByte == null
|| currentTopicPublishRateLimiterOnByte.tryAcquire(bytes));
}

@Override
public void close() throws Exception {
rateLimitFunction.apply();
replaceLimiters(null);
}

private void replaceLimiters(Runnable updater) {
RateLimiter previousTopicPublishRateLimiterOnMessage = topicPublishRateLimiterOnMessage;
topicPublishRateLimiterOnMessage = null;
RateLimiter previousTopicPublishRateLimiterOnByte = topicPublishRateLimiterOnByte;
topicPublishRateLimiterOnByte = null;
try {
if (updater != null) {
updater.run();
}
} finally {
// Close previous limiters to prevent resource leakages.
// Delay closing of previous limiters after new ones are in place so that updating the limiter
// doesn't cause unavailability.
if (previousTopicPublishRateLimiterOnMessage != null) {
previousTopicPublishRateLimiterOnMessage.close();
}
if (previousTopicPublishRateLimiterOnByte != null) {
previousTopicPublishRateLimiterOnByte.close();
}
}
}
}
Expand Up @@ -21,7 +21,7 @@
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;

public interface PublishRateLimiter {
public interface PublishRateLimiter extends AutoCloseable {

PublishRateLimiter DISABLED_RATE_LIMITER = PublishRateLimiterDisable.DISABLED_RATE_LIMITER;

Expand Down
Expand Up @@ -62,4 +62,8 @@ public boolean tryAcquire(int numbers, long bytes) {
return true;
}

@Override
public void close() throws Exception {
// No-op
}
}
Expand Up @@ -108,4 +108,9 @@ public void update(PublishRate maxPublishRate) {
public boolean tryAcquire(int numbers, long bytes) {
return false;
}

@Override
public void close() throws Exception {
// no-op
}
}
Expand Up @@ -445,6 +445,13 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect

replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
if (topicPublishRateLimiter != null) {
try {
topicPublishRateLimiter.close();
} catch (Exception e) {
log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e);
}
}
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
if (this.resourceGroupPublishLimiter != null) {
this.resourceGroupPublishLimiter.unregisterRateLimitFunction(this.getName());
Expand Down
Expand Up @@ -1149,6 +1149,13 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
futures.add(transactionBuffer.closeAsync());
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
if (topicPublishRateLimiter != null) {
try {
topicPublishRateLimiter.close();
} catch (Exception e) {
log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e);
}
}
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
if (this.resourceGroupPublishLimiter != null) {
this.resourceGroupPublishLimiter.unregisterRateLimitFunction(this.getName());
Expand Down
Expand Up @@ -140,7 +140,7 @@ private synchronized void updateSubscribeRate(ConsumerIdentifier consumerIdentif
if (this.subscribeRateLimiter.get(consumerIdentifier) == null) {
this.subscribeRateLimiter.put(consumerIdentifier,
new RateLimiter(brokerService.pulsar().getExecutor(), ratePerConsumer,
ratePeriod, TimeUnit.SECONDS, null));
ratePeriod, TimeUnit.SECONDS));
} else {
this.subscribeRateLimiter.get(consumerIdentifier)
.setRate(ratePerConsumer, ratePeriod, TimeUnit.SECONDS,
Expand Down
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.testng.annotations.Test;

public class PrecisPublishLimiterTest {

@Test
void shouldResetMsgLimitAfterUpdate() {
PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(), () -> {
});
precisPublishLimiter.update(new PublishRate(1, 1));
assertFalse(precisPublishLimiter.tryAcquire(99, 99));
precisPublishLimiter.update(new PublishRate(-1, 100));
assertTrue(precisPublishLimiter.tryAcquire(99, 99));
}

@Test
void shouldResetBytesLimitAfterUpdate() {
PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(), () -> {
});
precisPublishLimiter.update(new PublishRate(1, 1));
assertFalse(precisPublishLimiter.tryAcquire(99, 99));
precisPublishLimiter.update(new PublishRate(100, -1));
assertTrue(precisPublishLimiter.tryAcquire(99, 99));
}

@Test
void shouldCloseResources() throws Exception {
for (int i = 0; i < 20000; i++) {
PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(100, 100), () -> {
});
precisPublishLimiter.tryAcquire(99, 99);
precisPublishLimiter.close();
}
}
}
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import lombok.Builder;

/**
* A Rate Limiter that distributes permits at a configurable rate. Each {@link #acquire()} blocks if necessary until a
Expand All @@ -48,7 +49,6 @@
* </ul>
*/
public class RateLimiter implements AutoCloseable{

private final ScheduledExecutorService executorService;
private long rateTime;
private TimeUnit timeUnit;
Expand All @@ -63,7 +63,7 @@ public class RateLimiter implements AutoCloseable{
private boolean isDispatchOrPrecisePublishRateLimiter;

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) {
this(null, permits, rateTime, timeUnit, null);
this(null, permits, rateTime, timeUnit);
}

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, boolean isDispatchOrPrecisePublishRateLimiter) {
Expand All @@ -82,13 +82,26 @@ public RateLimiter(final long permits, final long rateTime, final TimeUnit timeU
this.rateLimitFunction = autoReadResetFunction;
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit) {
this(service, permits, rateTime, timeUnit, (Supplier<Long>) null);
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit, Supplier<Long> permitUpdater) {
this(service, permits, rateTime, timeUnit, permitUpdater, false);
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit, Supplier<Long> permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter) {
this(service, permits, rateTime, timeUnit, permitUpdater, isDispatchOrPrecisePublishRateLimiter,
null);
}

@Builder
RateLimiter(final ScheduledExecutorService scheduledExecutorService, final long permits, final long rateTime,
final TimeUnit timeUnit, Supplier<Long> permitUpdater, boolean isDispatchOrPrecisePublishRateLimiter,
RateLimitFunction rateLimitFunction) {
checkArgument(permits > 0, "rate must be > 0");
checkArgument(rateTime > 0, "Renew permit time must be > 0");

Expand All @@ -98,8 +111,8 @@ public RateLimiter(final ScheduledExecutorService service, final long permits, f
this.permitUpdater = permitUpdater;
this.isDispatchOrPrecisePublishRateLimiter = isDispatchOrPrecisePublishRateLimiter;

if (service != null) {
this.executorService = service;
if (scheduledExecutorService != null) {
this.executorService = scheduledExecutorService;
this.externalExecutor = true;
} else {
final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
Expand All @@ -109,6 +122,14 @@ public RateLimiter(final ScheduledExecutorService service, final long permits, f
this.externalExecutor = false;
}

this.rateLimitFunction = rateLimitFunction;

}

// default values for Lombok generated builder class
public static class RateLimiterBuilder {
private long rateTime = 1;
private TimeUnit timeUnit = TimeUnit.SECONDS;
}

@Override
Expand Down