diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index 585d008299900..a2687a149b31e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -236,8 +236,4 @@ public void resetCloseFuture() { protected byte[] peekStickyKey(ByteBuf metadataAndPayload) { return Commands.peekStickyKey(metadataAndPayload, subscription.getTopicName(), subscription.getName()); } - - protected void addMessageToReplay(long ledgerId, long entryId) { - // No-op - } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 67729e35edfdc..b6479a33d8db7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -182,8 +182,9 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE log.debug("[{}] Consumer are left, reading more entries", name); } consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, none) -> { - messagesToRedeliver.add(ledgerId, entryId); - redeliveryTracker.addIfAbsent(PositionImpl.get(ledgerId, entryId)); + if (addMessageToReplay(ledgerId, entryId)) { + redeliveryTracker.addIfAbsent(PositionImpl.get(ledgerId, entryId)); + } }); totalAvailablePermits -= consumer.getAvailablePermits(); if (log.isDebugEnabled()) { @@ -248,7 +249,9 @@ public synchronized void readMoreEntries() { // next entries as readCompletedEntries-callback was never called if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) { havePendingReplayRead = false; - readMoreEntries(); + // We should not call readMoreEntries() recursively in the same thread + // as there is a risk of StackOverflowError + topic.getBrokerService().executor().execute(() -> readMoreEntries()); } } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) { log.warn("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name, @@ -574,7 +577,7 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { entries.size() - start); } entries.subList(start, entries.size()).forEach(entry -> { - messagesToRedeliver.add(entry.getLedgerId(), entry.getEntryId()); + addMessageToReplay(entry.getLedgerId(), entry.getEntryId()); entry.release(); }); } @@ -695,7 +698,7 @@ public boolean isConsumerAvailable(Consumer consumer) { @Override public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) { consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, none) -> { - messagesToRedeliver.add(ledgerId, entryId); + addMessageToReplay(ledgerId, entryId); }); if (log.isDebugEnabled()) { log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer, @@ -707,8 +710,9 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) { @Override public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List positions) { positions.forEach(position -> { - messagesToRedeliver.add(position.getLedgerId(), position.getEntryId()); - redeliveryTracker.addIfAbsent(position); + if (addMessageToReplay(position.getLedgerId(), position.getEntryId())) { + redeliveryTracker.addIfAbsent(position); + } }); if (log.isDebugEnabled()) { log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer, positions); @@ -853,9 +857,15 @@ public void cursorIsReset() { } } - @Override - public void addMessageToReplay(long ledgerId, long entryId) { - this.messagesToRedeliver.add(ledgerId, entryId); + protected boolean addMessageToReplay(long ledgerId, long entryId) { + Position markDeletePosition = cursor.getMarkDeletedPosition(); + if (markDeletePosition == null || ledgerId > markDeletePosition.getLedgerId() + || (ledgerId == markDeletePosition.getLedgerId() && entryId > markDeletePosition.getEntryId())) { + messagesToRedeliver.add(ledgerId, entryId); + return true; + } else { + return false; + } } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java index d27df30285c28..5145c3b51b7ab 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java @@ -194,7 +194,7 @@ protected void sendMessagesToConsumers(ReadType readType, List entries) { // so we discard for now and mark them for later redelivery for (int i = messagesForC; i < entriesWithSameKeyCount; i++) { Entry entry = entriesWithSameKey.get(i); - messagesToRedeliver.add(entry.getLedgerId(), entry.getEntryId()); + addMessageToReplay(entry.getLedgerId(), entry.getEntryId()); entry.release(); entriesWithSameKey.set(i, null); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java index a080e623a93cb..666bb9813cae8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherMultipleConsumers.java @@ -165,7 +165,9 @@ public synchronized void readMoreEntries() { // next entries as readCompletedEntries-callback was never called if ((messagesToReplayNow.size() - deletedMessages.size()) == 0) { havePendingReplayRead = false; - readMoreEntries(); + // We should not call readMoreEntries() recursively in the same thread + // as there is a risk of StackOverflowError + topic.getBrokerService().executor().execute(() -> readMoreEntries()); } } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) { log.warn("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java index 09295dd11a309..283cd98f30aae 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java @@ -1700,6 +1700,9 @@ public void testMessageReplay() throws Exception { replayMap.set(dispatcher, messagesToReplay); // (a) redelivery with all acked-message should clear messageReply bucket dispatcher.redeliverUnacknowledgedMessages(dispatcher.getConsumers().get(0)); + Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> { + return messagesToReplay.isEmpty(); + }); assertEquals(messagesToReplay.size(), 0); // (b) fill messageReplyBucket with already acked entry again: and try to publish new msg and read it