Skip to content

Commit

Permalink
[broker] Fix issue that message ordering could be broken when redeliv…
Browse files Browse the repository at this point in the history
…ering messages on Key_Shared subscription (#10762)

Messages with the same key can be out of order if message redelivery occurs on a Key_Shared subscription.

1. Suppose `PersistentDispatcherMultipleConsumers#messagesToRedeliver` contains message-1 and message-2. Message-1 will be delivered to consumer-a and message-2 will be delivered to consumer-b.
2. The dispatcher tried to send message-1 to consumer-a, but the consumer was too slow, so the message could not be sent.
3. Consumer-a is added to `stuckConsumers`.
https://github.com/apache/pulsar/blob/894d92b2be3bee334e7ce32760c4d2e7978603aa/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L263-L266
4. The next time `readMoreEntries()` is run, `getMessagesToReplayNow()` will return an empty Set because `isDispatcherStuckOnReplays` is true.
https://github.com/apache/pulsar/blob/894d92b2be3bee334e7ce32760c4d2e7978603aa/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L368-L374
5. The dispatcher reads newer messages instead of the messages contained in `messagesToRedeliver`.
https://github.com/apache/pulsar/blob/894d92b2be3bee334e7ce32760c4d2e7978603aa/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L233-L267
6. A new message (message-3) is delivered to consumer-b.
7. Message-2 contained in messagesToRedeliver is delivered to consumer-b.
8. As a result, the order of message-2 and message-3 is reversed.

When adding a message to be redelivered to `messagesToRedeliver`, save the hash of the message's key. If the dispatcher attempts to send newer messages whose key hash matches any of the saved hash values, those messages are added to `messagesToRedeliver` instead of being sent. This prevents messages with the same key from being delivered out of order.

(cherry picked from commit 5aee599)
  • Loading branch information
Masahiro Sakamoto authored and codelipenghui committed Jul 19, 2021
1 parent 7537376 commit 0d383ba
Show file tree
Hide file tree
Showing 12 changed files with 614 additions and 89 deletions.
Expand Up @@ -105,9 +105,7 @@ public void removeConsumer(Consumer consumer) {
}

@Override
public Consumer select(byte[] stickyKey) {
int hash = Murmur3_32Hash.getInstance().makeHash(stickyKey);

public Consumer select(int hash) {
rwLock.readLock().lock();
try {
if (hashRing.isEmpty()) {
Expand Down
Expand Up @@ -54,6 +54,7 @@
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.stats.Rate;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -236,7 +237,8 @@ public Future<Void> sendMessages(final List<Entry> entries, EntryBatchSizes batc
Entry entry = entries.get(i);
if (entry != null) {
int batchSize = batchSizes.getBatchSize(i);
pendingAcks.put(entry.getLedgerId(), entry.getEntryId(), batchSize, 0);
int stickyKeyHash = getStickyKeyHash(entry);
pendingAcks.put(entry.getLedgerId(), entry.getEntryId(), batchSize, stickyKeyHash);
if (log.isDebugEnabled()){
log.debug("[{}-{}] Added {}:{} ledger entry with batchSize of {} to pendingAcks in"
+ " broker.service.Consumer for consumerId: {}",
Expand Down Expand Up @@ -729,7 +731,7 @@ public void redeliverUnacknowledgedMessages() {
if (pendingAcks != null) {
List<PositionImpl> pendingPositions = new ArrayList<>((int) pendingAcks.size());
MutableInt totalRedeliveryMessages = new MutableInt(0);
pendingAcks.forEach((ledgerId, entryId, batchSize, none) -> {
pendingAcks.forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> {
totalRedeliveryMessages.add((int) batchSize);
pendingPositions.add(new PositionImpl(ledgerId, entryId));
});
Expand All @@ -752,10 +754,11 @@ public void redeliverUnacknowledgedMessages(List<MessageIdData> messageIds) {
List<PositionImpl> pendingPositions = Lists.newArrayList();
for (MessageIdData msg : messageIds) {
PositionImpl position = PositionImpl.get(msg.getLedgerId(), msg.getEntryId());
LongPair batchSize = pendingAcks.get(position.getLedgerId(), position.getEntryId());
if (batchSize != null) {
LongPair longPair = pendingAcks.get(position.getLedgerId(), position.getEntryId());
if (longPair != null) {
long batchSize = longPair.first;
pendingAcks.remove(position.getLedgerId(), position.getEntryId());
totalRedeliveryMessages += batchSize.first;
totalRedeliveryMessages += batchSize;
pendingPositions.add(position);
}
}
Expand Down Expand Up @@ -810,5 +813,10 @@ public TransportCnx cnx() {
return cnx;
}

/**
 * Computes the sticky-key hash of an entry.
 *
 * <p>The sticky key is peeked from the entry's data buffer via {@code Commands.peekStickyKey}
 * and then hashed with {@code StickyKeyConsumerSelector.makeStickyKeyHash}.
 *
 * @param entry entry whose sticky key should be hashed
 * @return hash of the entry's sticky key
 */
private int getStickyKeyHash(Entry entry) {
    return StickyKeyConsumerSelector.makeStickyKeyHash(
            Commands.peekStickyKey(entry.getDataBuffer(), topicName, subscription.getName()));
}

private static final Logger log = LoggerFactory.getLogger(Consumer.class);
}
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.broker.service;

import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerAssignException;
import org.apache.pulsar.common.util.Murmur3_32Hash;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -104,8 +103,7 @@ public synchronized void removeConsumer(Consumer consumer) {
}

@Override
public Consumer select(byte[] stickyKey) {
int hash = Murmur3_32Hash.getInstance().makeHash(stickyKey);
public Consumer select(int hash) {
if (rangeMap.size() > 0) {
int slot = hash % rangeSize;
return rangeMap.ceilingEntry(slot).getValue();
Expand Down
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.broker.service;

import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.util.Murmur3_32Hash;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -65,11 +64,6 @@ public void removeConsumer(Consumer consumer) {
rangeMap.entrySet().removeIf(entry -> entry.getValue().equals(consumer));
}

@Override
public Consumer select(byte[] stickyKey) {
return select(Murmur3_32Hash.getInstance().makeHash(stickyKey));
}

@Override
public Map<String, List<String>> getConsumerKeyHashRanges() {
Map<String, List<String>> result = new HashMap<>();
Expand All @@ -88,7 +82,8 @@ public Map<String, List<String>> getConsumerKeyHashRanges() {
return result;
}

Consumer select(int hash) {
@Override
public Consumer select(int hash) {
if (rangeMap.size() > 0) {
int slot = hash % rangeSize;
Map.Entry<Integer, Consumer> ceilingEntry = rangeMap.ceilingEntry(slot);
Expand Down
Expand Up @@ -22,6 +22,7 @@

import java.util.List;
import java.util.Map;
import org.apache.pulsar.common.util.Murmur3_32Hash;

public interface StickyKeyConsumerSelector {

Expand All @@ -45,7 +46,21 @@ public interface StickyKeyConsumerSelector {
* @param stickyKey sticky key
* @return consumer
*/
Consumer select(byte[] stickyKey);
default Consumer select(byte[] stickyKey) {
    // Hash the key once, then delegate to the hash-based selection.
    final int hash = makeStickyKeyHash(stickyKey);
    return select(hash);
}

/**
 * Compute the hash of a sticky key.
 *
 * <p>Delegates to {@code Murmur3_32Hash.getInstance().makeHash(stickyKey)}.
 *
 * @param stickyKey sticky key
 * @return hash corresponding to the sticky key
 */
static int makeStickyKeyHash(byte[] stickyKey) {
return Murmur3_32Hash.getInstance().makeHash(stickyKey);
}

/**
* Select a consumer by hash.
*
* @param hash hash corresponding to sticky key
* @return consumer
*/
Consumer select(int hash);

/**
* Get key hash ranges handled by each consumer
Expand Down
@@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.persistent;

import com.google.common.collect.ComparisonChain;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
import org.apache.pulsar.common.util.collections.LongPairSet;

/**
 * Keeps track of the positions of messages that have to be redelivered and, unless
 * out-of-order delivery is allowed, of the sticky-key hash recorded for each of them.
 */
public class MessageRedeliveryController {
    /** Positions (ledgerId, entryId) of the messages waiting to be redelivered. */
    private final LongPairSet messagesToRedeliver;
    /**
     * Maps (ledgerId, entryId) to (stickyKeyHash, 0). It is {@code null} when the controller
     * was created with {@code allowOutOfOrderDelivery == true}.
     */
    private final ConcurrentLongLongPairHashMap hashesToBeBlocked;

    /**
     * @param allowOutOfOrderDelivery when {@code true}, sticky-key hashes are not tracked
     */
    public MessageRedeliveryController(boolean allowOutOfOrderDelivery) {
        this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
        if (allowOutOfOrderDelivery) {
            this.hashesToBeBlocked = null;
        } else {
            this.hashesToBeBlocked = new ConcurrentLongLongPairHashMap(128, 2);
        }
    }

    /**
     * Registers a position for redelivery without recording a sticky-key hash.
     *
     * @return the result of adding the position to the underlying set
     */
    public boolean add(long ledgerId, long entryId) {
        return messagesToRedeliver.add(ledgerId, entryId);
    }

    /**
     * Registers a position for redelivery and, when hashes are tracked, records the
     * sticky-key hash associated with it.
     *
     * @return the result of adding the position to the underlying set
     */
    public boolean add(long ledgerId, long entryId, long stickyKeyHash) {
        final boolean trackHashes = hashesToBeBlocked != null;
        if (trackHashes) {
            hashesToBeBlocked.put(ledgerId, entryId, stickyKeyHash, 0);
        }
        return messagesToRedeliver.add(ledgerId, entryId);
    }

    /**
     * Removes a position (and its recorded hash, if any) from the redelivery bookkeeping.
     *
     * @return the result of removing the position from the underlying set
     */
    public boolean remove(long ledgerId, long entryId) {
        final boolean trackHashes = hashesToBeBlocked != null;
        if (trackHashes) {
            hashesToBeBlocked.remove(ledgerId, entryId);
        }
        return messagesToRedeliver.remove(ledgerId, entryId);
    }

    /**
     * Removes every tracked position that is less than or equal to the given mark-delete position.
     *
     * @return the value returned by {@code removeIf} on the set of positions to redeliver
     */
    public int removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
        if (hashesToBeBlocked != null) {
            // Gather the matching keys first and remove them once the iteration has finished.
            final List<LongPair> expired = new ArrayList<>();
            hashesToBeBlocked.forEach((ledgerId, entryId, stickyKeyHash, none) -> {
                if (isAtOrBefore(ledgerId, entryId, markDeleteLedgerId, markDeleteEntryId)) {
                    expired.add(new LongPair(ledgerId, entryId));
                }
            });
            for (LongPair position : expired) {
                hashesToBeBlocked.remove(position.first, position.second);
            }
        }
        return messagesToRedeliver.removeIf(
                (ledgerId, entryId) -> isAtOrBefore(ledgerId, entryId, markDeleteLedgerId, markDeleteEntryId));
    }

    /** @return {@code true} when no position is waiting to be redelivered */
    public boolean isEmpty() {
        return messagesToRedeliver.isEmpty();
    }

    /** Drops all tracked positions and, when hashes are tracked, all recorded hashes. */
    public void clear() {
        if (hashesToBeBlocked != null) {
            hashesToBeBlocked.clear();
        }
        messagesToRedeliver.clear();
    }

    @Override
    public String toString() {
        return messagesToRedeliver.toString();
    }

    /**
     * Checks whether any recorded sticky-key hash is contained in the given set.
     *
     * @param stickyKeyHashes hashes to look for
     * @return {@code true} if at least one recorded hash is in {@code stickyKeyHashes};
     *         always {@code false} when hashes are not tracked
     */
    public boolean containsStickyKeyHashes(Set<Integer> stickyKeyHashes) {
        final AtomicBoolean found = new AtomicBoolean(false);
        if (hashesToBeBlocked == null) {
            return found.get();
        }
        hashesToBeBlocked.forEach((ledgerId, entryId, stickyKeyHash, none) -> {
            if (found.get()) {
                // A match was already seen; nothing more to check for this entry.
                return;
            }
            if (stickyKeyHashes.contains((int) stickyKeyHash)) {
                found.set(true);
            }
        });
        return found.get();
    }

    /**
     * Returns up to {@code maxMessagesToRead} positions to replay.
     *
     * <p>When hashes are tracked, the positions are sorted by (ledgerId, entryId) before the
     * limit is applied and are returned in a {@link TreeSet}. Otherwise the selection is
     * delegated to the underlying set.
     */
    public Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
        if (hashesToBeBlocked == null) {
            // allowOutOfOrderDelivery is true
            return messagesToRedeliver.items(maxMessagesToRead,
                    (ledger, entry) -> new PositionImpl(ledger, entry));
        }
        // allowOutOfOrderDelivery is false
        return messagesToRedeliver.items().stream()
                .sorted((a, b) -> comparePositions(a.first, a.second, b.first, b.second))
                .limit(maxMessagesToRead)
                .map(pair -> new PositionImpl(pair.first, pair.second))
                .collect(Collectors.toCollection(TreeSet::new));
    }

    /** Orders two positions by ledger id first and entry id second. */
    private static int comparePositions(long ledgerA, long entryA, long ledgerB, long entryB) {
        return ComparisonChain.start().compare(ledgerA, ledgerB).compare(entryA, entryB).result();
    }

    /** Tells whether (ledgerId, entryId) is less than or equal to (markLedgerId, markEntryId). */
    private static boolean isAtOrBefore(long ledgerId, long entryId, long markLedgerId, long markEntryId) {
        return comparePositions(ledgerId, entryId, markLedgerId, markEntryId) <= 0;
    }
}

0 comments on commit 0d383ba

Please sign in to comment.