diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java index 96029d8abe115..e2800484d59bc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java @@ -105,9 +105,7 @@ public void removeConsumer(Consumer consumer) { } @Override - public Consumer select(byte[] stickyKey) { - int hash = Murmur3_32Hash.getInstance().makeHash(stickyKey); - + public Consumer select(int hash) { rwLock.readLock().lock(); try { if (hashRing.isEmpty()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 475fdc9a601fb..b6557f906a7b4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -54,6 +54,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ConsumerStats; +import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.stats.Rate; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; @@ -236,7 +237,8 @@ public Future sendMessages(final List entries, EntryBatchSizes batc Entry entry = entries.get(i); if (entry != null) { int batchSize = batchSizes.getBatchSize(i); - pendingAcks.put(entry.getLedgerId(), entry.getEntryId(), batchSize, 0); + int stickyKeyHash = getStickyKeyHash(entry); + pendingAcks.put(entry.getLedgerId(), entry.getEntryId(), batchSize, stickyKeyHash); if (log.isDebugEnabled()){ log.debug("[{}-{}] Added {}:{} ledger entry with batchSize of {} to pendingAcks in" + " broker.service.Consumer for consumerId: {}", @@ -729,7 +731,7 @@ public void redeliverUnacknowledgedMessages() { if (pendingAcks != null) { List pendingPositions = new ArrayList<>((int) pendingAcks.size()); MutableInt totalRedeliveryMessages = new MutableInt(0); - pendingAcks.forEach((ledgerId, entryId, batchSize, none) -> { + pendingAcks.forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> { totalRedeliveryMessages.add((int) batchSize); pendingPositions.add(new PositionImpl(ledgerId, entryId)); }); @@ -752,10 +754,11 @@ public void redeliverUnacknowledgedMessages(List messageIds) { List pendingPositions = Lists.newArrayList(); for (MessageIdData msg : messageIds) { PositionImpl position = PositionImpl.get(msg.getLedgerId(), msg.getEntryId()); - LongPair batchSize = pendingAcks.get(position.getLedgerId(), position.getEntryId()); - if (batchSize != null) { + LongPair longPair = pendingAcks.get(position.getLedgerId(), position.getEntryId()); + if (longPair != null) { + long batchSize = longPair.first; pendingAcks.remove(position.getLedgerId(), position.getEntryId()); - totalRedeliveryMessages += batchSize.first; + totalRedeliveryMessages += batchSize; pendingPositions.add(position); } } @@ -810,5 +813,10 @@ public TransportCnx cnx() { return cnx; } + private int getStickyKeyHash(Entry entry) { + byte[] stickyKey = Commands.peekStickyKey(entry.getDataBuffer(), topicName, subscription.getName()); + return StickyKeyConsumerSelector.makeStickyKeyHash(stickyKey); + } + private static final Logger log = LoggerFactory.getLogger(Consumer.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java index f74165e7fc9e3..b715c51d330cb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java @@ -19,7 +19,6 @@ package org.apache.pulsar.broker.service; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException; -import org.apache.pulsar.common.util.Murmur3_32Hash; import java.util.ArrayList; import java.util.Collections; @@ -104,8 +103,7 @@ public synchronized void removeConsumer(Consumer consumer) { } @Override - public Consumer select(byte[] stickyKey) { - int hash = Murmur3_32Hash.getInstance().makeHash(stickyKey); + public Consumer select(int hash) { if (rangeMap.size() > 0) { int slot = hash % rangeSize; return rangeMap.ceilingEntry(slot).getValue(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java index 75cb6b5b5adf8..d8aea2eb3df1c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java @@ -19,7 +19,6 @@ package org.apache.pulsar.broker.service; import org.apache.pulsar.common.api.proto.PulsarApi; -import org.apache.pulsar.common.util.Murmur3_32Hash; import java.util.ArrayList; import java.util.Collections; @@ -65,11 +64,6 @@ public void removeConsumer(Consumer consumer) { rangeMap.entrySet().removeIf(entry -> entry.getValue().equals(consumer)); } - @Override - public Consumer select(byte[] stickyKey) { - return select(Murmur3_32Hash.getInstance().makeHash(stickyKey)); - } - @Override public Map> getConsumerKeyHashRanges() { Map> result = new HashMap<>(); @@ -88,7 +82,8 @@ public Map> getConsumerKeyHashRanges() { return result; } - Consumer select(int hash) { + @Override + public Consumer select(int hash) { if (rangeMap.size() > 0) { int slot = hash % rangeSize; Map.Entry ceilingEntry = rangeMap.ceilingEntry(slot); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java index 0d686f716faaa..5ea6f7f467a40 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; +import org.apache.pulsar.common.util.Murmur3_32Hash; public interface StickyKeyConsumerSelector { @@ -45,7 +46,21 @@ public interface StickyKeyConsumerSelector { * @param stickyKey sticky key * @return consumer */ - Consumer select(byte[] stickyKey); + default Consumer select(byte[] stickyKey) { + return select(makeStickyKeyHash(stickyKey)); + } + + static int makeStickyKeyHash(byte[] stickyKey) { + return Murmur3_32Hash.getInstance().makeHash(stickyKey); + } + + /** + * Select a consumer by hash. + * + * @param hash hash corresponding to sticky key + * @return consumer + */ + Consumer select(int hash); /** * Get key hash ranges handled by each consumer diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java new file mode 100644 index 0000000000000..be143565c483f --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.persistent; + +import com.google.common.collect.ComparisonChain; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap; +import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair; +import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet; +import org.apache.pulsar.common.util.collections.LongPairSet; + +public class MessageRedeliveryController { + private final LongPairSet messagesToRedeliver; + private final ConcurrentLongLongPairHashMap hashesToBeBlocked; + + public MessageRedeliveryController(boolean allowOutOfOrderDelivery) { + this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2); + this.hashesToBeBlocked = allowOutOfOrderDelivery ? null : new ConcurrentLongLongPairHashMap(128, 2); + } + + public boolean add(long ledgerId, long entryId) { + return messagesToRedeliver.add(ledgerId, entryId); + } + + public boolean add(long ledgerId, long entryId, long stickyKeyHash) { + if (hashesToBeBlocked != null) { + hashesToBeBlocked.put(ledgerId, entryId, stickyKeyHash, 0); + } + return messagesToRedeliver.add(ledgerId, entryId); + } + + public boolean remove(long ledgerId, long entryId) { + if (hashesToBeBlocked != null) { + hashesToBeBlocked.remove(ledgerId, entryId); + } + return messagesToRedeliver.remove(ledgerId, entryId); + } + + public int removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) { + if (hashesToBeBlocked != null) { + List keysToRemove = new ArrayList<>(); + hashesToBeBlocked.forEach((ledgerId, entryId, stickyKeyHash, none) -> { + if (ComparisonChain.start().compare(ledgerId, markDeleteLedgerId).compare(entryId, markDeleteEntryId) + .result() <= 0) { + keysToRemove.add(new LongPair(ledgerId, entryId)); + } + }); + keysToRemove.forEach(longPair -> hashesToBeBlocked.remove(longPair.first, longPair.second)); + keysToRemove.clear(); + } + return messagesToRedeliver.removeIf((ledgerId, entryId) -> { + return ComparisonChain.start().compare(ledgerId, markDeleteLedgerId).compare(entryId, markDeleteEntryId) + .result() <= 0; + }); + } + + public boolean isEmpty() { + return messagesToRedeliver.isEmpty(); + } + + public void clear() { + if (hashesToBeBlocked != null) { + hashesToBeBlocked.clear(); + } + messagesToRedeliver.clear(); + } + + public String toString() { + return messagesToRedeliver.toString(); + } + + public boolean containsStickyKeyHashes(Set stickyKeyHashes) { + final AtomicBoolean isContained = new AtomicBoolean(false); + if (hashesToBeBlocked != null) { + hashesToBeBlocked.forEach((ledgerId, entryId, stickyKeyHash, none) -> { + if (!isContained.get() && stickyKeyHashes.contains((int) stickyKeyHash)) { + isContained.set(true); + } + }); + } + return isContained.get(); + } + + public Set getMessagesToReplayNow(int maxMessagesToRead) { + if (hashesToBeBlocked != null) { + // allowOutOfOrderDelivery is false + return messagesToRedeliver.items().stream() + .sorted((l1, l2) -> ComparisonChain.start().compare(l1.first, l2.first) + .compare(l1.second, l2.second).result()) + .limit(maxMessagesToRead).map(longPair -> new PositionImpl(longPair.first, longPair.second)) + .collect(Collectors.toCollection(TreeSet::new)); + } else { + // allowOutOfOrderDelivery is true + return messagesToRedeliver.items(maxMessagesToRead, + (ledgerId, entryId) -> new PositionImpl(ledgerId, entryId)); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 6b2b4ec4726ad..adc5b2b099434 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -20,10 +20,7 @@ import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS; - -import com.google.common.collect.ComparisonChain; import com.google.common.collect.Lists; - import java.util.Collections; import java.util.List; import java.util.Optional; @@ -57,19 +54,17 @@ import org.apache.pulsar.broker.service.RedeliveryTracker; import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled; import org.apache.pulsar.broker.service.SendMessageInfo; +import org.apache.pulsar.broker.service.StickyKeyConsumerSelector; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type; import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; -import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.DispatchRate; import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.TopicPolicies; import org.apache.pulsar.common.util.Codec; -import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet; -import org.apache.pulsar.common.util.collections.LongPairSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,7 +77,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul protected volatile Range lastIndividualDeletedRangeFromCursorRecovery; private CompletableFuture closeFuture = null; - LongPairSet messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2); + protected final MessageRedeliveryController redeliveryMessages; protected final RedeliveryTracker redeliveryTracker; private Optional delayedDeliveryTracker = Optional.empty(); @@ -109,12 +104,19 @@ enum ReadType { Normal, Replay } - public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, Subscription subscription) { + public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, + Subscription subscription) { + this(topic, cursor, subscription, true); + } + + public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, Subscription subscription, + boolean allowOutOfOrderDelivery) { super(subscription, topic.getBrokerService().pulsar().getConfiguration()); this.cursor = cursor; this.lastIndividualDeletedRangeFromCursorRecovery = cursor.getLastIndividualDeletedRange(); this.name = topic.getName() + " / " + Codec.decode(cursor.getName()); this.topic = topic; + this.redeliveryMessages = new MessageRedeliveryController(allowOutOfOrderDelivery); this.redeliveryTracker = this.serviceConfig.isSubscriptionRedeliveryTrackerEnabled() ? new InMemoryRedeliveryTracker() : RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED; @@ -137,7 +139,7 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce cursor.rewind(); shouldRewindBeforeReadingOrReplaying = false; } - messagesToRedeliver.clear(); + redeliveryMessages.clear(); } if (isConsumersExceededOnSubscription()) { @@ -195,7 +197,7 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE havePendingRead = false; } - messagesToRedeliver.clear(); + redeliveryMessages.clear(); redeliveryTracker.clear(); if (closeFuture != null) { log.info("[{}] All consumers removed. Subscription is disconnected", name); @@ -206,8 +208,8 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE if (log.isDebugEnabled()) { log.debug("[{}] Consumer are left, reading more entries", name); } - consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, none) -> { - if (addMessageToReplay(ledgerId, entryId)) { + consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> { + if (addMessageToReplay(ledgerId, entryId, stickyKeyHash)) { redeliveryTracker.addIfAbsent(PositionImpl.get(ledgerId, entryId)); } }); @@ -333,9 +335,9 @@ public synchronized void readMoreEntries() { asyncReplayEntriesInOrder(messagesToReplayNow) : asyncReplayEntries(messagesToReplayNow); // clear already acked positions from replay bucket - deletedMessages.forEach(position -> messagesToRedeliver.remove(((PositionImpl) position).getLedgerId(), + deletedMessages.forEach(position -> redeliveryMessages.remove(((PositionImpl) position).getLedgerId(), ((PositionImpl) position).getEntryId())); - // if all the entries are acked-entries and cleared up from messagesToRedeliver, try to read + // if all the entries are acked-entries and cleared up from redeliveryMessages, try to read // next entries as readCompletedEntries-callback was never called if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) { havePendingReplayRead = false; @@ -516,7 +518,7 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { // remove positions first from replay list first : sendMessages recycles entries if (readType == ReadType.Replay) { entries.subList(start, start + messagesForC).forEach(entry -> { - messagesToRedeliver.remove(entry.getLedgerId(), entry.getEntryId()); + redeliveryMessages.remove(entry.getLedgerId(), entry.getEntryId()); }); } @@ -563,7 +565,8 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { entries.size() - start); } entries.subList(start, entries.size()).forEach(entry -> { - addMessageToReplay(entry.getLedgerId(), entry.getEntryId()); + long stickyKeyHash = getStickyKeyHash(entry); + addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash); entry.release(); }); } @@ -609,11 +612,7 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj havePendingReplayRead = false; if (exception instanceof ManagedLedgerException.InvalidReplayPositionException) { PositionImpl markDeletePosition = (PositionImpl) cursor.getMarkDeletedPosition(); - - messagesToRedeliver.removeIf((ledgerId, entryId) -> { - return ComparisonChain.start().compare(ledgerId, markDeletePosition.getLedgerId()) - .compare(entryId, markDeletePosition.getEntryId()).result() <= 0; - }); + redeliveryMessages.removeAllUpTo(markDeletePosition.getLedgerId(), markDeletePosition.getEntryId()); } } @@ -679,12 +678,12 @@ public boolean isConsumerAvailable(Consumer consumer) { @Override public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) { - consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, none) -> { - addMessageToReplay(ledgerId, entryId); + consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> { + addMessageToReplay(ledgerId, entryId, stickyKeyHash); }); if (log.isDebugEnabled()) { log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer, - messagesToRedeliver); + redeliveryMessages); } readMoreEntries(); } @@ -692,6 +691,8 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) { @Override public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List positions) { positions.forEach(position -> { + // TODO: We want to pass a sticky key hash as a third argument to guarantee the order of the messages + // on Key_Shared subscription, but it's difficult to get the sticky key here if (addMessageToReplay(position.getLedgerId(), position.getEntryId())) { redeliveryTracker.addIfAbsent(position); } @@ -793,29 +794,9 @@ public void initializeDispatchRateLimiterIfNeeded(Optional policies) { } } - @Override - public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) { - if (!topic.isDelayedDeliveryEnabled()) { - // If broker has the feature disabled, always deliver messages immediately - return false; - } - - synchronized (this) { - if (!delayedDeliveryTracker.isPresent()) { - // Initialize the tracker the first time we need to use it - delayedDeliveryTracker = Optional - .of(topic.getBrokerService().getDelayedDeliveryTrackerFactory().newTracker(this)); - } - - delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis()); - return delayedDeliveryTracker.get().addMessage(ledgerId, entryId, msgMetadata.getDeliverAtTime()); - } - } - protected synchronized Set getMessagesToReplayNow(int maxMessagesToRead) { - if (!messagesToRedeliver.isEmpty()) { - return messagesToRedeliver.items(maxMessagesToRead, - (ledgerId, entryId) -> new PositionImpl(ledgerId, entryId)); + if (!redeliveryMessages.isEmpty()) { + return redeliveryMessages.getMessagesToReplayNow(maxMessagesToRead); } else if (delayedDeliveryTracker.isPresent() && delayedDeliveryTracker.get().hasMessageAvailable()) { delayedDeliveryTracker.get().resetTickTime(topic.getDelayedDeliveryTickTimeMillis()); return delayedDeliveryTracker.get().getScheduledMessages(maxMessagesToRead); @@ -841,20 +822,37 @@ public void cursorIsReset() { } } + protected boolean addMessageToReplay(long ledgerId, long entryId, long stickyKeyHash) { + if (checkIfMessageIsUnacked(ledgerId, entryId)) { + redeliveryMessages.add(ledgerId, entryId, stickyKeyHash); + return true; + } else { + return false; + } + } + protected boolean addMessageToReplay(long ledgerId, long entryId) { - PositionImpl markDeletePosition = (PositionImpl) cursor.getMarkDeletedPosition(); - if (markDeletePosition == null || ledgerId > markDeletePosition.getLedgerId() - || (ledgerId == markDeletePosition.getLedgerId() && entryId > markDeletePosition.getEntryId())) { - messagesToRedeliver.add(ledgerId, entryId); + if (checkIfMessageIsUnacked(ledgerId, entryId)) { + redeliveryMessages.add(ledgerId, entryId); return true; } else { return false; } } + private boolean checkIfMessageIsUnacked(long ledgerId, long entryId) { + PositionImpl markDeletePosition = (PositionImpl) cursor.getMarkDeletedPosition(); + return (markDeletePosition == null || ledgerId > markDeletePosition.getLedgerId() + || (ledgerId == markDeletePosition.getLedgerId() && entryId > markDeletePosition.getEntryId())); + } + public PersistentTopic getTopic() { return topic; } + protected int getStickyKeyHash(Entry entry) { + return StickyKeyConsumerSelector.makeStickyKeyHash(peekStickyKey(entry.getDataBuffer())); + } + private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index 45921ddce490a..299f54578e6e5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -72,7 +72,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, Subscription subscription, ServiceConfiguration conf, KeySharedMeta ksm) { - super(topic, cursor, subscription); + super(topic, cursor, subscription, ksm.getAllowOutOfOrderDelivery()); this.allowOutOfOrderDelivery = ksm.getAllowOutOfOrderDelivery(); this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new LinkedHashMap<>(); @@ -125,7 +125,7 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException { // The consumer must be removed from the selector before calling the superclass removeConsumer method. // In the superclass removeConsumer method, the pending acks that the consumer has are added to - // messagesToRedeliver. If the consumer has not been removed from the selector at this point, + // redeliveryMessages. If the consumer has not been removed from the selector at this point, // the broker will try to redeliver the messages to the consumer that has already been closed. // As a result, the messages are not redelivered to any consumer, and the mark-delete position does not move, // eventually causing all consumers to get stuck. @@ -136,7 +136,7 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE if (consumerList.size() == 1) { recentlyJoinedConsumers.clear(); } - if (removeConsumersFromRecentJoinedConsumers() || messagesToRedeliver.size() > 0) { + if (removeConsumersFromRecentJoinedConsumers() || !redeliveryMessages.isEmpty()) { readMoreEntries(); } } @@ -171,10 +171,13 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { final Map> groupedEntries = localGroupedEntries.get(); groupedEntries.clear(); + final Map> consumerStickyKeyHashesMap = new HashMap<>(); for (Entry entry : entries) { - Consumer c = selector.select(peekStickyKey(entry.getDataBuffer())); + int stickyKeyHash = getStickyKeyHash(entry); + Consumer c = selector.select(stickyKeyHash); groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry); + consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash); } AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size()); @@ -189,7 +192,8 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { int entriesWithSameKeyCount = entriesWithSameKey.size(); final int availablePermits = consumer == null ? 0 : Math.max(consumer.getAvailablePermits(), 0); int maxMessagesForC = Math.min(entriesWithSameKeyCount, availablePermits); - int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, maxMessagesForC, readType); + int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, maxMessagesForC, + readType, consumerStickyKeyHashesMap.get(consumer)); if (log.isDebugEnabled()) { log.debug("[{}] select consumer {} with messages num {}, read type is {}", name, consumer.consumerName(), messagesForC, readType); @@ -200,7 +204,8 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { // so we discard for now and mark them for later redelivery for (int i = messagesForC; i < entriesWithSameKeyCount; i++) { Entry entry = entriesWithSameKey.get(i); - addMessageToReplay(entry.getLedgerId(), entry.getEntryId()); + long stickyKeyHash = getStickyKeyHash(entry); + addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash); entry.release(); entriesWithSameKey.set(i, null); } @@ -211,7 +216,7 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { if (readType == ReadType.Replay) { for (int i = 0; i < messagesForC; i++) { Entry entry = entriesWithSameKey.get(i); - messagesToRedeliver.remove(entry.getLedgerId(), entry.getEntryId()); + redeliveryMessages.remove(entry.getLedgerId(), entry.getEntryId()); } } @@ -279,12 +284,19 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { } } - private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List entries, int maxMessages, ReadType readType) { + private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List entries, int maxMessages, + ReadType readType, Set stickyKeyHashes) { if (maxMessages == 0) { // the consumer was stuck nextStuckConsumers.add(consumer); return 0; } + if (readType == ReadType.Normal && stickyKeyHashes != null + && redeliveryMessages.containsStickyKeyHashes(stickyKeyHashes)) { + // If redeliveryMessages contains messages that correspond to the same hash as the messages + // that the dispatcher is trying to send, do not send those messages for order guarantee + return 0; + } if (recentlyJoinedConsumers == null) { return maxMessages; } @@ -362,6 +374,7 @@ private boolean removeConsumersFromRecentJoinedConsumers() { return hasConsumerRemovedFromTheRecentJoinedConsumers; } + @Override protected synchronized Set getMessagesToReplayNow(int maxMessagesToRead) { if (isDispatcherStuckOnReplays) { // If we're stuck on replay, we want to move forward reading on the topic (until the overall max-unacked diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java index c7344d68cd012..43645bf4b7c6f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java @@ -48,6 +48,7 @@ import org.apache.bookkeeper.mledger.impl.EntryCacheImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.pulsar.broker.service.persistent.MessageRedeliveryController; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -80,7 +81,6 @@ import org.apache.pulsar.common.protocol.schema.SchemaData; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.common.stats.Metrics; -import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet; import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; import org.awaitility.Awaitility; import org.testng.Assert; @@ -1571,9 +1571,10 @@ public void testMessageReplay() throws Exception { PersistentSubscription subRef = topicRef.getSubscription(subName); PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) subRef .getDispatcher(); - Field replayMap = PersistentDispatcherMultipleConsumers.class.getDeclaredField("messagesToRedeliver"); - replayMap.setAccessible(true); - ConcurrentLongPairSet messagesToReplay = new ConcurrentLongPairSet(64, 1); + Field redeliveryMessagesField = PersistentDispatcherMultipleConsumers.class + .getDeclaredField("redeliveryMessages"); + redeliveryMessagesField.setAccessible(true); + MessageRedeliveryController redeliveryMessages = new MessageRedeliveryController(true); assertNotNull(subRef); @@ -1594,24 +1595,24 @@ public void testMessageReplay() throws Exception { } if (i < replayIndex) { // (3) accumulate acked messages for replay - messagesToReplay.add(msgId.getLedgerId(), msgId.getEntryId()); + redeliveryMessages.add(msgId.getLedgerId(), msgId.getEntryId()); } } // (4) redelivery : should redeliver only unacked messages Thread.sleep(1000); - replayMap.set(dispatcher, messagesToReplay); + redeliveryMessagesField.set(dispatcher, redeliveryMessages); // (a) redelivery with all acked-message should clear messageReply bucket dispatcher.redeliverUnacknowledgedMessages(dispatcher.getConsumers().get(0)); Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> { - return messagesToReplay.isEmpty(); + return redeliveryMessages.isEmpty(); }); - assertEquals(messagesToReplay.size(), 0); + assertTrue(redeliveryMessages.isEmpty()); // (b) fill messageReplyBucket with already acked entry again: and try to publish new msg and read it - messagesToReplay.add(firstAckedMsg.getLedgerId(), firstAckedMsg.getEntryId()); - replayMap.set(dispatcher, messagesToReplay); + redeliveryMessages.add(firstAckedMsg.getLedgerId(), firstAckedMsg.getEntryId()); + redeliveryMessagesField.set(dispatcher, redeliveryMessages); // send new message final String testMsg = "testMsg"; producer.send(testMsg.getBytes()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java new file mode 100644 index 0000000000000..9a785f6f95fd6 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryControllerTest.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.persistent; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertEqualsNoOrder; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + +import com.google.common.collect.Sets; +import java.lang.reflect.Field; +import java.util.Set; +import java.util.TreeSet; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap; +import org.apache.pulsar.common.util.collections.LongPairSet; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Test(groups = "broker") +public class MessageRedeliveryControllerTest { + @DataProvider(name = "allowOutOfOrderDelivery") + public Object[][] dataProvider() { + return new Object[][] { { true }, { false } }; + } + + @Test(dataProvider = "allowOutOfOrderDelivery", timeOut = 10000) + public void testAddAndRemove(boolean allowOutOfOrderDelivery) throws Exception { + MessageRedeliveryController controller = new MessageRedeliveryController(allowOutOfOrderDelivery); + + Field messagesToRedeliverField = MessageRedeliveryController.class.getDeclaredField("messagesToRedeliver"); + messagesToRedeliverField.setAccessible(true); + LongPairSet messagesToRedeliver = (LongPairSet) messagesToRedeliverField.get(controller); + + Field hashesToBeBlockedField = MessageRedeliveryController.class.getDeclaredField("hashesToBeBlocked"); + hashesToBeBlockedField.setAccessible(true); + ConcurrentLongLongPairHashMap hashesToBeBlocked = (ConcurrentLongLongPairHashMap) hashesToBeBlockedField + .get(controller); + + if (allowOutOfOrderDelivery) { + assertNull(hashesToBeBlocked); + } else { + assertNotNull(hashesToBeBlocked); + } + + assertTrue(controller.isEmpty()); + assertEquals(messagesToRedeliver.size(), 0); + if (!allowOutOfOrderDelivery) { + assertEquals(hashesToBeBlocked.size(), 0); + } + + assertTrue(controller.add(1, 1)); + assertTrue(controller.add(1, 2)); + assertFalse(controller.add(1, 1)); + + assertFalse(controller.isEmpty()); + assertEquals(messagesToRedeliver.size(), 2); + assertTrue(messagesToRedeliver.contains(1, 1)); + assertTrue(messagesToRedeliver.contains(1, 2)); + if (!allowOutOfOrderDelivery) { + assertEquals(hashesToBeBlocked.size(), 0); + assertFalse(hashesToBeBlocked.containsKey(1, 1)); + assertFalse(hashesToBeBlocked.containsKey(1, 2)); + } + + assertTrue(controller.remove(1, 1)); + assertTrue(controller.remove(1, 2)); + assertFalse(controller.remove(1, 1)); + + assertTrue(controller.isEmpty()); + assertEquals(messagesToRedeliver.size(), 0); + assertFalse(messagesToRedeliver.contains(1, 1)); + assertFalse(messagesToRedeliver.contains(1, 2)); + if (!allowOutOfOrderDelivery) { + assertEquals(hashesToBeBlocked.size(), 0); + } + + assertTrue(controller.add(2, 1, 100)); + assertTrue(controller.add(2, 2, 101)); + assertTrue(controller.add(2, 3, 101)); + assertFalse(controller.add(2, 1, 100)); + + assertFalse(controller.isEmpty()); + assertEquals(messagesToRedeliver.size(), 3); + assertTrue(messagesToRedeliver.contains(2, 1)); + assertTrue(messagesToRedeliver.contains(2, 2)); + assertTrue(messagesToRedeliver.contains(2, 3)); + if (!allowOutOfOrderDelivery) { + assertEquals(hashesToBeBlocked.size(), 3); + assertEquals(hashesToBeBlocked.get(2, 1).first, 100); + assertEquals(hashesToBeBlocked.get(2, 2).first, 101); + assertEquals(hashesToBeBlocked.get(2, 3).first, 101); + } + + controller.clear(); + assertTrue(controller.isEmpty()); + assertEquals(messagesToRedeliver.size(), 0); + assertTrue(messagesToRedeliver.isEmpty()); + if (!allowOutOfOrderDelivery) { + assertEquals(hashesToBeBlocked.size(), 0); + assertTrue(hashesToBeBlocked.isEmpty()); + } + + controller.add(2, 2, 201); + controller.add(1, 3, 100); + controller.add(3, 1, 300); + controller.add(2, 1, 200); + controller.add(3, 2, 301); + controller.add(1, 2, 101); + controller.add(1, 1, 100); + + controller.removeAllUpTo(1, 3); + assertEquals(messagesToRedeliver.size(), 4); + assertTrue(messagesToRedeliver.contains(2, 1)); + assertTrue(messagesToRedeliver.contains(2, 2)); + assertTrue(messagesToRedeliver.contains(3, 1)); + assertTrue(messagesToRedeliver.contains(3, 2)); + if (!allowOutOfOrderDelivery) { + assertEquals(hashesToBeBlocked.size(), 4); + assertEquals(hashesToBeBlocked.get(2, 1).first, 200); + assertEquals(hashesToBeBlocked.get(2, 2).first, 201); + assertEquals(hashesToBeBlocked.get(3, 1).first, 300); + assertEquals(hashesToBeBlocked.get(3, 2).first, 301); + } + + controller.removeAllUpTo(3, 1); + assertEquals(messagesToRedeliver.size(), 1); + assertTrue(messagesToRedeliver.contains(3, 2)); + if (!allowOutOfOrderDelivery) { + assertEquals(hashesToBeBlocked.size(), 1); + assertEquals(hashesToBeBlocked.get(3, 2).first, 301); + } + + controller.removeAllUpTo(5, 10); + assertTrue(controller.isEmpty()); + assertEquals(messagesToRedeliver.size(), 0); + if (!allowOutOfOrderDelivery) { + assertEquals(hashesToBeBlocked.size(), 0); + } + } + + @Test(dataProvider = "allowOutOfOrderDelivery", timeOut = 10000) + public void testContainsStickyKeyHashes(boolean allowOutOfOrderDelivery) throws Exception { + MessageRedeliveryController controller = new MessageRedeliveryController(allowOutOfOrderDelivery); + controller.add(1, 1, 100); + controller.add(1, 2, 101); + controller.add(1, 3, 102); + controller.add(2, 2, 103); + controller.add(2, 1, 104); + + if (allowOutOfOrderDelivery) { + assertFalse(controller.containsStickyKeyHashes(Sets.newHashSet(100))); + assertFalse(controller.containsStickyKeyHashes(Sets.newHashSet(101, 102, 103))); + assertFalse(controller.containsStickyKeyHashes(Sets.newHashSet(104, 105))); + } else { + assertTrue(controller.containsStickyKeyHashes(Sets.newHashSet(100))); + assertTrue(controller.containsStickyKeyHashes(Sets.newHashSet(101, 102, 103))); + assertTrue(controller.containsStickyKeyHashes(Sets.newHashSet(104, 105))); + } + + assertFalse(controller.containsStickyKeyHashes(Sets.newHashSet())); + assertFalse(controller.containsStickyKeyHashes(Sets.newHashSet(99))); + assertFalse(controller.containsStickyKeyHashes(Sets.newHashSet(105, 106))); + } + + @Test(dataProvider = "allowOutOfOrderDelivery", timeOut = 10000) + public void testGetMessagesToReplayNow(boolean allowOutOfOrderDelivery) throws Exception { + MessageRedeliveryController controller = new MessageRedeliveryController(allowOutOfOrderDelivery); + controller.add(2, 2); + controller.add(1, 3); + controller.add(3, 1); + controller.add(2, 1); + controller.add(3, 2); + controller.add(1, 2); + controller.add(1, 1); + + if (allowOutOfOrderDelivery) { + // The entries are sorted by ledger ID but not by entry ID + PositionImpl[] actual1 = controller.getMessagesToReplayNow(3).toArray(new PositionImpl[3]); + PositionImpl[] expected1 = { PositionImpl.get(1, 1), PositionImpl.get(1, 2), PositionImpl.get(1, 3) }; + assertEqualsNoOrder(actual1, expected1); + } else { + // The entries are completely sorted + Set actual2 = controller.getMessagesToReplayNow(6); + Set expected2 = new TreeSet<>(); + expected2.add(PositionImpl.get(1, 1)); + expected2.add(PositionImpl.get(1, 2)); + expected2.add(PositionImpl.get(1, 3)); + expected2.add(PositionImpl.get(2, 1)); + expected2.add(PositionImpl.get(2, 2)); + expected2.add(PositionImpl.get(3, 1)); + assertEquals(actual2, expected2); + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java index a902ac2bb4e27..041d740c03f1c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java @@ -22,13 +22,20 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelPromise; import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.EntryImpl; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; -import org.apache.pulsar.broker.service.*; import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.api.proto.PulsarApi.KeySharedMeta; +import org.apache.pulsar.broker.service.BrokerService; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.EntryBatchIndexesAcks; +import org.apache.pulsar.broker.service.EntryBatchSizes; +import org.apache.pulsar.broker.service.RedeliveryTracker; +import org.apache.pulsar.broker.service.StickyKeyConsumerSelector; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.Markers; import org.mockito.ArgumentCaptor; @@ -43,13 +50,32 @@ import java.lang.reflect.Field; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.anyBoolean; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.anyList; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.anySet; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.argThat; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.fail; @@ -279,6 +305,123 @@ public void testSkipRedeliverTemporally() { ); } + @Test(timeOut = 30000) + public void testMessageRedelivery() throws Exception { + final Queue actualEntriesToConsumer1 = new ConcurrentLinkedQueue<>(); + final Queue actualEntriesToConsumer2 = new ConcurrentLinkedQueue<>(); + + final Queue expectedEntriesToConsumer1 = new ConcurrentLinkedQueue<>(); + expectedEntriesToConsumer1.add(PositionImpl.get(1, 1)); + final Queue expectedEntriesToConsumer2 = new ConcurrentLinkedQueue<>(); + expectedEntriesToConsumer2.add(PositionImpl.get(1, 2)); + expectedEntriesToConsumer2.add(PositionImpl.get(1, 3)); + + final AtomicInteger remainingEntriesNum = new AtomicInteger( + expectedEntriesToConsumer1.size() + expectedEntriesToConsumer2.size()); + + // Messages with key1 are routed to consumer1 and messages with key2 are routed to consumer2 + final List allEntries = new ArrayList<>(); + allEntries.add(EntryImpl.create(1, 1, createMessage("message1", 1, "key2"))); + allEntries.add(EntryImpl.create(1, 2, createMessage("message2", 2, "key1"))); + allEntries.add(EntryImpl.create(1, 3, createMessage("message3", 3, "key1"))); + allEntries.forEach(entry -> ((EntryImpl) entry).retain()); + + final List redeliverEntries = new ArrayList<>(); + redeliverEntries.add(allEntries.get(0)); // message1 + final List readEntries = new ArrayList<>(); + readEntries.add(allEntries.get(2)); // message3 + + final Consumer consumer1 = mock(Consumer.class); + doReturn("consumer1").when(consumer1).consumerName(); + // Change availablePermits of consumer1 to 0 and then back to normal + when(consumer1.getAvailablePermits()).thenReturn(0).thenReturn(10); + doReturn(true).when(consumer1).isWritable(); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + List entries = (List) invocationOnMock.getArgument(0); + for (Entry entry : entries) { + remainingEntriesNum.decrementAndGet(); + actualEntriesToConsumer1.add(entry.getPosition()); + } + return channelMock; + }).when(consumer1).sendMessages(anyList(), any(EntryBatchSizes.class), any(EntryBatchIndexesAcks.class), + anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class)); + + final Consumer consumer2 = mock(Consumer.class); + doReturn("consumer2").when(consumer2).consumerName(); + when(consumer2.getAvailablePermits()).thenReturn(10); + doReturn(true).when(consumer2).isWritable(); + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + List entries = (List) invocationOnMock.getArgument(0); + for (Entry entry : entries) { + remainingEntriesNum.decrementAndGet(); + actualEntriesToConsumer2.add(entry.getPosition()); + } + return channelMock; + }).when(consumer2).sendMessages(anyList(), any(EntryBatchSizes.class), any(EntryBatchIndexesAcks.class), + anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class)); + + persistentDispatcher.addConsumer(consumer1); + persistentDispatcher.addConsumer(consumer2); + + final Field totalAvailablePermitsField = PersistentDispatcherMultipleConsumers.class + .getDeclaredField("totalAvailablePermits"); + totalAvailablePermitsField.setAccessible(true); + totalAvailablePermitsField.set(persistentDispatcher, 1000); + + final Field redeliveryMessagesField = PersistentDispatcherMultipleConsumers.class + .getDeclaredField("redeliveryMessages"); + redeliveryMessagesField.setAccessible(true); + MessageRedeliveryController redeliveryMessages = (MessageRedeliveryController) redeliveryMessagesField + .get(persistentDispatcher); + redeliveryMessages.add(allEntries.get(0).getLedgerId(), allEntries.get(0).getEntryId(), + getStickyKeyHash(allEntries.get(0))); // message1 + redeliveryMessages.add(allEntries.get(1).getLedgerId(), allEntries.get(1).getEntryId(), + getStickyKeyHash(allEntries.get(1))); // message2 + + // Mock Cursor#asyncReplayEntries + doAnswer(invocationOnMock -> { + @SuppressWarnings("unchecked") + Set positions = (Set) invocationOnMock.getArgument(0); + List entries = allEntries.stream().filter(entry -> positions.contains(entry.getPosition())) + .collect(Collectors.toList()); + if (!entries.isEmpty()) { + ((PersistentStickyKeyDispatcherMultipleConsumers) invocationOnMock.getArgument(1)) + .readEntriesComplete(entries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Replay); + } + return Collections.emptySet(); + }).when(cursorMock).asyncReplayEntries(anySet(), any(PersistentStickyKeyDispatcherMultipleConsumers.class), + eq(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Replay), anyBoolean()); + + // Mock Cursor#asyncReadEntriesOrWait + doAnswer(invocationOnMock -> { + ((PersistentStickyKeyDispatcherMultipleConsumers) invocationOnMock.getArgument(2)) + .readEntriesComplete(readEntries, PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal); + return null; + }).when(cursorMock).asyncReadEntriesOrWait(anyInt(), anyLong(), + any(PersistentStickyKeyDispatcherMultipleConsumers.class), + eq(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Normal)); + + // (1) Run sendMessagesToConsumers + // (2) Attempts to send message1 to consumer1 but skipped because availablePermits is 0 + // (3) Change availablePermits of consumer1 to 10 + // (4) Run readMoreEntries internally + // (5) Run sendMessagesToConsumers internally + // (6) Attempts to send message3 to consumer2 but skipped because redeliveryMessages contains message2 + persistentDispatcher.sendMessagesToConsumers(PersistentStickyKeyDispatcherMultipleConsumers.ReadType.Replay, + redeliverEntries); + while (remainingEntriesNum.get() > 0) { + // (7) Run readMoreEntries and resend message1 to consumer1 and message2-3 to consumer2 + persistentDispatcher.readMoreEntries(); + } + + assertEquals(actualEntriesToConsumer1, expectedEntriesToConsumer1); + assertEquals(actualEntriesToConsumer2, expectedEntriesToConsumer2); + + allEntries.forEach(entry -> entry.release()); + } + private ByteBuf createMessage(String message, int sequenceId) { return createMessage(message, sequenceId, "testKey"); } @@ -292,4 +435,9 @@ private ByteBuf createMessage(String message, int sequenceId, String key) { messageMetadata.setPublishTime(System.currentTimeMillis()); return serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata.build(), Unpooled.copiedBuffer(message.getBytes(UTF_8))); } + + private int getStickyKeyHash(Entry entry) { + byte[] stickyKey = Commands.peekStickyKey(entry.getDataBuffer(), topicName, subscriptionName); + return StickyKeyConsumerSelector.makeStickyKeyHash(stickyKey); + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 1a0dac5ce57de..1cd03854ca4ac 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -30,6 +30,7 @@ import io.netty.buffer.Unpooled; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.List; import java.util.Map; @@ -1529,6 +1530,23 @@ public static ByteBuf newGetOrCreateSchemaResponseError(long requestId, ServerEr return res; } + private static final byte[] NONE_KEY = "NONE_KEY".getBytes(StandardCharsets.UTF_8); + public static byte[] peekStickyKey(ByteBuf metadataAndPayload, String topic, String subscription) { + try { + int readerIdx = metadataAndPayload.readerIndex(); + MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload); + metadataAndPayload.readerIndex(readerIdx); + if (metadata.hasOrderingKey()) { + return metadata.getOrderingKey().toByteArray(); + } else if (metadata.hasPartitionKey()) { + return metadata.getPartitionKey().getBytes(StandardCharsets.UTF_8); + } + } catch (Throwable t) { + log.error("[{}] [{}] Failed to peek sticky key from the message metadata", topic, subscription, t); + } + return Commands.NONE_KEY; + } + // ---- transaction related ---- public static ByteBuf newTxn(long tcId, long requestId, long ttlSeconds) {