Skip to content

Commit

Permalink
[Broker] Fix issue in set-publish-rate when using preciseTopicPublish…
Browse files Browse the repository at this point in the history
…RateLimiterEnable=true

- disabling the limit didn't work after setting a limit because of a bug in PrecisPublishLimiter
  • Loading branch information
lhotari committed Apr 26, 2021
1 parent 69a173a commit a9826bd
Show file tree
Hide file tree
Showing 10 changed files with 160 additions and 38 deletions.
Expand Up @@ -910,7 +910,7 @@ protected void updatePublishDispatcher(PublishRate publishRate) {
// create new rateLimiter if rate-limiter is disabled
if (preciseTopicPublishRateLimitingEnable) {
this.topicPublishRateLimiter = new PrecisPublishLimiter(publishRate,
() -> this.enableCnxAutoRead());
() -> this.enableCnxAutoRead(), brokerService.pulsar().getExecutor());
} else {
this.topicPublishRateLimiter = new PublishRateLimiterImpl(publishRate);
}
Expand Down
Expand Up @@ -18,39 +18,48 @@
*/
package org.apache.pulsar.broker.service;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.util.RateLimitFunction;
import org.apache.pulsar.common.util.RateLimiter;

public class PrecisPublishLimiter implements PublishRateLimiter {
public class PrecisPublishLimiter implements PublishRateLimiter, AutoCloseable {
protected volatile int publishMaxMessageRate = 0;
protected volatile long publishMaxByteRate = 0;
protected volatile boolean publishThrottlingEnabled = false;
// precise mode for publish rate limiter
private RateLimiter topicPublishRateLimiterOnMessage;
private RateLimiter topicPublishRateLimiterOnByte;
private volatile RateLimiter topicPublishRateLimiterOnMessage;
private volatile RateLimiter topicPublishRateLimiterOnByte;
private final RateLimitFunction rateLimitFunction;
private final ScheduledExecutorService scheduledExecutorService;

public PrecisPublishLimiter(Policies policies, String clusterName, RateLimitFunction rateLimitFunction) {
this.rateLimitFunction = rateLimitFunction;
update(policies, clusterName);
this.scheduledExecutorService = null;
}

public PrecisPublishLimiter(PublishRate publishRate, RateLimitFunction rateLimitFunction) {
this(publishRate, rateLimitFunction, null);
}

public PrecisPublishLimiter(PublishRate publishRate, RateLimitFunction rateLimitFunction,
ScheduledExecutorService scheduledExecutorService) {
this.rateLimitFunction = rateLimitFunction;
update(publishRate);
this.scheduledExecutorService = scheduledExecutorService;
}

@Override
public void checkPublishRate() {
// No-op
// No-op
}

@Override
public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
// No-op
// No-op
}

@Override
Expand All @@ -71,31 +80,63 @@ public void update(Policies policies, String clusterName) {
: null;
this.update(maxPublishRate);
}

public void update(PublishRate maxPublishRate) {
if (maxPublishRate != null
&& (maxPublishRate.publishThrottlingRateInMsg > 0 || maxPublishRate.publishThrottlingRateInByte > 0)) {
this.publishThrottlingEnabled = true;
this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0);
this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
if (this.publishMaxMessageRate > 0) {
topicPublishRateLimiterOnMessage =
new RateLimiter(publishMaxMessageRate, 1, TimeUnit.SECONDS, rateLimitFunction);
}
if (this.publishMaxByteRate > 0) {
topicPublishRateLimiterOnByte = new RateLimiter(publishMaxByteRate, 1, TimeUnit.SECONDS);
replaceLimiters(() -> {
if (maxPublishRate != null
&& (maxPublishRate.publishThrottlingRateInMsg > 0
|| maxPublishRate.publishThrottlingRateInByte > 0)) {
this.publishThrottlingEnabled = true;
this.publishMaxMessageRate = Math.max(maxPublishRate.publishThrottlingRateInMsg, 0);
this.publishMaxByteRate = Math.max(maxPublishRate.publishThrottlingRateInByte, 0);
if (this.publishMaxMessageRate > 0) {
topicPublishRateLimiterOnMessage =
new RateLimiter(scheduledExecutorService, publishMaxMessageRate, 1,
TimeUnit.SECONDS, rateLimitFunction);
}
if (this.publishMaxByteRate > 0) {
// TODO: is it intentional that rateLimitFunction isn't passed to topicPublishRateLimiterOnByte?
topicPublishRateLimiterOnByte = new RateLimiter(scheduledExecutorService, publishMaxByteRate,
1, TimeUnit.SECONDS);
}
} else {
this.publishMaxMessageRate = 0;
this.publishMaxByteRate = 0;
this.publishThrottlingEnabled = false;
}
} else {
this.publishMaxMessageRate = 0;
this.publishMaxByteRate = 0;
this.publishThrottlingEnabled = false;
topicPublishRateLimiterOnMessage = null;
topicPublishRateLimiterOnByte = null;
}
});
}

@Override
public boolean tryAcquire(int numbers, long bytes) {
return (topicPublishRateLimiterOnMessage == null || topicPublishRateLimiterOnMessage.tryAcquire(numbers))
&& (topicPublishRateLimiterOnByte == null || topicPublishRateLimiterOnByte.tryAcquire(bytes));
}

@Override
public void close() throws Exception {
replaceLimiters(null);
}

private void replaceLimiters(Runnable updater) {
RateLimiter previousTopicPublishRateLimiterOnMessage = topicPublishRateLimiterOnMessage;
topicPublishRateLimiterOnMessage = null;
RateLimiter previousTopicPublishRateLimiterOnByte = topicPublishRateLimiterOnByte;
topicPublishRateLimiterOnByte = null;
try {
if (updater != null) {
updater.run();
}
} finally {
// Close previous limiters to prevent resource leakages.
// Delay closing of previous limiters after new ones are in place so that updating the limiter
// doesn't cause unavailability.
if (previousTopicPublishRateLimiterOnMessage != null) {
previousTopicPublishRateLimiterOnMessage.close();
}
if (previousTopicPublishRateLimiterOnByte != null) {
previousTopicPublishRateLimiterOnByte.close();
}
}
}
}
Expand Up @@ -425,6 +425,13 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect

replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
if (topicPublishRateLimiter instanceof AutoCloseable) {
try {
((AutoCloseable) topicPublishRateLimiter).close();
} catch (Exception e) {
log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e);
}
}
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));

CompletableFuture<Void> clientCloseFuture =
Expand Down
Expand Up @@ -1129,6 +1129,13 @@ public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect
futures.add(transactionBuffer.closeAsync());
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
if (topicPublishRateLimiter instanceof AutoCloseable) {
try {
((AutoCloseable) topicPublishRateLimiter).close();
} catch (Exception e) {
log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e);
}
}
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));

CompletableFuture<Void> clientCloseFuture = closeWithoutWaitingClientDisconnect
Expand Down
Expand Up @@ -140,7 +140,7 @@ private synchronized void updateSubscribeRate(ConsumerIdentifier consumerIdentif
if (this.subscribeRateLimiter.get(consumerIdentifier) == null) {
this.subscribeRateLimiter.put(consumerIdentifier,
new RateLimiter(brokerService.pulsar().getExecutor(), ratePerConsumer,
ratePeriod, TimeUnit.SECONDS, null));
ratePeriod, TimeUnit.SECONDS));
} else {
this.subscribeRateLimiter.get(consumerIdentifier)
.setRate(ratePerConsumer, ratePeriod, TimeUnit.SECONDS,
Expand Down
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.testng.annotations.Test;

public class PrecisPublishLimiterTest {

@Test
void shouldResetMsgLimitAfterUpdate() {
PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(), () -> {
});
precisPublishLimiter.update(new PublishRate(1, 1));
assertFalse(precisPublishLimiter.tryAcquire(100, 100));
precisPublishLimiter.update(new PublishRate(-1, 100));
assertTrue(precisPublishLimiter.tryAcquire(100, 100));
}

@Test
void shouldResetBytesLimitAfterUpdate() {
PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(), () -> {
});
precisPublishLimiter.update(new PublishRate(1, 1));
assertFalse(precisPublishLimiter.tryAcquire(100, 100));
precisPublishLimiter.update(new PublishRate(100, -1));
assertTrue(precisPublishLimiter.tryAcquire(100, 100));
}

@Test
void shouldCloseResources() throws Exception {
for (int i = 0; i < 20000; i++) {
PrecisPublishLimiter precisPublishLimiter = new PrecisPublishLimiter(new PublishRate(100, 100), () -> {
});
precisPublishLimiter.tryAcquire(100, 100);
precisPublishLimiter.close();
}
}
}
Expand Up @@ -62,15 +62,25 @@ public class RateLimiter implements AutoCloseable{
private RateLimitFunction rateLimitFunction;

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit) {
this(null, permits, rateTime, timeUnit, null);
this(null, permits, rateTime, timeUnit);
}

