From e16edde27507707a8f77563b52b42e9e644097cc Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Sun, 13 Feb 2022 19:01:13 -0800 Subject: [PATCH] Clean up individually deleted messages before the mark-delete position --- .../mledger/impl/ManagedCursorImpl.java | 17 +++++++++++- .../mledger/impl/ManagedCursorTest.java | 26 +++++++++++++++++++ 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 242315a6b981f..5d3a3313dca8c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1579,7 +1579,9 @@ void initializeCursorPosition(Pair lastPositionCounter) { */ PositionImpl setAcknowledgedPosition(PositionImpl newMarkDeletePosition) { if (newMarkDeletePosition.compareTo(markDeletePosition) < 0) { - throw new IllegalArgumentException("Mark deleting an already mark-deleted position"); + throw new IllegalArgumentException( + "Mark deleting an already mark-deleted position. Current mark-delete: " + markDeletePosition + + " -- attempted mark delete: " + newMarkDeletePosition); } PositionImpl oldMarkDeletePosition = markDeletePosition; @@ -2003,6 +2005,19 @@ public void asyncDelete(Iterable positions, AsyncCallbacks.DeleteCallb // mark-delete to the upper bound of the first range segment Range range = individualDeletedMessages.firstRange(); + // If the upper bound is before the mark-delete position, we need to move ahead as these + // individualDeletedMessages are now irrelevant + if (range.upperEndpoint().compareTo(markDeletePosition) <= 0) { + individualDeletedMessages.removeAtMost(markDeletePosition.getLedgerId(), + markDeletePosition.getEntryId()); + range = individualDeletedMessages.firstRange(); + } + + if (range == null) { + // The set was completely cleaned up now + return; + } + // If the lowerBound is ahead of MarkDelete, verify if there are any entries in-between if (range.lowerEndpoint().compareTo(markDeletePosition) <= 0 || ledger .getNumberOfEntries(Range.openClosed(markDeletePosition, range.lowerEndpoint())) <= 0) { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 3281833befb3e..08056bffa777a 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -88,6 +88,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.apache.pulsar.common.util.collections.LongPairRangeSet; import org.apache.pulsar.metadata.api.extended.SessionEvent; import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore; import org.apache.pulsar.metadata.api.MetadataStoreException; @@ -3558,6 +3559,31 @@ public void testFlushCursorAfterError() throws Exception { }); } + @Test + public void testConsistencyOfIndividualMessages() throws Exception { + ManagedLedger ledger1 = factory.open("testConsistencyOfIndividualMessages"); + ManagedCursorImpl c1 = (ManagedCursorImpl) ledger1.openCursor("c"); + + PositionImpl p1 = (PositionImpl) ledger1.addEntry(new byte[1024]); + c1.markDelete(p1); + + // Artificially add a position that is before the current mark-delete position + LongPairRangeSet idm = c1.getIndividuallyDeletedMessagesSet(); + idm.addOpenClosed(p1.getLedgerId() - 1, 0, p1.getLedgerId() - 1, 10); + + List positions = new ArrayList<>(); + for (int i = 0; i < 20; i++) { + positions.add(ledger1.addEntry(new byte[1024])); + } + + for (int i = 0; i < 20; i++) { + c1.delete(positions.get(i)); + } + + assertEquals(c1.getTotalNonContiguousDeletedMessagesRange(), 0); + assertEquals(c1.getMarkDeletedPosition(), positions.get(positions.size() -1)); + } + @Test public void testCursorCheckReadPositionChanged() throws Exception { ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());