diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java index 5b091baa49654e..f0f64314918cad 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java @@ -104,9 +104,7 @@ public void removeConsumer(Consumer consumer) { } @Override - public Consumer select(byte[] stickyKey) { - int hash = Murmur3_32Hash.getInstance().makeHash(stickyKey); - + public Consumer select(int hash) { rwLock.readLock().lock(); try { if (hashRing.isEmpty()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index f905cb52fd3777..b3300d3336beb6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -51,6 +51,7 @@ import org.apache.pulsar.common.api.proto.MessageIdData; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ConsumerStats; +import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.stats.Rate; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; @@ -235,7 +236,8 @@ public Future sendMessages(final List entries, EntryBatchSizes batc Entry entry = entries.get(i); if (entry != null) { int batchSize = batchSizes.getBatchSize(i); - pendingAcks.put(entry.getLedgerId(), entry.getEntryId(), batchSize, 0); + int stickyKeyHash = getStickyKeyHash(entry); + pendingAcks.put(entry.getLedgerId(), entry.getEntryId(), batchSize, stickyKeyHash); if (log.isDebugEnabled()){ log.debug("[{}-{}] Added {}:{} ledger entry with batchSize of {} to pendingAcks in" + " broker.service.Consumer for consumerId: {}", @@ -743,7 +745,7 @@ public void redeliverUnacknowledgedMessages() { if (pendingAcks != null) { List pendingPositions = new ArrayList<>((int) pendingAcks.size()); MutableInt totalRedeliveryMessages = new MutableInt(0); - pendingAcks.forEach((ledgerId, entryId, batchSize, none) -> { + pendingAcks.forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> { totalRedeliveryMessages.add((int) batchSize); pendingPositions.add(new PositionImpl(ledgerId, entryId)); }); @@ -766,10 +768,11 @@ public void redeliverUnacknowledgedMessages(List messageIds) { List pendingPositions = Lists.newArrayList(); for (MessageIdData msg : messageIds) { PositionImpl position = PositionImpl.get(msg.getLedgerId(), msg.getEntryId()); - LongPair batchSize = pendingAcks.get(position.getLedgerId(), position.getEntryId()); - if (batchSize != null) { + LongPair longPair = pendingAcks.get(position.getLedgerId(), position.getEntryId()); + if (longPair != null) { + long batchSize = longPair.first; pendingAcks.remove(position.getLedgerId(), position.getEntryId()); - totalRedeliveryMessages += batchSize.first; + totalRedeliveryMessages += batchSize; pendingPositions.add(position); } } @@ -836,5 +839,10 @@ public String getClientAddress() { return clientAddress; } + private int getStickyKeyHash(Entry entry) { + byte[] stickyKey = Commands.peekStickyKey(entry.getDataBuffer(), topicName, subscription.getName()); + return StickyKeyConsumerSelector.makeStickyKeyHash(stickyKey); + } + private static final Logger log = LoggerFactory.getLogger(Consumer.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java index e1cc2b8a232052..4f721c955da4b1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java @@ -26,7 +26,6 @@ import java.util.Map.Entry; import java.util.concurrent.ConcurrentSkipListMap; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException; -import org.apache.pulsar.common.util.Murmur3_32Hash; /** * This is a consumer selector based fixed hash range. @@ -103,8 +102,7 @@ public synchronized void removeConsumer(Consumer consumer) { } @Override - public Consumer select(byte[] stickyKey) { - int hash = Murmur3_32Hash.getInstance().makeHash(stickyKey); + public Consumer select(int hash) { if (rangeMap.size() > 0) { int slot = hash % rangeSize; return rangeMap.ceilingEntry(slot).getValue(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java index e6be9bf0ebe0ca..f4909ae4fe2e52 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java @@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentSkipListMap; import org.apache.pulsar.common.api.proto.IntRange; import org.apache.pulsar.common.api.proto.KeySharedMeta; -import org.apache.pulsar.common.util.Murmur3_32Hash; /** * This is a sticky-key consumer selector based user provided range. @@ -65,11 +64,6 @@ public void removeConsumer(Consumer consumer) { rangeMap.entrySet().removeIf(entry -> entry.getValue().equals(consumer)); } - @Override - public Consumer select(byte[] stickyKey) { - return select(Murmur3_32Hash.getInstance().makeHash(stickyKey)); - } - @Override public Map> getConsumerKeyHashRanges() { Map> result = new HashMap<>(); @@ -88,7 +82,8 @@ public Map> getConsumerKeyHashRanges() { return result; } - Consumer select(int hash) { + @Override + public Consumer select(int hash) { if (rangeMap.size() > 0) { int slot = hash % rangeSize; Map.Entry ceilingEntry = rangeMap.ceilingEntry(slot); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java index 71972614fd7f2c..83952fc34725f8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException; +import org.apache.pulsar.common.util.Murmur3_32Hash; public interface StickyKeyConsumerSelector { @@ -45,7 +46,21 @@ public interface StickyKeyConsumerSelector { * @param stickyKey sticky key * @return consumer */ - Consumer select(byte[] stickyKey); + default Consumer select(byte[] stickyKey) { + return select(makeStickyKeyHash(stickyKey)); + } + + static int makeStickyKeyHash(byte[] stickyKey) { + return Murmur3_32Hash.getInstance().makeHash(stickyKey); + } + + /** + * Select a consumer by hash. + * + * @param hash hash corresponding to sticky key + * @return consumer + */ + Consumer select(int hash); /** * Get key hash ranges handled by each consumer. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java new file mode 100644 index 00000000000000..8f4751c48cc9f9 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.persistent; + +import com.google.common.collect.ComparisonChain; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap; +import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair; +import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet; +import org.apache.pulsar.common.util.collections.LongPairSet; + +public class MessageRedeliveryController { + private final LongPairSet messagesToRedeliver; + private final ConcurrentLongLongPairHashMap hashesToBeBlocked; + + public MessageRedeliveryController(boolean allowOutOfOrderDelivery) { + this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2); + this.hashesToBeBlocked = allowOutOfOrderDelivery ? null : new ConcurrentLongLongPairHashMap(128, 2); + } + + public boolean add(long ledgerId, long entryId) { + return messagesToRedeliver.add(ledgerId, entryId); + } + + public boolean add(long ledgerId, long entryId, long stickyKeyHash) { + if (hashesToBeBlocked != null) { + hashesToBeBlocked.put(ledgerId, entryId, stickyKeyHash, 0); + } + return messagesToRedeliver.add(ledgerId, entryId); + } + + public boolean remove(long ledgerId, long entryId) { + if (hashesToBeBlocked != null) { + hashesToBeBlocked.remove(ledgerId, entryId); + } + return messagesToRedeliver.remove(ledgerId, entryId); + } + + public int removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) { + if (hashesToBeBlocked != null) { + for (LongPair longPair : hashesToBeBlocked.keys()) { + long ledgerId = longPair.first; + long entryId = longPair.second; + if (ComparisonChain.start().compare(ledgerId, markDeleteLedgerId).compare(entryId, markDeleteEntryId) + .result() <= 0) { + hashesToBeBlocked.remove(ledgerId, entryId); + } + } + } + return messagesToRedeliver.removeIf((ledgerId, entryId) -> { + return ComparisonChain.start().compare(ledgerId, markDeleteLedgerId).compare(entryId, markDeleteEntryId) + .result() <= 0; + }); + } + + public boolean isEmpty() { + return messagesToRedeliver.isEmpty(); + } + + public void clear() { + if (hashesToBeBlocked != null) { + hashesToBeBlocked.clear(); + } + messagesToRedeliver.clear(); + } + + public String toString() { + return messagesToRedeliver.toString(); + } + + public boolean containsStickyKeyHashes(Set stickyKeyHashes) { + if (hashesToBeBlocked != null) { + for (LongPair longPair : hashesToBeBlocked.values()) { + int stickyKeyHash = (int) longPair.first; + if (stickyKeyHashes.contains(stickyKeyHash)) { + return true; + } + } + } + return false; + } + + public Set getMessagesToReplayNow(int maxMessagesToRead) { + if (hashesToBeBlocked != null) { + return messagesToRedeliver.items().stream() + .sorted((l1, l2) -> ComparisonChain.start().compare(l1.first, l2.first) + .compare(l1.second, l2.second).result()) + .limit(maxMessagesToRead).map(longPair -> new PositionImpl(longPair.first, longPair.second)) + .collect(Collectors.toCollection(TreeSet::new)); + } else { + return messagesToRedeliver.items(maxMessagesToRead, + (ledgerId, entryId) -> new PositionImpl(ledgerId, entryId)); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index b6479a33d8db7a..f9d9bd3ed7814b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -19,7 +19,6 @@ package org.apache.pulsar.broker.service.persistent; import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS; -import com.google.common.collect.ComparisonChain; import com.google.common.collect.Lists; import com.google.common.collect.Range; import java.util.Collections; @@ -52,6 +51,7 @@ import org.apache.pulsar.broker.service.RedeliveryTracker; import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled; import org.apache.pulsar.broker.service.SendMessageInfo; +import org.apache.pulsar.broker.service.StickyKeyConsumerSelector; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type; import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException; @@ -61,8 +61,6 @@ import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.util.Codec; -import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet; -import org.apache.pulsar.common.util.collections.LongPairSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,7 +75,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul protected volatile Range lastIndividualDeletedRangeFromCursorRecovery; private CompletableFuture closeFuture = null; - protected LongPairSet messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2); + protected final MessageRedeliveryController redeliveryMessages; protected final RedeliveryTracker redeliveryTracker; private Optional delayedDeliveryTracker = Optional.empty(); @@ -113,13 +111,19 @@ protected enum ReadType { } public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, - Subscription subscription) { + Subscription subscription) { + this(topic, cursor, subscription, true); + } + + public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, Subscription subscription, + boolean allowOutOfOrderDelivery) { super(subscription); this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration(); this.cursor = cursor; this.lastIndividualDeletedRangeFromCursorRecovery = cursor.getLastIndividualDeletedRange(); this.name = topic.getName() + " / " + Codec.decode(cursor.getName()); this.topic = topic; + this.redeliveryMessages = new MessageRedeliveryController(allowOutOfOrderDelivery); this.redeliveryTracker = this.serviceConfig.isSubscriptionRedeliveryTrackerEnabled() ? new InMemoryRedeliveryTracker() : RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED; @@ -142,7 +146,7 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce cursor.rewind(); shouldRewindBeforeReadingOrReplaying = false; } - messagesToRedeliver.clear(); + redeliveryMessages.clear(); } if (isConsumersExceededOnSubscription()) { @@ -170,7 +174,7 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE if (consumerList.isEmpty()) { cancelPendingRead(); - messagesToRedeliver.clear(); + redeliveryMessages.clear(); redeliveryTracker.clear(); if (closeFuture != null) { log.info("[{}] All consumers removed. Subscription is disconnected", name); @@ -181,8 +185,8 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE if (log.isDebugEnabled()) { log.debug("[{}] Consumer are left, reading more entries", name); } - consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, none) -> { - if (addMessageToReplay(ledgerId, entryId)) { + consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> { + if (addMessageToReplay(ledgerId, entryId, stickyKeyHash)) { redeliveryTracker.addIfAbsent(PositionImpl.get(ledgerId, entryId)); } }); @@ -243,9 +247,9 @@ public synchronized void readMoreEntries() { ? asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow); // clear already acked positions from replay bucket - deletedMessages.forEach(position -> messagesToRedeliver.remove(((PositionImpl) position).getLedgerId(), + deletedMessages.forEach(position -> redeliveryMessages.remove(((PositionImpl) position).getLedgerId(), ((PositionImpl) position).getEntryId())); - // if all the entries are acked-entries and cleared up from messagesToRedeliver, try to read + // if all the entries are acked-entries and cleared up from redeliveryMessages, try to read // next entries as readCompletedEntries-callback was never called if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) { havePendingReplayRead = false; @@ -523,7 +527,7 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { // remove positions first from replay list first : sendMessages recycles entries if (readType == ReadType.Replay) { entries.subList(start, start + messagesForC).forEach(entry -> { - messagesToRedeliver.remove(entry.getLedgerId(), entry.getEntryId()); + redeliveryMessages.remove(entry.getLedgerId(), entry.getEntryId()); }); } @@ -577,7 +581,8 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { entries.size() - start); } entries.subList(start, entries.size()).forEach(entry -> { - addMessageToReplay(entry.getLedgerId(), entry.getEntryId()); + long stickyKeyHash = getStickyKeyHash(entry); + addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash); entry.release(); }); } @@ -623,11 +628,7 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj havePendingReplayRead = false; if (exception instanceof ManagedLedgerException.InvalidReplayPositionException) { PositionImpl markDeletePosition = (PositionImpl) cursor.getMarkDeletedPosition(); - - messagesToRedeliver.removeIf((ledgerId, entryId) -> { - return ComparisonChain.start().compare(ledgerId, markDeletePosition.getLedgerId()) - .compare(entryId, markDeletePosition.getEntryId()).result() <= 0; - }); + redeliveryMessages.removeAllUpTo(markDeletePosition.getLedgerId(), markDeletePosition.getEntryId()); } } @@ -697,12 +698,12 @@ public boolean isConsumerAvailable(Consumer consumer) { @Override public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) { - consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, none) -> { - addMessageToReplay(ledgerId, entryId); + consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> { + addMessageToReplay(ledgerId, entryId, stickyKeyHash); }); if (log.isDebugEnabled()) { log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer, - messagesToRedeliver); + redeliveryMessages); } readMoreEntries(); } @@ -710,7 +711,9 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) { @Override public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List positions) { positions.forEach(position -> { - if (addMessageToReplay(position.getLedgerId(), position.getEntryId())) { + // TODO: We want to pass a sticky key hash as a third argument to guarantee the order of the messages + // on Key_Shared subscription, but it's difficult to get the sticky key here + if (addMessageToReplay(position.getLedgerId(), position.getEntryId(), null)) { redeliveryTracker.addIfAbsent(position); } }); @@ -829,9 +832,8 @@ public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata } protected synchronized Set getMessagesToReplayNow(int maxMessagesToRead) { - if (!messagesToRedeliver.isEmpty()) { - return messagesToRedeliver.items(maxMessagesToRead, - (ledgerId, entryId) -> new PositionImpl(ledgerId, entryId)); + if (!redeliveryMessages.isEmpty()) { + return redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead); } else if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) { delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis()); return delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead); @@ -857,11 +859,15 @@ public void cursorIsReset() { } } - protected boolean addMessageToReplay(long ledgerId, long entryId) { + protected boolean addMessageToReplay(long ledgerId, long entryId, Long stickyKeyHash) { Position markDeletePosition = cursor.getMarkDeletedPosition(); if (markDeletePosition == null || ledgerId > markDeletePosition.getLedgerId() || (ledgerId == markDeletePosition.getLedgerId() && entryId > markDeletePosition.getEntryId())) { - messagesToRedeliver.add(ledgerId, entryId); + if (stickyKeyHash == null) { + redeliveryMessages.add(ledgerId, entryId); + } else { + redeliveryMessages.add(ledgerId, entryId, stickyKeyHash); + } return true; } else { return false; @@ -887,5 +893,9 @@ public PersistentTopic getTopic() { return topic; } + protected int getStickyKeyHash(Entry entry) { + return StickyKeyConsumerSelector.makeStickyKeyHash(peekStickyKey(entry.getDataBuffer())); + } + private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 5145c3b51b7ab8..3f144f30766d4d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -70,7 +70,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, Subscription subscription, ServiceConfiguration conf, KeySharedMeta ksm) { - super(topic, cursor, subscription); + super(topic, cursor, subscription, ksm.isAllowOutOfOrderDelivery()); this.allowOutOfOrderDelivery = ksm.isAllowOutOfOrderDelivery(); this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new LinkedHashMap<>(); @@ -128,7 +128,7 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE if (consumerList.size() == 1) { recentlyJoinedConsumers.clear(); } - if (removeConsumersFromRecentJoinedConsumers() || messagesToRedeliver.size() > 0) { + if (removeConsumersFromRecentJoinedConsumers() || !redeliveryMessages.isEmpty()) { readMoreEntries(); } } @@ -142,6 +142,14 @@ protected Map> initialValue() throws Exception { } }; + private static final FastThreadLocal>> localConsumerStickyKeyHashesMap = + new FastThreadLocal>>() { + @Override + protected Map> initialValue() throws Exception { + return new HashMap<>(); + } + }; + @Override protected void sendMessagesToConsumers(ReadType readType, List entries) { long totalMessagesSent = 0; @@ -164,10 +172,14 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { final Map> groupedEntries = localGroupedEntries.get(); groupedEntries.clear(); + final Map> consumerStickyKeyHashesMap = localConsumerStickyKeyHashesMap.get(); + consumerStickyKeyHashesMap.clear(); for (Entry entry : entries) { - Consumer c = selector.select(peekStickyKey(entry.getDataBuffer())); + int stickyKeyHash = getStickyKeyHash(entry); + Consumer c = selector.select(stickyKeyHash); groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry); + consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash); } AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size()); @@ -182,8 +194,8 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { int entriesWithSameKeyCount = entriesWithSameKey.size(); final int availablePermits = consumer == null ? 0 : Math.max(consumer.getAvailablePermits(), 0); int maxMessagesForC = Math.min(entriesWithSameKeyCount, availablePermits); - int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, - maxMessagesForC, readType); + int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, maxMessagesForC, + readType, consumerStickyKeyHashesMap.get(consumer)); if (log.isDebugEnabled()) { log.debug("[{}] select consumer {} with messages num {}, read type is {}", name, consumer == null ? "null" : consumer.consumerName(), messagesForC, readType); @@ -194,7 +206,8 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { // so we discard for now and mark them for later redelivery for (int i = messagesForC; i < entriesWithSameKeyCount; i++) { Entry entry = entriesWithSameKey.get(i); - addMessageToReplay(entry.getLedgerId(), entry.getEntryId()); + long stickyKeyHash = getStickyKeyHash(entry); + addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash); entry.release(); entriesWithSameKey.set(i, null); } @@ -205,7 +218,7 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { if (readType == ReadType.Replay) { for (int i = 0; i < messagesForC; i++) { Entry entry = entriesWithSameKey.get(i); - messagesToRedeliver.remove(entry.getLedgerId(), entry.getEntryId()); + redeliveryMessages.remove(entry.getLedgerId(), entry.getEntryId()); } } @@ -276,13 +289,19 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { } } - private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List entries, - int maxMessages, ReadType readType) { + private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List entries, int maxMessages, + ReadType readType, Set stickyKeyHashes) { if (maxMessages == 0) { // the consumer was stuck nextStuckConsumers.add(consumer); return 0; } + if (readType == ReadType.Normal && stickyKeyHashes != null + && redeliveryMessages.containsStickyKeyHashes(stickyKeyHashes)) { + // If redeliveryMessages contains messages that correspond to the same hash as the messages + // that the dispatcher is trying to send, do not send those messages for order guarantee + return 0; + } if (recentlyJoinedConsumers == null) { return maxMessages; } @@ -365,6 +384,7 @@ private boolean removeConsumersFromRecentJoinedConsumers() { return hasConsumerRemovedFromTheRecentJoinedConsumers; } + @Override protected synchronized Set getMessagesToReplayNow(int maxMessagesToRead) { if (isDispatcherStuckOnReplays) { // If we're stuck on replay, we want to move forward reading on the topic (until the overall max-unacked diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java index 666bb9813cae88..b56e69a5e573d7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java @@ -159,9 +159,9 @@ public synchronized void readMoreEntries() { ? asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow); // clear already acked positions from replay bucket - deletedMessages.forEach(position -> messagesToRedeliver.remove(((PositionImpl) position).getLedgerId(), + deletedMessages.forEach(position -> redeliveryMessages.remove(((PositionImpl) position).getLedgerId(), ((PositionImpl) position).getEntryId())); - // if all the entries are acked-entries and cleared up from messagesToRedeliver, try to read + // if all the entries are acked-entries and cleared up from redeliveryMessages, try to read // next entries as readCompletedEntries-callback was never called if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) { havePendingReplayRead = false; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java index 283cd98f30aae2..ce48402880afe4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java @@ -47,6 +47,7 @@ import org.apache.bookkeeper.mledger.impl.EntryCacheImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -79,7 +80,6 @@ import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.stats.Metrics; -import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.awaitility.Awaitility; import org.testng.Assert; @@ -1667,9 +1667,10 @@ public void testMessageReplay() throws Exception { PersistentSubscription subRef = topicRef.getSubscription(subName); PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) subRef .getDispatcher(); - Field replayMap = PersistentDispatcherMultipleConsumers.class.getDeclaredField("messagesToRedeliver"); - replayMap.setAccessible(true); - ConcurrentLongPairSet messagesToReplay = new ConcurrentLongPairSet(64, 1); + Field redeliveryMessagesField = PersistentDispatcherMultipleConsumers.class + .getDeclaredField("redeliveryMessages"); + redeliveryMessagesField.setAccessible(true); + MessageRedeliveryController redeliveryMessages = new MessageRedeliveryController(true); assertNotNull(subRef); @@ -1690,24 +1691,24 @@ public void testMessageReplay() throws Exception { } if (i < replayIndex) { // (3) accumulate acked messages for replay - messagesToReplay.add(msgId.getLedgerId(), msgId.getEntryId()); + redeliveryMessages.add(msgId.getLedgerId(), msgId.getEntryId()); } } // (4) redelivery : should redeliver only unacked messages Thread.sleep(1000); - replayMap.set(dispatcher, messagesToReplay); + redeliveryMessagesField.set(dispatcher, redeliveryMessages); // (a) redelivery with all acked-message should clear messageReply bucket dispatcher.redeliverUnacknowledgedMessages(dispatcher.getConsumers().get(0)); Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> { - return messagesToReplay.isEmpty(); + return redeliveryMessages.isEmpty(); }); - assertEquals(messagesToReplay.size(), 0); + assertTrue(redeliveryMessages.isEmpty()); // (b) fill messageReplyBucket with already acked entry again: and try to publish new msg and read it - messagesToReplay.add(firstAckedMsg.getLedgerId(), firstAckedMsg.getEntryId()); - replayMap.set(dispatcher, messagesToReplay); + redeliveryMessages.add(firstAckedMsg.getLedgerId(), firstAckedMsg.getEntryId()); + redeliveryMessagesField.set(dispatcher, redeliveryMessages); // send new message final String testMsg = "testMsg"; producer.send(testMsg.getBytes()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java new file mode 100644 index 00000000000000..9a785f6f95fd61 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.persistent; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertEqualsNoOrder; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + +import com.google.common.collect.Sets; +import java.lang.reflect.Field; +import java.util.Set; +import java.util.TreeSet; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap; +import org.apache.pulsar.common.util.collections.LongPairSet; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class MessageRedeliveryControllerTest { + @DataProvider(name = "allowOutOfOrderDelivery") + public Object[][] dataProvider() { + return new Object[][] { { true }, { false } }; + } + + @Test(dataProvider = "allowOutOfOrderDelivery", timeOut = 10000) + public void testAddAndRemove(boolean allowOutOfOrderDelivery) throws Exception { + MessageRedeliveryController controller = new MessageRedeliveryController(allowOutOfOrderDelivery); + + Field messagesToRedeliverField = MessageRedeliveryController.class.getDeclaredField("messagesToRedeliver"); + messagesToRedeliverField.setAccessible(true); + LongPairSet messagesToRedeliver = (LongPairSet) messagesToRedeliverField.get(controller); + + Field hashesToBeBlockedField = MessageRedeliveryController.class.getDeclaredField("hashesToBeBlocked"); + hashesToBeBlockedField.setAccessible(true); + ConcurrentLongLongPairHashMap hashesToBeBlocked = (ConcurrentLongLongPairHashMap) hashesToBeBlockedField + .get(controller); + + if (allowOutOfOrderDelivery) { + assertNull(hashesToBeBlocked); + } else { + assertNotNull(hashesToBeBlocked); + } + + assertTrue(controller.isEmpty()); + assertEquals(messagesToRedeliver.size(), 0); + if (!allowOutOfOrderDelivery) { + assertEquals(hashesToBeBlocked.size(), 0); + } + + assertTrue(controller.add(1, 1)); + assertTrue(controller.add(1, 2)); + assertFalse(controller.add(1, 1)); + + assertFalse(controller.isEmpty()); + assertEquals(messagesToRedeliver.size(), 2); + assertTrue(messagesToRedeliver.contains(1, 1)); + assertTrue(messagesToRedeliver.contains(1, 2)); + if (!allowOutOfOrderDelivery) { + assertEquals(hashesToBeBlocked.size(), 0); + assertFalse(hashesToBeBlocked.containsKey(1, 1)); + assertFalse(hashesToBeBlocked.containsKey(1, 2)); + } + + assertTrue(controller.remove(1, 1)); + assertTrue(controller.remove(1, 2)); + assertFalse(controller.remove(1, 1)); + + assertTrue(controller.isEmpty()); + assertEquals(messagesToRedeliver.size(), 0); + assertFalse(messagesToRedeliver.contains(1, 1)); + assertFalse(messagesToRedeliver.contains(1, 2)); + if (!allowOutOfOrderDelivery) { + assertEquals(hashesToBeBlocked.size(), 0); + } + + assertTrue(controller.add(2, 1, 100)); + assertTrue(controller.add(2, 2, 101)); + assertTrue(controller.add(2, 3, 101)); + assertFalse(controller.add(2, 1, 100)); + + assertFalse(controller.isEmpty()); + assertEquals(messagesToRedeliver.size(), 3); + assertTrue(messagesToRedeliver.contains(2, 1)); + assertTrue(messagesToRedeliver.contains(2, 2)); + assertTrue(messagesToRedeliver.contains(2, 3)); + if (!allowOutOfOrderDelivery) { + assertEquals(hashesToBeBlocked.size(), 3); + assertEquals(hashesToBeBlocked.get(2, 1).first, 100); + assertEquals(hashesToBeBlocked.get(2, 2).first, 101); + assertEquals(hashesToBeBlocked.get(2, 3).first, 101); + } + + controller.clear(); + assertTrue(controller.isEmpty()); + assertEquals(messagesToRedeliver.size(), 0); + assertTrue(messagesToRedeliver.isEmpty()); + if (!allowOutOfOrderDelivery) { + assertEquals(hashesToBeBlocked.size(), 0); + assertTrue(hashesToBeBlocked.isEmpty()); + } + + controller.add(2, 2, 201); + controller.add(1, 3, 100); + controller.add(3, 1, 300); + controller.add(2, 1, 200); + controller.add(3, 2, 301); + controller.add(1, 2, 101); + controller.add(1, 1, 100); + + controller.removeAllUpTo(1, 3); + assertEquals(messagesToRedeliver.size(), 4); + assertTrue(messagesToRedeliver.contains(2, 1)); + assertTrue(messagesToRedeliver.contains(2, 2)); + assertTrue(messagesToRedeliver.contains(3, 1)); + assertTrue(messagesToRedeliver.contains(3, 2)); + if (!allowOutOfOrderDelivery) { + assertEquals(hashesToBeBlocked.size(), 4); + assertEquals(hashesToBeBlocked.get(2, 1).first, 200); + assertEquals(hashesToBeBlocked.get(2, 2).first, 201); + assertEquals(hashesToBeBlocked.get(3, 1).first, 300); + assertEquals(hashesToBeBlocked.get(3, 2).first, 301); + } + + controller.removeAllUpTo(3, 1); + assertEquals(messagesToRedeliver.size(), 1); + assertTrue(messagesToRedeliver.contains(3, 2)); + if (!allowOutOfOrderDelivery) { + assertEquals(hashesToBeBlocked.size(), 1); + assertEquals(hashesToBeBlocked.get(3, 2).first, 301); + } + + controller.removeAllUpTo(5, 10); + assertTrue(controller.isEmpty()); + assertEquals(messagesToRedeliver.size(), 0); + if (!allowOutOfOrderDelivery) { + assertEquals(hashesToBeBlocked.size(), 0); + } + } + + @Test(dataProvider = "allowOutOfOrderDelivery", timeOut = 10000) + public void testContainsStickyKeyHashes(boolean allowOutOfOrderDelivery) throws Exception { + MessageRedeliveryController controller = new MessageRedeliveryController(allowOutOfOrderDelivery); + controller.add(1, 1, 100); + controller.add(1, 2, 101); + controller.add(1, 3, 102); + controller.add(2, 2, 103); + controller.add(2, 1, 104); + + if (allowOutOfOrderDelivery) { + assertFalse(controller.containsStickyKeyHashes(Sets.newHashSet(100))); + assertFalse(controller.containsStickyKeyHashes(Sets.newHashSet(101, 102, 103))); + assertFalse(controller.containsStickyKeyHashes(Sets.newHashSet(104, 105))); + } else { + assertTrue(controller.containsStickyKeyHashes(Sets.newHashSet(100))); + assertTrue(controller.containsStickyKeyHashes(Sets.newHashSet(101, 102, 103))); + assertTrue(controller.containsStickyKeyHashes(Sets.newHashSet(104, 105))); + } + + assertFalse(controller.containsStickyKeyHashes(Sets.newHashSet())); + assertFalse(controller.containsStickyKeyHashes(Sets.newHashSet(99))); + assertFalse(controller.containsStickyKeyHashes(Sets.newHashSet(105, 106))); + } + + @Test(dataProvider = "allowOutOfOrderDelivery", timeOut = 10000) + public void testGetMessagesToReplayNow(boolean allowOutOfOrderDelivery) throws Exception { + MessageRedeliveryController controller = new MessageRedeliveryController(allowOutOfOrderDelivery); + controller.add(2, 2); + controller.add(1, 3); + controller.add(3, 1); + controller.add(2, 1); + controller.add(3, 2); + controller.add(1, 2); + controller.add(1, 1); + + if (allowOutOfOrderDelivery) { + // The entries are sorted by ledger ID but not by entry ID + PositionImpl[] actual1 = controller.getMessagesToReplayNow(3).toArray(new PositionImpl[3]); + PositionImpl[] expected1 = { PositionImpl.get(1, 1), PositionImpl.get(1, 2), PositionImpl.get(1, 3) }; + assertEqualsNoOrder(actual1, expected1); + } else { + // The entries are completely sorted + Set actual2 = controller.getMessagesToReplayNow(6); + Set expected2 = new TreeSet<>(); + expected2.add(PositionImpl.get(1, 1)); + expected2.add(PositionImpl.get(1, 2)); + expected2.add(PositionImpl.get(1, 3)); + expected2.add(PositionImpl.get(2, 1)); + expected2.add(PositionImpl.get(2, 2)); + expected2.add(PositionImpl.get(3, 1)); + assertEquals(actual2, expected2); + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java index 8f485f11e9f86b..410d36f4b1572b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java @@ -22,8 +22,10 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelPromise; import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.EntryImpl; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.*; @@ -44,8 +46,14 @@ import java.lang.reflect.Field; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload; @@ -283,6 +291,123 @@ public void testSkipRedeliverTemporally() { ); } + @Test(timeOut = 30000) + public void testMessageRedelivery() throws Exception { + final Queue actualEntriesToConsumer1 = new ConcurrentLinkedQueue<>(); + final Queue actualEntriesToConsumer2 = new ConcurrentLinkedQueue<>(); + + final Queue expectedEntriesToConsumer1 = new ConcurrentLinkedQueue<>(); + expectedEntriesToConsumer1.add(PositionImpl.get(1, 1)); + final Queue expectedEntriesToConsumer2 = new ConcurrentLinkedQueue<>(); + expectedEntriesToConsumer2.add(PositionImpl.get(1, 2)); + expectedEntriesToConsumer2.add(PositionImpl.get(1, 3)); + + final AtomicInteger remainingEntriesNum = new AtomicInteger( + expectedEntriesToConsumer1.size() + expectedEntriesToConsumer2.size()); + + // Messages with key1 are routed to consumer1 and messages with key2 are routed to consumer2 + final List allEntries = new ArrayList<>(); + allEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1, "key2"))); + allEntries.add(EntryImpl.create(1, 2, createMessage("message2", 2, "key1"))); + allEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3, "key1"))); + allEntries.forEach(entry -> ((EntryImpl) entry).retain()); + + final List redeliverEntries = new ArrayList<>(); + redeliverEntries.add(allEntries.get(0)); // message1 + final List readEntries = new ArrayList<>(); + readEntries.add(allEntries.get(2)); // message3 + + final Consumer consumer1 = mock(Consumer.class); + doReturn("consumer1").when(consumer1).consumerName(); + // Change availablePermits of consumer1 to 0 and then back to normal + when(consumer1.getAvailablePermits()).thenReturn(0).thenReturn(10); + doReturn(true).when(consumer1).isWritable(); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + List entries = (List) invocationOnMock.getArgument(0); + for (Entry entry : entries) { + remainingEntriesNum.decrementAndGet(); + actualEntriesToConsumer1.add(entry.getPosition()); + } + return channelMock; + }).when(consumer1).sendMessages(anyList(), any(EntryBatchSizes.class), any(EntryBatchIndexesAcks.class), + anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class)); + + final Consumer consumer2 = mock(Consumer.class); + doReturn("consumer2").when(consumer2).consumerName(); + when(consumer2.getAvailablePermits()).thenReturn(10); + doReturn(true).when(consumer2).isWritable(); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + List entries = (List) invocationOnMock.getArgument(0); + for (Entry entry : entries) { + remainingEntriesNum.decrementAndGet(); + actualEntriesToConsumer2.add(entry.getPosition()); + } + return channelMock; + }).when(consumer2).sendMessages(anyList(), any(EntryBatchSizes.class), any(EntryBatchIndexesAcks.class), + anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class)); + + persistentDispatcher.addConsumer(consumer1); + persistentDispatcher.addConsumer(consumer2); + + final Field totalAvailablePermitsField = PersistentDispatcherMultipleConsumers.class + .getDeclaredField("totalAvailablePermits"); + totalAvailablePermitsField.setAccessible(true); + totalAvailablePermitsField.set(persistentDispatcher, 1000); + + final Field redeliveryMessagesField = PersistentDispatcherMultipleConsumers.class + .getDeclaredField("redeliveryMessages"); + redeliveryMessagesField.setAccessible(true); + MessageRedeliveryController redeliveryMessages = (MessageRedeliveryController) redeliveryMessagesField + .get(persistentDispatcher); + redeliveryMessages.add(allEntries.get(0).getLedgerId(), allEntries.get(0).getEntryId(), + getStickyKeyHash(allEntries.get(0))); // message1 + redeliveryMessages.add(allEntries.get(1).getLedgerId(), allEntries.get(1).getEntryId(), + getStickyKeyHash(allEntries.get(1))); // message2 + + // Mock Cursor#asyncReplayEntries + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + Set positions = (Set) invocationOnMock.getArgument(0); + List entries = allEntries.stream().filter(entry -> positions.contains(entry.getPosition())) + .collect(Collectors.toList()); + if (!entries.isEmpty()) { + ((PersistentStickyKeyDispatcherMultipleConsumers) invocationOnMock.getArgument(1)) + .readEntriesComplete(entries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Replay); + } + return Collections.emptySet(); + }).when(cursorMock).asyncReplayEntries(anySet(), any(PersistentStickyKeyDispatcherMultipleConsumers.class), + eq(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Replay), anyBoolean()); + + // Mock Cursor#asyncReadEntriesOrWait + doAnswer(invocationOnMock -> { + ((PersistentStickyKeyDispatcherMultipleConsumers) invocationOnMock.getArgument(2)) + .readEntriesComplete(readEntries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal); + return null; + }).when(cursorMock).asyncReadEntriesOrWait(anyInt(), anyLong(), + any(PersistentStickyKeyDispatcherMultipleConsumers.class), + eq(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal), any()); + + // (1) Run sendMessagesToConsumers + // (2) Attempts to send message1 to consumer1 but skipped because availablePermits is 0 + // (3) Change availablePermits of consumer1 to 10 + // (4) Run readMoreEntries internally + // (5) Run sendMessagesToConsumers internally + // (6) Attempts to send message3 to consumer2 but skipped because redeliveryMessages contains message2 + persistentDispatcher.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Replay, + redeliverEntries); + while (remainingEntriesNum.get() > 0) { + // (7) Run readMoreEntries and resend message1 to consumer1 and message2-3 to consumer2 + persistentDispatcher.readMoreEntries(); + } + + assertEquals(actualEntriesToConsumer1, expectedEntriesToConsumer1); + assertEquals(actualEntriesToConsumer2, expectedEntriesToConsumer2); + + allEntries.forEach(entry -> entry.release()); + } + private ByteBuf createMessage(String message, int sequenceId) { return createMessage(message, sequenceId, "testKey"); } @@ -296,4 +421,9 @@ private ByteBuf createMessage(String message, int sequenceId, String key) { .setPublishTime(System.currentTimeMillis()); return serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, Unpooled.copiedBuffer(message.getBytes(UTF_8))); } + + private int getStickyKeyHash(Entry entry) { + byte[] stickyKey = Commands.peekStickyKey(entry.getDataBuffer(), topicName, subscriptionName); + return StickyKeyConsumerSelector.makeStickyKeyHash(stickyKey); + } }