Skip to content

Commit

Permalink
Do not use FastThreadLocal
Browse files Browse the repository at this point in the history
  • Loading branch information
Masahiro Sakamoto committed Jun 29, 2021
1 parent 401f4ee commit 75062d6
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 11 deletions.
Expand Up @@ -123,7 +123,7 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
// The consumer must be removed from the selector before calling the superclass removeConsumer method.
// In the superclass removeConsumer method, the pending acks that the consumer has are added to
// messagesToRedeliver. If the consumer has not been removed from the selector at this point,
// redeliveryMessages. If the consumer has not been removed from the selector at this point,
// the broker will try to redeliver the messages to the consumer that has already been closed.
// As a result, the messages are not redelivered to any consumer, and the mark-delete position does not move,
// eventually causing all consumers to get stuck.
Expand All @@ -148,14 +148,6 @@ protected Map<Consumer, List<Entry>> initialValue() throws Exception {
}
};

private static final FastThreadLocal<Map<Consumer, Set<Integer>>> localConsumerStickyKeyHashesMap =
new FastThreadLocal<Map<Consumer, Set<Integer>>>() {
@Override
protected Map<Consumer, Set<Integer>> initialValue() throws Exception {
return new HashMap<>();
}
};

@Override
protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
long totalMessagesSent = 0;
Expand All @@ -178,8 +170,7 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {

final Map<Consumer, List<Entry>> groupedEntries = localGroupedEntries.get();
groupedEntries.clear();
final Map<Consumer, Set<Integer>> consumerStickyKeyHashesMap = localConsumerStickyKeyHashesMap.get();
consumerStickyKeyHashesMap.clear();
final Map<Consumer, Set<Integer>> consumerStickyKeyHashesMap = new HashMap<>();

for (Entry entry : entries) {
int stickyKeyHash = getStickyKeyHash(entry);
Expand Down
Expand Up @@ -33,6 +33,7 @@
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.api.proto.MessageMetadata;
Expand Down Expand Up @@ -62,9 +63,11 @@
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyBoolean;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.anyList;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.anySet;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.eq;
Expand Down

0 comments on commit 75062d6

Please sign in to comment.