Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Broker] Fix set-publish-rate when using preciseTopicPublishRateLimiterEnable=true #10384

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -929,7 +929,7 @@ protected void updatePublishDispatcher(PublishRate publishRate) {
// create new rateLimiter if rate-limiter is disabled
if (preciseTopicPublishRateLimitingEnable) {
this.topicPublishRateLimiter = new PrecisPublishLimiter(publishRate,
() -> this.enableCnxAutoRead());
() -> this.enableCnxAutoRead(), brokerService.pulsar().getExecutor());
} else {
this.topicPublishRateLimiter = new PublishRateLimiterImpl(publishRate);
}
Expand Down
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
Expand All @@ -29,28 +30,36 @@ public class PrecisPublishLimiter implements PublishRateLimiter {
protected volatile long publishMaxByteRate = 0;
protected volatile boolean publishThrottlingEnabled = false;
// precise mode for publish rate limiter
private RateLimiter topicPublishRateLimiterOnMessage;
private RateLimiter topicPublishRateLimiterOnByte;
private volatile RateLimiter topicPublishRateLimiterOnMessage;
private volatile RateLimiter topicPublishRateLimiterOnByte;
private final RateLimitFunction rateLimitFunction;
private final ScheduledExecutorService scheduledExecutorService;

public PrecisPublishLimiter(Policies policies, String clusterName, RateLimitFunction rateLimitFunction) {
this.rateLimitFunction = rateLimitFunction;
update(policies, clusterName);
this.scheduledExecutorService = null;
}

public PrecisPublishLimiter(PublishRate publishRate, RateLimitFunction rateLimitFunction) {
this(publishRate, rateLimitFunction, null);
}

public PrecisPublishLimiter(PublishRate publishRate, RateLimitFunction rateLimitFunction,
ScheduledExecutorService scheduledExecutorService) {
this.rateLimitFunction = rateLimitFunction;
update(publishRate);
this.scheduledExecutorService = scheduledExecutorService;
}

@Override
public void checkPublishRate() {
// No-op
// No-op
}

@Override
public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
// No-op
// No-op
}

@Override
Expand All @@ -62,10 +71,15 @@ public boolean resetPublishCount() {
public boolean isPublishRateExceeded() {
return false;
}

// If all rate limiters are not exceeded, re-enable auto read from socket.
private void tryReleaseConnectionThrottle() {
if ((topicPublishRateLimiterOnMessage != null && topicPublishRateLimiterOnMessage.getAvailablePermits() <= 0)
|| (topicPublishRateLimiterOnByte != null && topicPublishRateLimiterOnByte.getAvailablePermits() <= 0)) {
RateLimiter currentTopicPublishRateLimiterOnMessage = topicPublishRateLimiterOnMessage;
RateLimiter currentTopicPublishRateLimiterOnByte = topicPublishRateLimiterOnByte;
if ((currentTopicPublishRateLimiterOnMessage != null
&& currentTopicPublishRateLimiterOnMessage.getAvailablePermits() <= 0)
|| (currentTopicPublishRateLimiterOnByte != null
&& currentTopicPublishRateLimiterOnByte.getAvailablePermits() <= 0)) {
return;
}
this.rateLimitFunction.apply();
Expand All @@ -78,34 +92,68 @@ public void update(Policies policies, String clusterName) {
: null;
this.update(maxPublishRate);
}

public void update(PublishRate maxPublishRate) {
if (maxPublishRate != null
eolivelli marked this conversation as resolved.
Show resolved Hide resolved
&& (maxPublishRate.publishThrottlingRateInMsg > 0 || maxPublishRate.publishThrottlingRateInByte > 0)) {
this.publishThrottlingEnabled = true;
this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0);
this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
if (this.publishMaxMessageRate > 0) {
topicPublishRateLimiterOnMessage =
new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS,
this::tryReleaseConnectionThrottle, true);
replaceLimiters(() -> {
if (maxPublishRate != null
&& (maxPublishRate.publishThrottlingRateInMsg > 0
|| maxPublishRate.publishThrottlingRateInByte > 0)) {
this.publishThrottlingEnabled = true;
this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0);
this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
if (this.publishMaxMessageRate > 0) {
topicPublishRateLimiterOnMessage =
new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS,
this::tryReleaseConnectionThrottle, true);
}
if (this.publishMaxByteRate > 0) {
topicPublishRateLimiterOnByte =
new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS,
this::tryReleaseConnectionThrottle, true);
}
} else {
this.publishMaxMessageRate = 0;
this.publishMaxByteRate = 0;
this.publishThrottlingEnabled = false;
}
if (this.publishMaxByteRate > 0) {
topicPublishRateLimiterOnByte =
new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS,
this::tryReleaseConnectionThrottle, true);
}
} else {
this.publishMaxMessageRate = 0;
this.publishMaxByteRate = 0;
this.publishThrottlingEnabled = false;
topicPublishRateLimiterOnMessage = null;
topicPublishRateLimiterOnByte = null;
}
});
}

