Skip to content

Commit

Permalink
[Broker] Fix broker dispatch byte rate limiter. (#11135)
Browse files Browse the repository at this point in the history
## Motivation
fix #11044
now dispatcher byte rate limit don't limit every cursor read. When cursor read always use `ServiceConfiguration.dispatcherMaxReadSizeBytes` to read. It will cause that  dispatcher read entries by `ServiceConfiguration.dispatcherMaxReadSizeBytes` to read every time.
  • Loading branch information
congbobo184 committed Jul 6, 2021
1 parent a7e40ea commit ce6be12
Show file tree
Hide file tree
Showing 12 changed files with 122 additions and 94 deletions.
Expand Up @@ -29,6 +29,7 @@
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
Expand All @@ -47,8 +48,11 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {

protected final Subscription subscription;

protected AbstractBaseDispatcher(Subscription subscription) {
protected final ServiceConfiguration serviceConfig;

protected AbstractBaseDispatcher(Subscription subscription, ServiceConfiguration serviceConfig) {
this.subscription = subscription;
this.serviceConfig = serviceConfig;
}

/**
Expand Down Expand Up @@ -234,6 +238,19 @@ public void resetCloseFuture() {
// noop
}

protected static Pair<Integer, Long> computeReadLimits(int messagesToRead, int availablePermitsOnMsg,
long bytesToRead, long availablePermitsOnByte) {
if (availablePermitsOnMsg > 0) {
messagesToRead = Math.min(messagesToRead, availablePermitsOnMsg);
}

if (availablePermitsOnByte > 0) {
bytesToRead = Math.min(bytesToRead, availablePermitsOnByte);
}

return Pair.of(messagesToRead, bytesToRead);
}

protected byte[] peekStickyKey(ByteBuf metadataAndPayload) {
return Commands.peekStickyKey(metadataAndPayload, subscription.getTopicName(), subscription.getName());
}
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.slf4j.Logger;
Expand All @@ -46,8 +47,8 @@ public abstract class AbstractDispatcherMultipleConsumers extends AbstractBaseDi

private Random random = new Random(42);

protected AbstractDispatcherMultipleConsumers(Subscription subscription) {
super(subscription);
protected AbstractDispatcherMultipleConsumers(Subscription subscription, ServiceConfiguration serviceConfig) {
super(subscription, serviceConfig);
}

public boolean isConsumerConnected() {
Expand Down
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
Expand Down Expand Up @@ -55,8 +56,9 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas
private volatile int isClosed = FALSE;

public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
String topicName, Subscription subscription) {
super(subscription);
String topicName, Subscription subscription,
ServiceConfiguration serviceConfig) {
super(subscription, serviceConfig);
this.topicName = topicName;
this.consumers = new CopyOnWriteArrayList<>();
this.partitionIndex = partitionIndex;
Expand Down
Expand Up @@ -23,7 +23,6 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
Expand Down Expand Up @@ -56,16 +55,14 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
@SuppressWarnings("unused")
private volatile int totalAvailablePermits = 0;

private final ServiceConfiguration serviceConfig;
private final RedeliveryTracker redeliveryTracker;

public NonPersistentDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription) {
super(subscription);
super(subscription, topic.getBrokerService().pulsar().getConfiguration());
this.topic = topic;
this.subscription = subscription;
this.name = topic.getName() + " / " + subscription.getName();
this.msgDrop = new Rate();
this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
}

Expand Down
Expand Up @@ -21,7 +21,6 @@
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.EntryBatchSizes;
Expand All @@ -40,16 +39,15 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD
private final NonPersistentTopic topic;
private final Rate msgDrop;
private final Subscription subscription;
private final ServiceConfiguration serviceConfig;
private final RedeliveryTracker redeliveryTracker;

public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
NonPersistentTopic topic, Subscription subscription) {
super(subscriptionType, partitionIndex, topic.getName(), subscription);
super(subscriptionType, partitionIndex, topic.getName(), subscription,
topic.getBrokerService().pulsar().getConfiguration());
this.topic = topic;
this.subscription = subscription;
this.msgDrop = new Rate();
this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
}

Expand Down
Expand Up @@ -75,6 +75,15 @@ public long getAvailableDispatchRateLimitOnMsg() {
return dispatchRateLimiterOnMessage == null ? -1 : dispatchRateLimiterOnMessage.getAvailablePermits();
}

/**
* returns available byte-permit if msg-dispatch-throttling is enabled else it returns -1.
*
* @return
*/
public long getAvailableDispatchRateLimitOnByte() {
return dispatchRateLimiterOnByte == null ? -1 : dispatchRateLimiterOnByte.getAvailablePermits();
}

/**
* It acquires msg and bytes permits from rate-limiter and returns if acquired permits succeed.
*
Expand Down
Expand Up @@ -39,7 +39,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.BrokerServiceException;
Expand Down Expand Up @@ -106,7 +106,6 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class,
"blockedDispatcherOnUnackedMsgs");
protected final ServiceConfiguration serviceConfig;
protected Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();

protected enum ReadType {
Expand All @@ -115,8 +114,7 @@ protected enum ReadType {

public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor,
Subscription subscription) {
super(subscription);
this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
super(subscription, topic.getBrokerService().pulsar().getConfiguration());
this.cursor = cursor;
this.lastIndividualDeletedRangeFromCursorRecovery = cursor.getLastIndividualDeletedRange();
this.name = topic.getName() + " / " + Codec.decode(cursor.getName());
Expand Down Expand Up @@ -224,9 +222,11 @@ public synchronized void readMoreEntries() {
int firstAvailableConsumerPermits = getFirstAvailableConsumerPermits();
int currentTotalAvailablePermits = Math.max(totalAvailablePermits, firstAvailableConsumerPermits);
if (currentTotalAvailablePermits > 0 && firstAvailableConsumerPermits > 0) {
int messagesToRead = calculateNumOfMessageToRead(currentTotalAvailablePermits);
Pair<Integer, Long> calculateResult = calculateToRead(currentTotalAvailablePermits);
int messagesToRead = calculateResult.getLeft();
long bytesToRead = calculateResult.getRight();

if (-1 == messagesToRead) {
if (messagesToRead == -1 || bytesToRead == -1) {
// Skip read as topic/dispatcher has exceed the dispatch rate or previous pending read hasn't complete.
return;
}
Expand Down Expand Up @@ -263,8 +263,7 @@ public synchronized void readMoreEntries() {
consumerList.size());
}
havePendingRead = true;
cursor.asyncReadEntriesOrWait(messagesToRead, serviceConfig.getDispatcherMaxReadSizeBytes(),
this,
cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this,
ReadType.Normal, topic.getMaxReadPosition());
} else {
log.debug("[{}] Cannot schedule next read until previous one is done", name);
Expand All @@ -276,8 +275,10 @@ public synchronized void readMoreEntries() {
}
}

protected int calculateNumOfMessageToRead(int currentTotalAvailablePermits) {
// left pair is messagesToRead, right pair is bytesToRead
protected Pair<Integer, Long> calculateToRead(int currentTotalAvailablePermits) {
int messagesToRead = Math.min(currentTotalAvailablePermits, readBatchSize);
long bytesToRead = serviceConfig.getDispatcherMaxReadSizeBytes();

Consumer c = getRandomConsumer();
// if turn on precise dispatcher flow control, adjust the record to read
Expand Down Expand Up @@ -310,13 +311,15 @@ protected int calculateNumOfMessageToRead(int currentTotalAvailablePermits) {
}
topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
TimeUnit.MILLISECONDS);
return -1;
return Pair.of(-1, -1L);
} else {
// if dispatch-rate is in msg then read only msg according to available permit
long availablePermitsOnMsg = topicRateLimiter.getAvailableDispatchRateLimitOnMsg();
if (availablePermitsOnMsg > 0) {
messagesToRead = Math.min(messagesToRead, (int) availablePermitsOnMsg);
}
Pair<Integer, Long> calculateResult = computeReadLimits(messagesToRead,
(int) topicRateLimiter.getAvailableDispatchRateLimitOnMsg(),
bytesToRead, topicRateLimiter.getAvailableDispatchRateLimitOnByte());

messagesToRead = calculateResult.getLeft();
bytesToRead = calculateResult.getRight();

}
}

Expand All @@ -331,13 +334,14 @@ protected int calculateNumOfMessageToRead(int currentTotalAvailablePermits) {
}
topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
TimeUnit.MILLISECONDS);
return -1;
return Pair.of(-1, -1L);
} else {
// if dispatch-rate is in msg then read only msg according to available permit
long availablePermitsOnMsg = dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg();
if (availablePermitsOnMsg > 0) {
messagesToRead = Math.min(messagesToRead, (int) availablePermitsOnMsg);
}
Pair<Integer, Long> calculateResult = computeReadLimits(messagesToRead,
(int) dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg(),
bytesToRead, dispatchRateLimiter.get().getAvailableDispatchRateLimitOnByte());

messagesToRead = calculateResult.getLeft();
bytesToRead = calculateResult.getRight();
}
}

Expand All @@ -347,11 +351,11 @@ protected int calculateNumOfMessageToRead(int currentTotalAvailablePermits) {
if (log.isDebugEnabled()) {
log.debug("[{}] Skipping replay while awaiting previous read to complete", name);
}
return -1;
return Pair.of(-1, -1L);
}

// If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
return Math.max(messagesToRead, 1);
return Pair.of(Math.max(messagesToRead, 1), Math.max(bytesToRead, 1));
}

protected Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions) {
Expand Down

0 comments on commit ce6be12

Please sign in to comment.