diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java index 5b091baa49654e..f0f64314918cad 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java @@ -104,9 +104,7 @@ public void removeConsumer(Consumer consumer) { } @Override - public Consumer select(byte[] stickyKey) { - int hash = Murmur3_32Hash.getInstance().makeHash(stickyKey); - + public Consumer select(int hash) { rwLock.readLock().lock(); try { if (hashRing.isEmpty()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 1e8b2e561042c0..9f4214481588cb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -52,6 +52,7 @@ import org.apache.pulsar.common.api.proto.MessageIdData; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; +import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.stats.Rate; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; @@ -241,7 +242,8 @@ public Future sendMessages(final List entries, EntryBatchSizes batc Entry entry = entries.get(i); if (entry != null) { int batchSize = batchSizes.getBatchSize(i); - pendingAcks.put(entry.getLedgerId(), entry.getEntryId(), batchSize, 0); + int stickyKeyHash = getStickyKeyHash(entry); + pendingAcks.put(entry.getLedgerId(), entry.getEntryId(), batchSize, stickyKeyHash); if (log.isDebugEnabled()){ log.debug("[{}-{}] Added {}:{} ledger entry with batchSize of {} to pendingAcks in" + " broker.service.Consumer for consumerId: {}", @@ -748,7 +750,7 @@ public void redeliverUnacknowledgedMessages() { if (pendingAcks != null) { List pendingPositions = new ArrayList<>((int) pendingAcks.size()); MutableInt totalRedeliveryMessages = new MutableInt(0); - pendingAcks.forEach((ledgerId, entryId, batchSize, none) -> { + pendingAcks.forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> { totalRedeliveryMessages.add((int) batchSize); pendingPositions.add(new PositionImpl(ledgerId, entryId)); }); @@ -771,10 +773,11 @@ public void redeliverUnacknowledgedMessages(List messageIds) { List pendingPositions = Lists.newArrayList(); for (MessageIdData msg : messageIds) { PositionImpl position = PositionImpl.get(msg.getLedgerId(), msg.getEntryId()); - LongPair batchSize = pendingAcks.get(position.getLedgerId(), position.getEntryId()); - if (batchSize != null) { + LongPair longPair = pendingAcks.get(position.getLedgerId(), position.getEntryId()); + if (longPair != null) { + long batchSize = longPair.first; pendingAcks.remove(position.getLedgerId(), position.getEntryId()); - totalRedeliveryMessages += batchSize.first; + totalRedeliveryMessages += batchSize; pendingPositions.add(position); } } @@ -845,5 +848,10 @@ public MessageId getStartMessageId() { return startMessageId; } + private int getStickyKeyHash(Entry entry) { + byte[] stickyKey = Commands.peekStickyKey(entry.getDataBuffer(), topicName, subscription.getName()); + return StickyKeyConsumerSelector.makeStickyKeyHash(stickyKey); + } + private static final Logger log = LoggerFactory.getLogger(Consumer.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java index e1cc2b8a232052..4f721c955da4b1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java @@ -26,7 +26,6 @@ import java.util.Map.Entry; import java.util.concurrent.ConcurrentSkipListMap; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException; -import org.apache.pulsar.common.util.Murmur3_32Hash; /** * This is a consumer selector based fixed hash range. @@ -103,8 +102,7 @@ public synchronized void removeConsumer(Consumer consumer) { } @Override - public Consumer select(byte[] stickyKey) { - int hash = Murmur3_32Hash.getInstance().makeHash(stickyKey); + public Consumer select(int hash) { if (rangeMap.size() > 0) { int slot = hash % rangeSize; return rangeMap.ceilingEntry(slot).getValue(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java index e6be9bf0ebe0ca..f4909ae4fe2e52 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java @@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentSkipListMap; import org.apache.pulsar.common.api.proto.IntRange; import org.apache.pulsar.common.api.proto.KeySharedMeta; -import org.apache.pulsar.common.util.Murmur3_32Hash; /** * This is a sticky-key consumer selector based user provided range. @@ -65,11 +64,6 @@ public void removeConsumer(Consumer consumer) { rangeMap.entrySet().removeIf(entry -> entry.getValue().equals(consumer)); } - @Override - public Consumer select(byte[] stickyKey) { - return select(Murmur3_32Hash.getInstance().makeHash(stickyKey)); - } - @Override public Map> getConsumerKeyHashRanges() { Map> result = new HashMap<>(); @@ -88,7 +82,8 @@ public Map> getConsumerKeyHashRanges() { return result; } - Consumer select(int hash) { + @Override + public Consumer select(int hash) { if (rangeMap.size() > 0) { int slot = hash % rangeSize; Map.Entry ceilingEntry = rangeMap.ceilingEntry(slot); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java index 71972614fd7f2c..83952fc34725f8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException; +import org.apache.pulsar.common.util.Murmur3_32Hash; public interface StickyKeyConsumerSelector { @@ -45,7 +46,21 @@ public interface StickyKeyConsumerSelector { * @param stickyKey sticky key * @return consumer */ - Consumer select(byte[] stickyKey); + default Consumer select(byte[] stickyKey) { + return select(makeStickyKeyHash(stickyKey)); + } + + static int makeStickyKeyHash(byte[] stickyKey) { + return Murmur3_32Hash.getInstance().makeHash(stickyKey); + } + + /** + * Select a consumer by hash. + * + * @param hash hash corresponding to sticky key + * @return consumer + */ + Consumer select(int hash); /** * Get key hash ranges handled by each consumer. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java new file mode 100644 index 00000000000000..be143565c483f7 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.persistent; + +import com.google.common.collect.ComparisonChain; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap; +import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair; +import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet; +import org.apache.pulsar.common.util.collections.LongPairSet; + +public class MessageRedeliveryController { + private final LongPairSet messagesToRedeliver; + private final ConcurrentLongLongPairHashMap hashesToBeBlocked; + + public MessageRedeliveryController(boolean allowOutOfOrderDelivery) { + this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2); + this.hashesToBeBlocked = allowOutOfOrderDelivery ? null : new ConcurrentLongLongPairHashMap(128, 2); + } + + public boolean add(long ledgerId, long entryId) { + return messagesToRedeliver.add(ledgerId, entryId); + } + + public boolean add(long ledgerId, long entryId, long stickyKeyHash) { + if (hashesToBeBlocked != null) { + hashesToBeBlocked.put(ledgerId, entryId, stickyKeyHash, 0); + } + return messagesToRedeliver.add(ledgerId, entryId); + } + + public boolean remove(long ledgerId, long entryId) { + if (hashesToBeBlocked != null) { + hashesToBeBlocked.remove(ledgerId, entryId); + } + return messagesToRedeliver.remove(ledgerId, entryId); + } + + public int removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) { + if (hashesToBeBlocked != null) { + List keysToRemove = new ArrayList<>(); + hashesToBeBlocked.forEach((ledgerId, entryId, stickyKeyHash, none) -> { + if (ComparisonChain.start().compare(ledgerId, markDeleteLedgerId).compare(entryId, markDeleteEntryId) + .result() <= 0) { + keysToRemove.add(new LongPair(ledgerId, entryId)); + } + }); + keysToRemove.forEach(longPair -> hashesToBeBlocked.remove(longPair.first, longPair.second)); + keysToRemove.clear(); + } + return messagesToRedeliver.removeIf((ledgerId, entryId) -> { + return ComparisonChain.start().compare(ledgerId, markDeleteLedgerId).compare(entryId, markDeleteEntryId) + .result() <= 0; + }); + } + + public boolean isEmpty() { + return messagesToRedeliver.isEmpty(); + } + + public void clear() { + if (hashesToBeBlocked != null) { + hashesToBeBlocked.clear(); + } + messagesToRedeliver.clear(); + } + + public String toString() { + return messagesToRedeliver.toString(); + } + + public boolean containsStickyKeyHashes(Set stickyKeyHashes) { + final AtomicBoolean isContained = new AtomicBoolean(false); + if (hashesToBeBlocked != null) { + hashesToBeBlocked.forEach((ledgerId, entryId, stickyKeyHash, none) -> { + if (!isContained.get() && stickyKeyHashes.contains((int) stickyKeyHash)) { + isContained.set(true); + } + }); + } + return isContained.get(); + } + + public Set getMessagesToReplayNow(int maxMessagesToRead) { + if (hashesToBeBlocked != null) { + // allowOutOfOrderDelivery is false + return messagesToRedeliver.items().stream() + .sorted((l1, l2) -> ComparisonChain.start().compare(l1.first, l2.first) + .compare(l1.second, l2.second).result()) + .limit(maxMessagesToRead).map(longPair -> new PositionImpl(longPair.first, longPair.second)) + .collect(Collectors.toCollection(TreeSet::new)); + } else { + // allowOutOfOrderDelivery is true + return messagesToRedeliver.items(maxMessagesToRead, + (ledgerId, entryId) -> new PositionImpl(ledgerId, entryId)); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 179d866b112f92..06a5c4cad518dc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -19,7 +19,6 @@ package org.apache.pulsar.broker.service.persistent; import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS; -import com.google.common.collect.ComparisonChain; import com.google.common.collect.Lists; import com.google.common.collect.Range; import java.util.Collections; @@ -53,6 +52,7 @@ import org.apache.pulsar.broker.service.RedeliveryTracker; import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled; import org.apache.pulsar.broker.service.SendMessageInfo; +import org.apache.pulsar.broker.service.StickyKeyConsumerSelector; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type; import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException; @@ -62,8 +62,6 @@ import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.util.Codec; -import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet; -import org.apache.pulsar.common.util.collections.LongPairSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,7 +76,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul protected volatile Range lastIndividualDeletedRangeFromCursorRecovery; private CompletableFuture closeFuture = null; - protected LongPairSet messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2); + protected final MessageRedeliveryController redeliveryMessages; protected final RedeliveryTracker redeliveryTracker; private Optional delayedDeliveryTracker = Optional.empty(); @@ -113,12 +111,18 @@ protected enum ReadType { } public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, - Subscription subscription) { + Subscription subscription) { + this(topic, cursor, subscription, true); + } + + public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, Subscription subscription, + boolean allowOutOfOrderDelivery) { super(subscription, topic.getBrokerService().pulsar().getConfiguration()); this.cursor = cursor; this.lastIndividualDeletedRangeFromCursorRecovery = cursor.getLastIndividualDeletedRange(); this.name = topic.getName() + " / " + Codec.decode(cursor.getName()); this.topic = topic; + this.redeliveryMessages = new MessageRedeliveryController(allowOutOfOrderDelivery); this.redeliveryTracker = this.serviceConfig.isSubscriptionRedeliveryTrackerEnabled() ? new InMemoryRedeliveryTracker() : RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED; @@ -141,7 +145,7 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce cursor.rewind(); shouldRewindBeforeReadingOrReplaying = false; } - messagesToRedeliver.clear(); + redeliveryMessages.clear(); } if (isConsumersExceededOnSubscription()) { @@ -169,7 +173,7 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE if (consumerList.isEmpty()) { cancelPendingRead(); - messagesToRedeliver.clear(); + redeliveryMessages.clear(); redeliveryTracker.clear(); if (closeFuture != null) { log.info("[{}] All consumers removed. Subscription is disconnected", name); @@ -180,8 +184,8 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE if (log.isDebugEnabled()) { log.debug("[{}] Consumer are left, reading more entries", name); } - consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, none) -> { - if (addMessageToReplay(ledgerId, entryId)) { + consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> { + if (addMessageToReplay(ledgerId, entryId, stickyKeyHash)) { redeliveryTracker.addIfAbsent(PositionImpl.get(ledgerId, entryId)); } }); @@ -244,9 +248,9 @@ public synchronized void readMoreEntries() { ? asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow); // clear already acked positions from replay bucket - deletedMessages.forEach(position -> messagesToRedeliver.remove(((PositionImpl) position).getLedgerId(), + deletedMessages.forEach(position -> redeliveryMessages.remove(((PositionImpl) position).getLedgerId(), ((PositionImpl) position).getEntryId())); - // if all the entries are acked-entries and cleared up from messagesToRedeliver, try to read + // if all the entries are acked-entries and cleared up from redeliveryMessages, try to read // next entries as readCompletedEntries-callback was never called if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) { havePendingReplayRead = false; @@ -528,7 +532,7 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { // remove positions first from replay list first : sendMessages recycles entries if (readType == ReadType.Replay) { entries.subList(start, end).forEach(entry -> { - messagesToRedeliver.remove(entry.getLedgerId(), entry.getEntryId()); + redeliveryMessages.remove(entry.getLedgerId(), entry.getEntryId()); }); } @@ -583,7 +587,8 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { entries.size() - start); } entries.subList(start, entries.size()).forEach(entry -> { - addMessageToReplay(entry.getLedgerId(), entry.getEntryId()); + long stickyKeyHash = getStickyKeyHash(entry); + addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash); entry.release(); }); } @@ -629,11 +634,7 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj havePendingReplayRead = false; if (exception instanceof ManagedLedgerException.InvalidReplayPositionException) { PositionImpl markDeletePosition = (PositionImpl) cursor.getMarkDeletedPosition(); - - messagesToRedeliver.removeIf((ledgerId, entryId) -> { - return ComparisonChain.start().compare(ledgerId, markDeletePosition.getLedgerId()) - .compare(entryId, markDeletePosition.getEntryId()).result() <= 0; - }); + redeliveryMessages.removeAllUpTo(markDeletePosition.getLedgerId(), markDeletePosition.getEntryId()); } } @@ -706,12 +707,12 @@ public boolean isConsumerAvailable(Consumer consumer) { @Override public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) { - consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, none) -> { - addMessageToReplay(ledgerId, entryId); + consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> { + addMessageToReplay(ledgerId, entryId, stickyKeyHash); }); if (log.isDebugEnabled()) { log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer, - messagesToRedeliver); + redeliveryMessages); } readMoreEntries(); } @@ -719,6 +720,8 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) { @Override public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List positions) { positions.forEach(position -> { + // TODO: We want to pass a sticky key hash as a third argument to guarantee the order of the messages + // on Key_Shared subscription, but it's difficult to get the sticky key here if (addMessageToReplay(position.getLedgerId(), position.getEntryId())) { redeliveryTracker.addIfAbsent(position); } @@ -838,9 +841,8 @@ public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata } protected synchronized Set getMessagesToReplayNow(int maxMessagesToRead) { - if (!messagesToRedeliver.isEmpty()) { - return messagesToRedeliver.items(maxMessagesToRead, - (ledgerId, entryId) -> new PositionImpl(ledgerId, entryId)); + if (!redeliveryMessages.isEmpty()) { + return redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead); } else if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) { delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis()); return delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead); @@ -866,17 +868,30 @@ public void cursorIsReset() { } } + protected boolean addMessageToReplay(long ledgerId, long entryId, long stickyKeyHash) { + if (checkIfMessageIsUnacked(ledgerId, entryId)) { + redeliveryMessages.add(ledgerId, entryId, stickyKeyHash); + return true; + } else { + return false; + } + } + protected boolean addMessageToReplay(long ledgerId, long entryId) { - Position markDeletePosition = cursor.getMarkDeletedPosition(); - if (markDeletePosition == null || ledgerId > markDeletePosition.getLedgerId() - || (ledgerId == markDeletePosition.getLedgerId() && entryId > markDeletePosition.getEntryId())) { - messagesToRedeliver.add(ledgerId, entryId); + if (checkIfMessageIsUnacked(ledgerId, entryId)) { + redeliveryMessages.add(ledgerId, entryId); return true; } else { return false; } } + private boolean checkIfMessageIsUnacked(long ledgerId, long entryId) { + Position markDeletePosition = cursor.getMarkDeletedPosition(); + return (markDeletePosition == null || ledgerId > markDeletePosition.getLedgerId() + || (ledgerId == markDeletePosition.getLedgerId() && entryId > markDeletePosition.getEntryId())); + } + @Override public boolean checkAndUnblockIfStuck() { if (cursor.checkAndUpdateReadPositionChanged()) { @@ -896,5 +911,9 @@ public PersistentTopic getTopic() { return topic; } + protected int getStickyKeyHash(Entry entry) { + return StickyKeyConsumerSelector.makeStickyKeyHash(peekStickyKey(entry.getDataBuffer())); + } + private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index d9a56a095e3477..c23b3600e23352 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -70,7 +70,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, Subscription subscription, ServiceConfiguration conf, KeySharedMeta ksm) { - super(topic, cursor, subscription); + super(topic, cursor, subscription, ksm.isAllowOutOfOrderDelivery()); this.allowOutOfOrderDelivery = ksm.isAllowOutOfOrderDelivery(); this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new LinkedHashMap<>(); @@ -123,7 +123,7 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException { // The consumer must be removed from the selector before calling the superclass removeConsumer method. // In the superclass removeConsumer method, the pending acks that the consumer has are added to - // messagesToRedeliver. If the consumer has not been removed from the selector at this point, + // redeliveryMessages. If the consumer has not been removed from the selector at this point, // the broker will try to redeliver the messages to the consumer that has already been closed. // As a result, the messages are not redelivered to any consumer, and the mark-delete position does not move, // eventually causing all consumers to get stuck. @@ -134,7 +134,7 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE if (consumerList.size() == 1) { recentlyJoinedConsumers.clear(); } - if (removeConsumersFromRecentJoinedConsumers() || messagesToRedeliver.size() > 0) { + if (removeConsumersFromRecentJoinedConsumers() || !redeliveryMessages.isEmpty()) { readMoreEntries(); } } @@ -170,10 +170,13 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { final Map> groupedEntries = localGroupedEntries.get(); groupedEntries.clear(); + final Map> consumerStickyKeyHashesMap = new HashMap<>(); for (Entry entry : entries) { - Consumer c = selector.select(peekStickyKey(entry.getDataBuffer())); + int stickyKeyHash = getStickyKeyHash(entry); + Consumer c = selector.select(stickyKeyHash); groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry); + consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash); } AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size()); @@ -188,8 +191,8 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { int entriesWithSameKeyCount = entriesWithSameKey.size(); final int availablePermits = consumer == null ? 0 : Math.max(consumer.getAvailablePermits(), 0); int maxMessagesForC = Math.min(entriesWithSameKeyCount, availablePermits); - int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, - maxMessagesForC, readType); + int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, maxMessagesForC, + readType, consumerStickyKeyHashesMap.get(consumer)); if (log.isDebugEnabled()) { log.debug("[{}] select consumer {} with messages num {}, read type is {}", name, consumer == null ? "null" : consumer.consumerName(), messagesForC, readType); @@ -200,7 +203,8 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { // so we discard for now and mark them for later redelivery for (int i = messagesForC; i < entriesWithSameKeyCount; i++) { Entry entry = entriesWithSameKey.get(i); - addMessageToReplay(entry.getLedgerId(), entry.getEntryId()); + long stickyKeyHash = getStickyKeyHash(entry); + addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash); entry.release(); entriesWithSameKey.set(i, null); } @@ -211,7 +215,7 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { if (readType == ReadType.Replay) { for (int i = 0; i < messagesForC; i++) { Entry entry = entriesWithSameKey.get(i); - messagesToRedeliver.remove(entry.getLedgerId(), entry.getEntryId()); + redeliveryMessages.remove(entry.getLedgerId(), entry.getEntryId()); } } @@ -282,13 +286,19 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { } } - private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List entries, - int maxMessages, ReadType readType) { + private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List entries, int maxMessages, + ReadType readType, Set stickyKeyHashes) { if (maxMessages == 0) { // the consumer was stuck nextStuckConsumers.add(consumer); return 0; } + if (readType == ReadType.Normal && stickyKeyHashes != null + && redeliveryMessages.containsStickyKeyHashes(stickyKeyHashes)) { + // If redeliveryMessages contains messages that correspond to the same hash as the messages + // that the dispatcher is trying to send, do not send those messages for order guarantee + return 0; + } if (recentlyJoinedConsumers == null) { return maxMessages; } @@ -371,6 +381,7 @@ private boolean removeConsumersFromRecentJoinedConsumers() { return hasConsumerRemovedFromTheRecentJoinedConsumers; } + @Override protected synchronized Set getMessagesToReplayNow(int maxMessagesToRead) { if (isDispatcherStuckOnReplays) { // If we're stuck on replay, we want to move forward reading on the topic (until the overall max-unacked diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java index f7d47e6c2c982a..5235c13dc81fdd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java @@ -161,9 +161,9 @@ public synchronized void readMoreEntries() { ? asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow); // clear already acked positions from replay bucket - deletedMessages.forEach(position -> messagesToRedeliver.remove(((PositionImpl) position).getLedgerId(), + deletedMessages.forEach(position -> redeliveryMessages.remove(((PositionImpl) position).getLedgerId(), ((PositionImpl) position).getEntryId())); - // if all the entries are acked-entries and cleared up from messagesToRedeliver, try to read + // if all the entries are acked-entries and cleared up from redeliveryMessages, try to read // next entries as readCompletedEntries-callback was never called if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) { havePendingReplayRead = false; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java index 283cd98f30aae2..ce48402880afe4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java @@ -47,6 +47,7 @@ import org.apache.bookkeeper.mledger.impl.EntryCacheImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -79,7 +80,6 @@ import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.stats.Metrics; -import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.awaitility.Awaitility; import org.testng.Assert; @@ -1667,9 +1667,10 @@ public void testMessageReplay() throws Exception { PersistentSubscription subRef = topicRef.getSubscription(subName); PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) subRef .getDispatcher(); - Field replayMap = PersistentDispatcherMultipleConsumers.class.getDeclaredField("messagesToRedeliver"); - replayMap.setAccessible(true); - ConcurrentLongPairSet messagesToReplay = new ConcurrentLongPairSet(64, 1); + Field redeliveryMessagesField = PersistentDispatcherMultipleConsumers.class + .getDeclaredField("redeliveryMessages"); + redeliveryMessagesField.setAccessible(true); + MessageRedeliveryController redeliveryMessages = new MessageRedeliveryController(true); assertNotNull(subRef); @@ -1690,24 +1691,24 @@ public void testMessageReplay() throws Exception { } if (i < replayIndex) { // (3) accumulate acked messages for replay - messagesToReplay.add(msgId.getLedgerId(), msgId.getEntryId()); + redeliveryMessages.add(msgId.getLedgerId(), msgId.getEntryId()); } } // (4) redelivery : should redeliver only unacked messages Thread.sleep(1000); - replayMap.set(dispatcher, messagesToReplay); + redeliveryMessagesField.set(dispatcher, redeliveryMessages); // (a) redelivery with all acked-message should clear messageReply bucket dispatcher.redeliverUnacknowledgedMessages(dispatcher.getConsumers().get(0)); Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> { - return messagesToReplay.isEmpty(); + return redeliveryMessages.isEmpty(); }); - assertEquals(messagesToReplay.size(), 0); + assertTrue(redeliveryMessages.isEmpty()); // (b) fill messageReplyBucket with already acked entry again: and try to publish new msg and read it - messagesToReplay.add(firstAckedMsg.getLedgerId(), firstAckedMsg.getEntryId()); - replayMap.set(dispatcher, messagesToReplay); + redeliveryMessages.add(firstAckedMsg.getLedgerId(), firstAckedMsg.getEntryId()); + redeliveryMessagesField.set(dispatcher, redeliveryMessages); // send new message final String testMsg = "testMsg"; producer.send(testMsg.getBytes()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java new file mode 100644 index 00000000000000..9a785f6f95fd61 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.persistent; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertEqualsNoOrder; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + +import com.google.common.collect.Sets; +import java.lang.reflect.Field; +import java.util.Set; +import java.util.TreeSet; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap; +import org.apache.pulsar.common.util.collections.LongPairSet; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class MessageRedeliveryControllerTest { + @DataProvider(name = "allowOutOfOrderDelivery") + public Object[][] dataProvider() { + return new Object[][] { { true }, { false } }; + } + + @Test(dataProvider = "allowOutOfOrderDelivery", timeOut = 10000) + public void testAddAndRemove(boolean allowOutOfOrderDelivery) throws Exception { + MessageRedeliveryController controller = new MessageRedeliveryController(allowOutOfOrderDelivery); + + Field messagesToRedeliverField = MessageRedeliveryController.class.getDeclaredField("messagesToRedeliver"); + messagesToRedeliverField.setAccessible(true); + LongPairSet messagesToRedeliver = (LongPairSet) messagesToRedeliverField.get(controller); + + Field hashesToBeBlockedField = MessageRedeliveryController.class.getDeclaredField("hashesToBeBlocked"); + hashesToBeBlockedField.setAccessible(true); + ConcurrentLongLongPairHashMap hashesToBeBlocked = (ConcurrentLongLongPairHashMap) hashesToBeBlockedField + .get(controller); + + if (allowOutOfOrderDelivery) { + assertNull(hashesToBeBlocked); + } else { + assertNotNull(hashesToBeBlocked); + } + + assertTrue(controller.isEmpty()); + assertEquals(messagesToRedeliver.size(), 0); + if (!allowOutOfOrderDelivery) { + assertEquals(hashesToBeBlocked.size(), 0); + } + + assertTrue(controller.add(1, 1)); + assertTrue(controller.add(1, 2)); + assertFalse(controller.add(1, 1)); + + assertFalse(controller.isEmpty()); + assertEquals(messagesToRedeliver.size(), 2); + assertTrue(messagesToRedeliver.contains(1, 1)); + assertTrue(messagesToRedeliver.contains(1, 2)); + if (!allowOutOfOrderDelivery) { + assertEquals(hashesToBeBlocked.size(), 0); + assertFalse(hashesToBeBlocked.containsKey(1, 1)); + assertFalse(hashesToBeBlocked.containsKey(1, 2)); + } + + assertTrue(controller.remove(1, 1)); + assertTrue(controller.remove(1, 2)); + assertFalse(controller.remove(1, 1)); + + assertTrue(controller.isEmpty()); + assertEquals(messagesToRedeliver.size(), 0); + assertFalse(messagesToRedeliver.contains(1, 1)); + assertFalse(messagesToRedeliver.contains(1, 2)); + if (!allowOutOfOrderDelivery) { + assertEquals(hashesToBeBlocked.size(), 0); + } + + assertTrue(controller.add(2, 1, 100)); + assertTrue(controller.add(2, 2, 101)); + assertTrue(controller.add(2, 3, 101)); + assertFalse(controller.add(2, 1, 100)); + + assertFalse(controller.isEmpty()); + assertEquals(messagesToRedeliver.size(), 3); + assertTrue(messagesToRedeliver.contains(2, 1)); + assertTrue(messagesToRedeliver.contains(2, 2)); + assertTrue(messagesToRedeliver.contains(2, 3)); + if (!allowOutOfOrderDelivery) { + assertEquals(hashesToBeBlocked.size(), 3); + assertEquals(hashesToBeBlocked.get(2, 1).first, 100); + assertEquals(hashesToBeBlocked.get(2, 2).first, 101); + assertEquals(hashesToBeBlocked.get(2, 3).first, 101); + } + + controller.clear(); + assertTrue(controller.isEmpty()); + assertEquals(messagesToRedeliver.size(), 0); + assertTrue(messagesToRedeliver.isEmpty()); + if (!allowOutOfOrderDelivery) { + assertEquals(hashesToBeBlocked.size(), 0); + assertTrue(hashesToBeBlocked.isEmpty()); + } + + controller.add(2, 2, 201); + controller.add(1, 3, 100); + controller.add(3, 1, 300); + controller.add(2, 1, 200); + controller.add(3, 2, 301); + controller.add(1, 2, 101); + controller.add(1, 1, 100); + + controller.removeAllUpTo(1, 3); + assertEquals(messagesToRedeliver.size(), 4); + assertTrue(messagesToRedeliver.contains(2, 1)); + assertTrue(messagesToRedeliver.contains(2, 2)); + assertTrue(messagesToRedeliver.contains(3, 1)); + assertTrue(messagesToRedeliver.contains(3, 2)); + if (!allowOutOfOrderDelivery) { + assertEquals(hashesToBeBlocked.size(), 4); + assertEquals(hashesToBeBlocked.get(2, 1).first, 200); + assertEquals(hashesToBeBlocked.get(2, 2).first, 201); + assertEquals(hashesToBeBlocked.get(3, 1).first, 300); + assertEquals(hashesToBeBlocked.get(3, 2).first, 301); + } + + controller.removeAllUpTo(3, 1); + assertEquals(messagesToRedeliver.size(), 1); + assertTrue(messagesToRedeliver.contains(3, 2)); + if (!allowOutOfOrderDelivery) { + assertEquals(hashesToBeBlocked.size(), 1); + assertEquals(hashesToBeBlocked.get(3, 2).first, 301); + } + + controller.removeAllUpTo(5, 10); + assertTrue(controller.isEmpty()); + assertEquals(messagesToRedeliver.size(), 0); + if (!allowOutOfOrderDelivery) { + assertEquals(hashesToBeBlocked.size(), 0); + } + } + + @Test(dataProvider = "allowOutOfOrderDelivery", timeOut = 10000) + public void testContainsStickyKeyHashes(boolean allowOutOfOrderDelivery) throws Exception { + MessageRedeliveryController controller = new MessageRedeliveryController(allowOutOfOrderDelivery); + controller.add(1, 1, 100); + controller.add(1, 2, 101); + controller.add(1, 3, 102); + controller.add(2, 2, 103); + controller.add(2, 1, 104); + + if (allowOutOfOrderDelivery) { + assertFalse(controller.containsStickyKeyHashes(Sets.newHashSet(100))); + assertFalse(controller.containsStickyKeyHashes(Sets.newHashSet(101, 102, 103))); + assertFalse(controller.containsStickyKeyHashes(Sets.newHashSet(104, 105))); + } else { + assertTrue(controller.containsStickyKeyHashes(Sets.newHashSet(100))); + assertTrue(controller.containsStickyKeyHashes(Sets.newHashSet(101, 102, 103))); + assertTrue(controller.containsStickyKeyHashes(Sets.newHashSet(104, 105))); + } + + assertFalse(controller.containsStickyKeyHashes(Sets.newHashSet())); + assertFalse(controller.containsStickyKeyHashes(Sets.newHashSet(99))); + assertFalse(controller.containsStickyKeyHashes(Sets.newHashSet(105, 106))); + } + + @Test(dataProvider = "allowOutOfOrderDelivery", timeOut = 10000) + public void testGetMessagesToReplayNow(boolean allowOutOfOrderDelivery) throws Exception { + MessageRedeliveryController controller = new MessageRedeliveryController(allowOutOfOrderDelivery); + controller.add(2, 2); + controller.add(1, 3); + controller.add(3, 1); + controller.add(2, 1); + controller.add(3, 2); + controller.add(1, 2); + controller.add(1, 1); + + if (allowOutOfOrderDelivery) { + // The entries are sorted by ledger ID but not by entry ID + PositionImpl[] actual1 = controller.getMessagesToReplayNow(3).toArray(new PositionImpl[3]); + PositionImpl[] expected1 = { PositionImpl.get(1, 1), PositionImpl.get(1, 2), PositionImpl.get(1, 3) }; + assertEqualsNoOrder(actual1, expected1); + } else { + // The entries are completely sorted + Set actual2 = controller.getMessagesToReplayNow(6); + Set expected2 = new TreeSet<>(); + expected2.add(PositionImpl.get(1, 1)); + expected2.add(PositionImpl.get(1, 2)); + expected2.add(PositionImpl.get(1, 3)); + expected2.add(PositionImpl.get(2, 1)); + expected2.add(PositionImpl.get(2, 2)); + expected2.add(PositionImpl.get(3, 1)); + assertEquals(actual2, expected2); + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java index 4900d99a6eb6c2..3cb5bfbdc44560 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java @@ -22,8 +22,10 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelPromise; import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.EntryImpl; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.BrokerService; @@ -31,6 +33,7 @@ import org.apache.pulsar.broker.service.EntryBatchIndexesAcks; import org.apache.pulsar.broker.service.EntryBatchSizes; import org.apache.pulsar.broker.service.RedeliveryTracker; +import org.apache.pulsar.broker.service.StickyKeyConsumerSelector; import org.apache.pulsar.common.api.proto.KeySharedMeta; import org.apache.pulsar.common.api.proto.KeySharedMode; import org.apache.pulsar.common.api.proto.MessageMetadata; @@ -48,15 +51,23 @@ import java.lang.reflect.Field; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyBoolean; import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.anyList; import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.anySet; import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.argThat; import static org.mockito.Mockito.eq; @@ -298,6 +309,123 @@ public void testSkipRedeliverTemporally() { ); } + @Test(timeOut = 30000) + public void testMessageRedelivery() throws Exception { + final Queue actualEntriesToConsumer1 = new ConcurrentLinkedQueue<>(); + final Queue actualEntriesToConsumer2 = new ConcurrentLinkedQueue<>(); + + final Queue expectedEntriesToConsumer1 = new ConcurrentLinkedQueue<>(); + expectedEntriesToConsumer1.add(PositionImpl.get(1, 1)); + final Queue expectedEntriesToConsumer2 = new ConcurrentLinkedQueue<>(); + expectedEntriesToConsumer2.add(PositionImpl.get(1, 2)); + expectedEntriesToConsumer2.add(PositionImpl.get(1, 3)); + + final AtomicInteger remainingEntriesNum = new AtomicInteger( + expectedEntriesToConsumer1.size() + expectedEntriesToConsumer2.size()); + + // Messages with key1 are routed to consumer1 and messages with key2 are routed to consumer2 + final List allEntries = new ArrayList<>(); + allEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1, "key2"))); + allEntries.add(EntryImpl.create(1, 2, createMessage("message2", 2, "key1"))); + allEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3, "key1"))); + allEntries.forEach(entry -> ((EntryImpl) entry).retain()); + + final List redeliverEntries = new ArrayList<>(); + redeliverEntries.add(allEntries.get(0)); // message1 + final List readEntries = new ArrayList<>(); + readEntries.add(allEntries.get(2)); // message3 + + final Consumer consumer1 = mock(Consumer.class); + doReturn("consumer1").when(consumer1).consumerName(); + // Change availablePermits of consumer1 to 0 and then back to normal + when(consumer1.getAvailablePermits()).thenReturn(0).thenReturn(10); + doReturn(true).when(consumer1).isWritable(); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + List entries = (List) invocationOnMock.getArgument(0); + for (Entry entry : entries) { + remainingEntriesNum.decrementAndGet(); + actualEntriesToConsumer1.add(entry.getPosition()); + } + return channelMock; + }).when(consumer1).sendMessages(anyList(), any(EntryBatchSizes.class), any(EntryBatchIndexesAcks.class), + anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class)); + + final Consumer consumer2 = mock(Consumer.class); + doReturn("consumer2").when(consumer2).consumerName(); + when(consumer2.getAvailablePermits()).thenReturn(10); + doReturn(true).when(consumer2).isWritable(); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + List entries = (List) invocationOnMock.getArgument(0); + for (Entry entry : entries) { + remainingEntriesNum.decrementAndGet(); + actualEntriesToConsumer2.add(entry.getPosition()); + } + return channelMock; + }).when(consumer2).sendMessages(anyList(), any(EntryBatchSizes.class), any(EntryBatchIndexesAcks.class), + anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class)); + + persistentDispatcher.addConsumer(consumer1); + persistentDispatcher.addConsumer(consumer2); + + final Field totalAvailablePermitsField = PersistentDispatcherMultipleConsumers.class + .getDeclaredField("totalAvailablePermits"); + totalAvailablePermitsField.setAccessible(true); + totalAvailablePermitsField.set(persistentDispatcher, 1000); + + final Field redeliveryMessagesField = PersistentDispatcherMultipleConsumers.class + .getDeclaredField("redeliveryMessages"); + redeliveryMessagesField.setAccessible(true); + MessageRedeliveryController redeliveryMessages = (MessageRedeliveryController) redeliveryMessagesField + .get(persistentDispatcher); + redeliveryMessages.add(allEntries.get(0).getLedgerId(), allEntries.get(0).getEntryId(), + getStickyKeyHash(allEntries.get(0))); // message1 + redeliveryMessages.add(allEntries.get(1).getLedgerId(), allEntries.get(1).getEntryId(), + getStickyKeyHash(allEntries.get(1))); // message2 + + // Mock Cursor#asyncReplayEntries + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + Set positions = (Set) invocationOnMock.getArgument(0); + List entries = allEntries.stream().filter(entry -> positions.contains(entry.getPosition())) + .collect(Collectors.toList()); + if (!entries.isEmpty()) { + ((PersistentStickyKeyDispatcherMultipleConsumers) invocationOnMock.getArgument(1)) + .readEntriesComplete(entries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Replay); + } + return Collections.emptySet(); + }).when(cursorMock).asyncReplayEntries(anySet(), any(PersistentStickyKeyDispatcherMultipleConsumers.class), + eq(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Replay), anyBoolean()); + + // Mock Cursor#asyncReadEntriesOrWait + doAnswer(invocationOnMock -> { + ((PersistentStickyKeyDispatcherMultipleConsumers) invocationOnMock.getArgument(2)) + .readEntriesComplete(readEntries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal); + return null; + }).when(cursorMock).asyncReadEntriesOrWait(anyInt(), anyLong(), + any(PersistentStickyKeyDispatcherMultipleConsumers.class), + eq(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal), any()); + + // (1) Run sendMessagesToConsumers + // (2) Attempts to send message1 to consumer1 but skipped because availablePermits is 0 + // (3) Change availablePermits of consumer1 to 10 + // (4) Run readMoreEntries internally + // (5) Run sendMessagesToConsumers internally + // (6) Attempts to send message3 to consumer2 but skipped because redeliveryMessages contains message2 + persistentDispatcher.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Replay, + redeliverEntries); + while (remainingEntriesNum.get() > 0) { + // (7) Run readMoreEntries and resend message1 to consumer1 and message2-3 to consumer2 + persistentDispatcher.readMoreEntries(); + } + + assertEquals(actualEntriesToConsumer1, expectedEntriesToConsumer1); + assertEquals(actualEntriesToConsumer2, expectedEntriesToConsumer2); + + allEntries.forEach(entry -> entry.release()); + } + private ByteBuf createMessage(String message, int sequenceId) { return createMessage(message, sequenceId, "testKey"); } @@ -311,4 +439,9 @@ private ByteBuf createMessage(String message, int sequenceId, String key) { .setPublishTime(System.currentTimeMillis()); return serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, Unpooled.copiedBuffer(message.getBytes(UTF_8))); } + + private int getStickyKeyHash(Entry entry) { + byte[] stickyKey = Commands.peekStickyKey(entry.getDataBuffer(), topicName, subscriptionName); + return StickyKeyConsumerSelector.makeStickyKeyHash(stickyKey); + } }