@Override
public boolean tryAcquire(int numbers, long bytes) {
return (topicPublishRateLimiterOnMessage == null || topicPublishRateLimiterOnMessage.tryAcquire(numbers))
&& (topicPublishRateLimiterOnByte == null || topicPublishRateLimiterOnByte.tryAcquire(bytes));
RateLimiter currentTopicPublishRateLimiterOnMessage = topicPublishRateLimiterOnMessage;
RateLimiter currentTopicPublishRateLimiterOnByte = topicPublishRateLimiterOnByte;
return (currentTopicPublishRateLimiterOnMessage == null
|| currentTopicPublishRateLimiterOnMessage.tryAcquire(numbers))
&& (currentTopicPublishRateLimiterOnByte == null
|| currentTopicPublishRateLimiterOnByte.tryAcquire(bytes));
}

@Override
public void close() throws Exception {
rateLimitFunction.apply();
replaceLimiters(null);
}

private void replaceLimiters(Runnable updater) {
RateLimiter previousTopicPublishRateLimiterOnMessage = topicPublishRateLimiterOnMessage;
topicPublishRateLimiterOnMessage = null;
RateLimiter previousTopicPublishRateLimiterOnByte = topicPublishRateLimiterOnByte;
topicPublishRateLimiterOnByte = null;
try {
if (updater != null) {
updater.run();
}
} finally {
// Close previous limiters to prevent resource leakages.
// Delay closing of previous limiters after new ones are in place so that updating the limiter
// doesn't cause unavailability.
if (previousTopicPublishRateLimiterOnMessage != null) {
previousTopicPublishRateLimiterOnMessage.close();
}
if (previousTopicPublishRateLimiterOnByte != null) {
previousTopicPublishRateLimiterOnByte.close();
}
}
}
}
Expand Up @@ -21,7 +21,7 @@
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;