public RateLimiter(final long permits, final long rateTime, final TimeUnit timeUnit,
RateLimitFunction autoReadResetFunction) {
this(null, permits, rateTime, timeUnit, null);
this(null, permits, rateTime, timeUnit, autoReadResetFunction);
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit, RateLimitFunction autoReadResetFunction) {
this(service, permits, rateTime, timeUnit);
this.rateLimitFunction = autoReadResetFunction;
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit) {
this(service, permits, rateTime, timeUnit, (Supplier<Long>) null);
}

public RateLimiter(final ScheduledExecutorService service, final long permits, final long rateTime,
final TimeUnit timeUnit, Supplier<Long> permitUpdater) {
checkArgument(permits > 0, "rate must be > 0");
Expand Down
Expand Up @@ -68,21 +68,21 @@ public class FunctionStatsManager extends ComponentStatsManager{
final Counter statTotalSysExceptions;

final Counter statTotalUserExceptions;

final Summary statProcessLatency;

final Gauge statlastInvocation;

final Counter statTotalRecordsReceived;

// windowed metrics

final Counter statTotalProcessedSuccessfully1min;

final Counter statTotalSysExceptions1min;

final Counter statTotalUserExceptions1min;

final Summary statProcessLatency1min;

final Counter statTotalRecordsReceived1min;
Expand Down Expand Up @@ -262,8 +262,8 @@ public FunctionStatsManager(FunctionCollectorRegistry collectorRegistry,
.help("Exception from sink.")
.create());

userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
userExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
}

public void addUserException(Throwable ex) {
Expand Down Expand Up @@ -371,7 +371,7 @@ public double getTotalSysExceptions() {
public double getTotalUserExceptions() {
return _statTotalUserExceptions.get();
}

@Override
public double getLastInvocation() {
return _statlastInvocation.get();
Expand Down Expand Up @@ -417,7 +417,7 @@ public double getTotalSysExceptions1min() {
public double getTotalUserExceptions1min() {
return _statTotalUserExceptions1min.get();
}

@Override
public double getAvgProcessLatency1min() {
return _statProcessLatency1min.get().count <= 0.0
Expand Down
Expand Up @@ -196,8 +196,8 @@ public SinkStatsManager(FunctionCollectorRegistry collectorRegistry, String[] me
.help("Exception from sink.")
.create());

sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
sinkExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
}

@Override
Expand Down
Expand Up @@ -195,8 +195,8 @@ public SourceStatsManager(FunctionCollectorRegistry collectorRegistry, String[]
.help("Exception from source.")
.create());

sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES, null);
sysExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
sourceExceptionRateLimiter = new RateLimiter(scheduledExecutorService, 5, 1, TimeUnit.MINUTES);
}

@Override
Expand Down

0 comments on commit a9826bd

Please sign in to comment.