From ce6be124c9c86a6e10604ff44dc817f5d6f13c0e Mon Sep 17 00:00:00 2001 From: congbo <39078850+congbobo184@users.noreply.github.com> Date: Tue, 6 Jul 2021 10:28:43 +0800 Subject: [PATCH] [Broker] Fix broker dispatch byte rate limiter. (#11135) ## Motivation fix https://github.com/apache/pulsar/issues/11044 now dispatcher byte rate limit don't limit every cursor read. When cursor read always use `ServiceConfiguration.dispatcherMaxReadSizeBytes` to read. It will cause that dispatcher read entries by `ServiceConfiguration.dispatcherMaxReadSizeBytes` to read every time. --- .../service/AbstractBaseDispatcher.java | 19 ++++++- .../AbstractDispatcherMultipleConsumers.java | 5 +- ...bstractDispatcherSingleActiveConsumer.java | 6 ++- ...PersistentDispatcherMultipleConsumers.java | 5 +- ...sistentDispatcherSingleActiveConsumer.java | 6 +-- .../persistent/DispatchRateLimiter.java | 9 ++++ ...PersistentDispatcherMultipleConsumers.java | 50 ++++++++++--------- ...sistentDispatcherSingleActiveConsumer.java | 49 ++++++++++-------- ...tStreamingDispatcherMultipleConsumers.java | 10 ++-- ...reamingDispatcherSingleActiveConsumer.java | 11 ++-- .../StreamingEntryReader.java | 4 +- .../api/MessageDispatchThrottlingTest.java | 42 ++++++---------- 12 files changed, 122 insertions(+), 94 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index bb10df4d53876..3646ae67f23d1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -29,6 +29,7 @@ import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.intercept.BrokerInterceptor; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -47,8 +48,11 @@ public abstract class AbstractBaseDispatcher implements Dispatcher { protected final Subscription subscription; - protected AbstractBaseDispatcher(Subscription subscription) { + protected final ServiceConfiguration serviceConfig; + + protected AbstractBaseDispatcher(Subscription subscription, ServiceConfiguration serviceConfig) { this.subscription = subscription; + this.serviceConfig = serviceConfig; } /** @@ -234,6 +238,19 @@ public void resetCloseFuture() { // noop } + protected static Pair computeReadLimits(int messagesToRead, int availablePermitsOnMsg, + long bytesToRead, long availablePermitsOnByte) { + if (availablePermitsOnMsg > 0) { + messagesToRead = Math.min(messagesToRead, availablePermitsOnMsg); + } + + if (availablePermitsOnByte > 0) { + bytesToRead = Math.min(bytesToRead, availablePermitsOnByte); + } + + return Pair.of(messagesToRead, bytesToRead); + } + protected byte[] peekStickyKey(ByteBuf metadataAndPayload) { return Commands.peekStickyKey(metadataAndPayload, subscription.getTopicName(), subscription.getName()); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java index 2f6b9a6524369..ad9805964b303 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java @@ -23,6 +23,7 @@ import java.util.Random; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.persistent.PersistentStickyKeyDispatcherMultipleConsumers; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.slf4j.Logger; @@ -46,8 +47,8 @@ public abstract class AbstractDispatcherMultipleConsumers extends AbstractBaseDi private Random random = new Random(42); - protected AbstractDispatcherMultipleConsumers(Subscription subscription) { - super(subscription); + protected AbstractDispatcherMultipleConsumers(Subscription subscription, ServiceConfiguration serviceConfig) { + super(subscription, serviceConfig); } public boolean isConsumerConnected() { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java index b142c51adeb43..e73daaa4210d4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; @@ -55,8 +56,9 @@ public abstract class AbstractDispatcherSingleActiveConsumer extends AbstractBas private volatile int isClosed = FALSE; public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex, - String topicName, Subscription subscription) { - super(subscription); + String topicName, Subscription subscription, + ServiceConfiguration serviceConfig) { + super(subscription, serviceConfig); this.topicName = topicName; this.consumers = new CopyOnWriteArrayList<>(); this.partitionIndex = partitionIndex; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java index 1a19186ecefa2..5688683359280 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java @@ -23,7 +23,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.bookkeeper.mledger.Entry; -import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.BrokerServiceException; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; @@ -56,16 +55,14 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher @SuppressWarnings("unused") private volatile int totalAvailablePermits = 0; - private final ServiceConfiguration serviceConfig; private final RedeliveryTracker redeliveryTracker; public NonPersistentDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription) { - super(subscription); + super(subscription, topic.getBrokerService().pulsar().getConfiguration()); this.topic = topic; this.subscription = subscription; this.name = topic.getName() + " / " + subscription.getName(); this.msgDrop = new Rate(); - this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration(); this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java index a720767fee808..6094ab71df2cc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java @@ -21,7 +21,6 @@ import java.util.List; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.Entry; -import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.EntryBatchSizes; @@ -40,16 +39,15 @@ public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractD private final NonPersistentTopic topic; private final Rate msgDrop; private final Subscription subscription; - private final ServiceConfiguration serviceConfig; private final RedeliveryTracker redeliveryTracker; public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex, NonPersistentTopic topic, Subscription subscription) { - super(subscriptionType, partitionIndex, topic.getName(), subscription); + super(subscriptionType, partitionIndex, topic.getName(), subscription, + topic.getBrokerService().pulsar().getConfiguration()); this.topic = topic; this.subscription = subscription; this.msgDrop = new Rate(); - this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration(); this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java index 8cf4427ce4d22..994d27418e76e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java @@ -75,6 +75,15 @@ public long getAvailableDispatchRateLimitOnMsg() { return dispatchRateLimiterOnMessage == null ? -1 : dispatchRateLimiterOnMessage.getAvailablePermits(); } + /** + * returns available byte-permit if msg-dispatch-throttling is enabled else it returns -1. + * + * @return + */ + public long getAvailableDispatchRateLimitOnByte() { + return dispatchRateLimiterOnByte == null ? -1 : dispatchRateLimiterOnByte.getAvailablePermits(); + } + /** * It acquires msg and bytes permits from rate-limiter and returns if acquired permits succeed. * diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 90ad1e1e22fe2..179d866b112f9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -39,7 +39,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.PositionImpl; -import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker; import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.BrokerServiceException; @@ -106,7 +106,6 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER = AtomicIntegerFieldUpdater.newUpdater(PersistentDispatcherMultipleConsumers.class, "blockedDispatcherOnUnackedMsgs"); - protected final ServiceConfiguration serviceConfig; protected Optional dispatchRateLimiter = Optional.empty(); protected enum ReadType { @@ -115,8 +114,7 @@ protected enum ReadType { public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, Subscription subscription) { - super(subscription); - this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration(); + super(subscription, topic.getBrokerService().pulsar().getConfiguration()); this.cursor = cursor; this.lastIndividualDeletedRangeFromCursorRecovery = cursor.getLastIndividualDeletedRange(); this.name = topic.getName() + " / " + Codec.decode(cursor.getName()); @@ -224,9 +222,11 @@ public synchronized void readMoreEntries() { int firstAvailableConsumerPermits = getFirstAvailableConsumerPermits(); int currentTotalAvailablePermits = Math.max(totalAvailablePermits, firstAvailableConsumerPermits); if (currentTotalAvailablePermits > 0 && firstAvailableConsumerPermits > 0) { - int messagesToRead = calculateNumOfMessageToRead(currentTotalAvailablePermits); + Pair calculateResult = calculateToRead(currentTotalAvailablePermits); + int messagesToRead = calculateResult.getLeft(); + long bytesToRead = calculateResult.getRight(); - if (-1 == messagesToRead) { + if (messagesToRead == -1 || bytesToRead == -1) { // Skip read as topic/dispatcher has exceed the dispatch rate or previous pending read hasn't complete. return; } @@ -263,8 +263,7 @@ public synchronized void readMoreEntries() { consumerList.size()); } havePendingRead = true; - cursor.asyncReadEntriesOrWait(messagesToRead, serviceConfig.getDispatcherMaxReadSizeBytes(), - this, + cursor.asyncReadEntriesOrWait(messagesToRead, bytesToRead, this, ReadType.Normal, topic.getMaxReadPosition()); } else { log.debug("[{}] Cannot schedule next read until previous one is done", name); @@ -276,8 +275,10 @@ public synchronized void readMoreEntries() { } } - protected int calculateNumOfMessageToRead(int currentTotalAvailablePermits) { + // left pair is messagesToRead, right pair is bytesToRead + protected Pair calculateToRead(int currentTotalAvailablePermits) { int messagesToRead = Math.min(currentTotalAvailablePermits, readBatchSize); + long bytesToRead = serviceConfig.getDispatcherMaxReadSizeBytes(); Consumer c = getRandomConsumer(); // if turn on precise dispatcher flow control, adjust the record to read @@ -310,13 +311,15 @@ protected int calculateNumOfMessageToRead(int currentTotalAvailablePermits) { } topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS); - return -1; + return Pair.of(-1, -1L); } else { - // if dispatch-rate is in msg then read only msg according to available permit - long availablePermitsOnMsg = topicRateLimiter.getAvailableDispatchRateLimitOnMsg(); - if (availablePermitsOnMsg > 0) { - messagesToRead = Math.min(messagesToRead, (int) availablePermitsOnMsg); - } + Pair calculateResult = computeReadLimits(messagesToRead, + (int) topicRateLimiter.getAvailableDispatchRateLimitOnMsg(), + bytesToRead, topicRateLimiter.getAvailableDispatchRateLimitOnByte()); + + messagesToRead = calculateResult.getLeft(); + bytesToRead = calculateResult.getRight(); + } } @@ -331,13 +334,14 @@ protected int calculateNumOfMessageToRead(int currentTotalAvailablePermits) { } topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS); - return -1; + return Pair.of(-1, -1L); } else { - // if dispatch-rate is in msg then read only msg according to available permit - long availablePermitsOnMsg = dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg(); - if (availablePermitsOnMsg > 0) { - messagesToRead = Math.min(messagesToRead, (int) availablePermitsOnMsg); - } + Pair calculateResult = computeReadLimits(messagesToRead, + (int) dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg(), + bytesToRead, dispatchRateLimiter.get().getAvailableDispatchRateLimitOnByte()); + + messagesToRead = calculateResult.getLeft(); + bytesToRead = calculateResult.getRight(); } } @@ -347,11 +351,11 @@ protected int calculateNumOfMessageToRead(int currentTotalAvailablePermits) { if (log.isDebugEnabled()) { log.debug("[{}] Skipping replay while awaiting previous read to complete", name); } - return -1; + return Pair.of(-1, -1L); } // If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException - return Math.max(messagesToRead, 1); + return Pair.of(Math.max(messagesToRead, 1), Math.max(bytesToRead, 1)); } protected Set asyncReplayEntries(Set positions) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 93c2d2547ab16..2bd94fff51c50 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -34,7 +34,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.util.SafeRun; -import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Dispatcher; @@ -67,19 +67,18 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher protected volatile int readBatchSize; protected final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS); - protected final ServiceConfiguration serviceConfig; private volatile ScheduledFuture readOnActiveConsumerTask = null; private final RedeliveryTracker redeliveryTracker; public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType subscriptionType, int partitionIndex, PersistentTopic topic, Subscription subscription) { - super(subscriptionType, partitionIndex, topic.getName(), subscription); + super(subscriptionType, partitionIndex, topic.getName(), subscription, + topic.getBrokerService().pulsar().getConfiguration()); this.topic = topic; this.name = topic.getName() + " / " + (cursor.getName() != null ? Codec.decode(cursor.getName()) : ""/* NonDurableCursor doesn't have name */); this.cursor = cursor; - this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration(); this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize(); this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED; this.initializeDispatchRateLimiterIfNeeded(Optional.empty()); @@ -324,9 +323,11 @@ protected void readMoreEntries(Consumer consumer) { } if (consumer.getAvailablePermits() > 0) { - int messagesToRead = calculateNumOfMessageToRead(consumer); + Pair calculateResult = calculateToRead(consumer); + int messagesToRead = calculateResult.getLeft(); + long bytesToRead = calculateResult.getRight(); - if (-1 == messagesToRead) { + if (-1 == messagesToRead || bytesToRead == -1) { // Skip read as topic/dispatcher has exceed the dispatch rate. return; } @@ -340,7 +341,7 @@ protected void readMoreEntries(Consumer consumer) { topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, this, consumer); } else { cursor.asyncReadEntriesOrWait(messagesToRead, - serviceConfig.getDispatcherMaxReadSizeBytes(), this, consumer, topic.getMaxReadPosition()); + bytesToRead, this, consumer, topic.getMaxReadPosition()); } } else { if (log.isDebugEnabled()) { @@ -349,7 +350,7 @@ protected void readMoreEntries(Consumer consumer) { } } - protected int calculateNumOfMessageToRead(Consumer consumer) { + protected Pair calculateToRead(Consumer consumer) { int availablePermits = consumer.getAvailablePermits(); if (!consumer.isWritable()) { // If the connection is not currently writable, we issue the read request anyway, but for a single @@ -360,6 +361,7 @@ protected int calculateNumOfMessageToRead(Consumer consumer) { } int messagesToRead = Math.min(availablePermits, readBatchSize); + long bytesToRead = serviceConfig.getDispatcherMaxReadSizeBytes(); // if turn of precise dispatcher flow control, adjust the records to read if (consumer.isPreciseDispatcherFlowControl()) { int avgMessagesPerEntry = consumer.getAvgMessagesPerEntry(); @@ -391,13 +393,16 @@ protected int calculateNumOfMessageToRead(Consumer consumer) { } } }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS); - return -1; + return Pair.of(-1, -1L); } else { - // if dispatch-rate is in msg then read only msg according to available permit - long availablePermitsOnMsg = topicRateLimiter.getAvailableDispatchRateLimitOnMsg(); - if (availablePermitsOnMsg > 0) { - messagesToRead = Math.min(messagesToRead, (int) availablePermitsOnMsg); - } + + Pair calculateResult = computeReadLimits(messagesToRead, + (int) topicRateLimiter.getAvailableDispatchRateLimitOnMsg(), + bytesToRead, topicRateLimiter.getAvailableDispatchRateLimitOnByte()); + + messagesToRead = calculateResult.getLeft(); + bytesToRead = calculateResult.getRight(); + } } @@ -421,19 +426,21 @@ protected int calculateNumOfMessageToRead(Consumer consumer) { } } }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS); - return -1; + return Pair.of(-1, -1L); } else { - // if dispatch-rate is in msg then read only msg according to available permit - long subPermitsOnMsg = dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg(); - if (subPermitsOnMsg > 0) { - messagesToRead = Math.min(messagesToRead, (int) subPermitsOnMsg); - } + + Pair calculateResult = computeReadLimits(messagesToRead, + (int) dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg(), + bytesToRead, dispatchRateLimiter.get().getAvailableDispatchRateLimitOnByte()); + + messagesToRead = calculateResult.getLeft(); + bytesToRead = calculateResult.getRight(); } } } // If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException - return Math.max(messagesToRead, 1); + return Pair.of(Math.max(messagesToRead, 1), Math.max(bytesToRead, 1)); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java index 666bb9813cae8..f7d47e6c2c982 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java @@ -29,6 +29,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.util.SafeRun; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.streamingdispatch.PendingReadEntryRequest; @@ -139,9 +140,10 @@ public synchronized void readMoreEntries() { // totalAvailablePermits may be updated by other threads int currentTotalAvailablePermits = totalAvailablePermits; if (currentTotalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) { - int messagesToRead = calculateNumOfMessageToRead(currentTotalAvailablePermits); - - if (-1 == messagesToRead) { + Pair calculateResult = calculateToRead(currentTotalAvailablePermits); + int messagesToRead = calculateResult.getLeft(); + long bytesToRead = calculateResult.getRight(); + if (-1 == messagesToRead || bytesToRead == -1) { // Skip read as topic/dispatcher has exceed the dispatch rate or previous pending read hasn't complete. return; } @@ -178,7 +180,7 @@ public synchronized void readMoreEntries() { consumerList.size()); } havePendingRead = true; - streamingEntryReader.asyncReadEntries(messagesToRead, serviceConfig.getDispatcherMaxReadSizeBytes(), + streamingEntryReader.asyncReadEntries(messagesToRead, bytesToRead, ReadType.Normal); } else { log.debug("[{}] Cannot schedule next read until previous one is done", name); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java index e90705cc3d865..e213b04915413 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java @@ -28,6 +28,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.util.SafeRun; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.EntryBatchIndexesAcks; import org.apache.pulsar.broker.service.EntryBatchSizes; @@ -179,9 +180,12 @@ protected void readMoreEntries(Consumer consumer) { } if (!havePendingRead && consumer.getAvailablePermits() > 0) { - int messagesToRead = calculateNumOfMessageToRead(consumer); + Pair calculateResult = calculateToRead(consumer); + int messagesToRead = calculateResult.getLeft(); + long bytesToRead = calculateResult.getRight(); - if (-1 == messagesToRead) { + + if (-1 == messagesToRead || bytesToRead == -1) { // Skip read as topic/dispatcher has exceed the dispatch rate. return; } @@ -195,8 +199,7 @@ protected void readMoreEntries(Consumer consumer) { if (consumer.readCompacted()) { topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, this, consumer); } else { - streamingEntryReader.asyncReadEntries(messagesToRead, serviceConfig.getDispatcherMaxReadSizeBytes(), - consumer); + streamingEntryReader.asyncReadEntries(messagesToRead, bytesToRead, consumer); } } else { if (log.isDebugEnabled()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java index 51356c99a6b26..12f5600c51a14 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java @@ -68,7 +68,7 @@ public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback, W private static final AtomicReferenceFieldUpdater STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(StreamingEntryReader.class, State.class, "state"); - private volatile int maxReadSizeByte; + private volatile long maxReadSizeByte; private final Backoff readFailureBackoff = new Backoff(10, TimeUnit.MILLISECONDS, 1, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS); @@ -81,7 +81,7 @@ public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback, W * @param maxReadSizeByte maximum byte will be read from ledger. * @param ctx Context send along with read request. */ - public synchronized void asyncReadEntries(int numEntriesToRead, int maxReadSizeByte, Object ctx) { + public synchronized void asyncReadEntries(int numEntriesToRead, long maxReadSizeByte, Object ctx) { if (STATE_UPDATER.compareAndSet(this, State.Canceling, State.Canceled)) { internalCancelReadRequests(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java index fc3cfaf3f9eee..f605028a1a70d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java @@ -43,6 +43,7 @@ import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl; +import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -413,12 +414,15 @@ public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(Subscriptio */ @Test(dataProvider = "subscriptions", timeOut = 5000) public void testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType subscription) throws Exception { + conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true); log.info("-- Starting {} test --", methodName); final String namespace = "my-property/throttling_ns"; final String topicName = "persistent://" + namespace + "/throttlingAll"; + final String subscriptionName = "my-subscriber-name"; - final int byteRate = 100; + // + final int byteRate = 250; DispatchRate dispatchRate = DispatchRate.builder() .dispatchThrottlingRateInMsg(-1) .dispatchThrottlingRateInByte(byteRate) @@ -426,47 +430,31 @@ public void testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionT .build(); admin.namespaces().createNamespace(namespace, Sets.newHashSet("test")); admin.namespaces().setDispatchRate(namespace, dispatchRate); + admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest); // create producer and topic - Producer producer = pulsarClient.newProducer().topic(topicName).create(); + Producer producer = pulsarClient.newProducer().topic(topicName).enableBatching(false).create(); PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); - boolean isMessageRateUpdate = false; - int retry = 5; - for (int i = 0; i < retry; i++) { - if (topic.getDispatchRateLimiter().get().getDispatchRateOnByte() > 0) { - isMessageRateUpdate = true; - break; - } else { - if (i != retry - 1) { - Thread.sleep(100); - } - } - } - Assert.assertTrue(isMessageRateUpdate); + Awaitility.await().until(() -> topic.getDispatchRateLimiter().get().getDispatchRateOnByte() > 0); Assert.assertEquals(admin.namespaces().getDispatchRate(namespace), dispatchRate); final int numProducedMessages = 20; - final CountDownLatch latch = new CountDownLatch(numProducedMessages); final AtomicInteger totalReceived = new AtomicInteger(0); - Consumer consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name") + for (int i = 0; i < numProducedMessages; i++) { + producer.send(new byte[99]); + } + + Consumer consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName) .subscriptionType(subscription).messageListener((c1, msg) -> { Assert.assertNotNull(msg, "Message cannot be null"); String receivedMessage = new String(msg.getData()); log.debug("Received message [{}] in the listener", receivedMessage); totalReceived.incrementAndGet(); - latch.countDown(); }).subscribe(); - // deactive cursors - deactiveCursors((ManagedLedgerImpl) topic.getManagedLedger()); - // Asynchronously produce messages - for (int i = 0; i < numProducedMessages; i++) { - producer.send(new byte[byteRate / 10]); - } - - latch.await(); - Assert.assertEquals(totalReceived.get(), numProducedMessages); + Awaitility.await().atLeast(3, TimeUnit.SECONDS) + .atMost(5, TimeUnit.SECONDS).until(() -> totalReceived.get() > 6 && totalReceived.get() < 10); consumer.close(); producer.close();