public interface PublishRateLimiter {
public interface PublishRateLimiter extends AutoCloseable {

PublishRateLimiter DISABLED_RATE_LIMITER = PublishRateLimiterDisable.DISABLED_RATE_LIMITER;

Expand Down
Expand Up @@ -62,4 +62,8 @@ public boolean tryAcquire(int numbers, long bytes) {
return true;
}

@Override
public void close() throws Exception {
// No-op
}
}
Expand Up @@ -108,4 +108,9 @@ public void update(PublishRate maxPublishRate) {
public boolean tryAcquire(int numbers, long bytes) {
return false;
}

@Override
public void close() throws Exception {
// no-op
}
}
Expand Up @@ -445,6 +445,13 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect

replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
if (topicPublishRateLimiter != null) {
try {
topicPublishRateLimiter.close();
} catch (Exception e) {
log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e);
}
}
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
if (this.resourceGroupPublishLimiter != null) {
this.resourceGroupPublishLimiter.unregisterRateLimitFunction(this.getName());
Expand Down
Expand Up @@ -1149,6 +1149,13 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
futures.add(transactionBuffer.closeAsync());
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
if (topicPublishRateLimiter != null) {
try {
topicPublishRateLimiter.close();
} catch (Exception e) {
log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e);
}
}
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
if (this.resourceGroupPublishLimiter != null) {
this.resourceGroupPublishLimiter.unregisterRateLimitFunction(this.getName());
Expand Down
Expand Up @@ -140,7 +140,7 @@ private synchronized void updateSubscribeRate(ConsumerIdentifier consumerIdentif
if (this.subscribeRateLimiter.get(consumerIdentifier) == null) {
this.subscribeRateLimiter.put(consumerIdentifier,
new RateLimiter(brokerService.pulsar().getExecutor(), ratePerConsumer,
ratePeriod, TimeUnit.SECONDS, null));
ratePeriod, TimeUnit.SECONDS));
} else {
this.subscribeRateLimiter.get(consumerIdentifier)
.setRate(ratePerConsumer, ratePeriod, TimeUnit.SECONDS,
Expand Down
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.testng.annotations.Test;

public class PrecisPublishLimiterTest {

@Test
void shouldResetMsgLimitAfterUpdate() {
PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(), () -> {
});
precisPublishLimiter.update(new PublishRate(1, 1));
assertFalse(precisPublishLimiter.tryAcquire(99, 99));
precisPublishLimiter.update(new PublishRate(-1, 100));
assertTrue(precisPublishLimiter.tryAcquire(99, 99));
}

@Test
void shouldResetBytesLimitAfterUpdate() {
PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(), () -> {
});
precisPublishLimiter.update(new PublishRate(1, 1));
assertFalse(precisPublishLimiter.tryAcquire(99, 99));
precisPublishLimiter.update(new PublishRate(100, -1));
assertTrue(precisPublishLimiter.tryAcquire(99, 99));
}

@Test
void shouldCloseResources() throws Exception {
for (int i = 0; i < 20000; i++) {
PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(100, 100), () -> {
});
precisPublishLimiter.tryAcquire(99, 99);
precisPublishLimiter.close();
}
}
}
Expand Up @@ -63,7 +63,7 @@ public class RateLimiter implements AutoCloseable{
private boolean isDispatchOrPrecisePublishRateLimiter;

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) {
this(null, permits, rateTime, timeUnit, null);
this(null, permits, rateTime, timeUnit);
}

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit, boolean isDispatchOrPrecisePublishRateLimiter) {
Expand All @@ -82,6 +82,11 @@ public RateLimiter(final long permits, final long rateTime, final TimeUnit timeU
this.rateLimitFunction = autoReadResetFunction;
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit) {
this(service, permits, rateTime, timeUnit, (Supplier<Long>) null);
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit, Supplier<Long> permitUpdater) {
this(service, permits, rateTime, timeUnit, permitUpdater, false);
Expand Down
Expand Up @@ -68,21 +68,21 @@ public class FunctionStatsManager extends ComponentStatsManager{
final Counter statTotalSysExceptions;

final Counter statTotalUserExceptions;

final Summary statProcessLatency;

final Gauge statlastInvocation;

final Counter statTotalRecordsReceived;

// windowed metrics

final Counter statTotalProcessedSuccessfully1min;

final Counter statTotalSysExceptions1min;

final Counter statTotalUserExceptions1min;

final Summary statProcessLatency1min;

final Counter statTotalRecordsReceived1min;
Expand Down Expand Up @@ -262,8 +262,8 @@ public FunctionStatsManager(FunctionCollectorRegistry collectorRegistry,
.help("Exception from sink.")
.create());

userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
}

public void addUserException(Throwable ex) {
Expand Down Expand Up @@ -371,7 +371,7 @@ public double getTotalSysExceptions() {
public double getTotalUserExceptions() {
return _statTotalUserExceptions.get();
}

@Override
public double getLastInvocation() {
return _statlastInvocation.get();
Expand Down Expand Up @@ -417,7 +417,7 @@ public double getTotalSysExceptions1min() {
public double getTotalUserExceptions1min() {
return _statTotalUserExceptions1min.get();
}

@Override
public double getAvgProcessLatency1min() {
return _statProcessLatency1min.get().count <= 0.0
Expand Down
Expand Up @@ -196,8 +196,8 @@ public SinkStatsManager(FunctionCollectorRegistry collectorRegistry, String[] me
.help("Exception from sink.")
.create());

sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
}

@Override
Expand Down
Expand Up @@ -195,8 +195,8 @@ public SourceStatsManager(FunctionCollectorRegistry collectorRegistry, String[]
.help("Exception from source.")
.create());

sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
}

@Override
Expand Down