Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up individually deleted messages before the mark-delete position #14261

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -1579,7 +1579,9 @@ void initializeCursorPosition(Pair<PositionImpl, Long> lastPositionCounter) {
*/
PositionImpl setAcknowledgedPosition(PositionImpl newMarkDeletePosition) {
if (newMarkDeletePosition.compareTo(markDeletePosition) < 0) {
throw new IllegalArgumentException("Mark deleting an already mark-deleted position");
throw new IllegalArgumentException(
"Mark deleting an already mark-deleted position. Current mark-delete: " + markDeletePosition
+ " -- attempted mark delete: " + newMarkDeletePosition);
}

PositionImpl oldMarkDeletePosition = markDeletePosition;
Expand Down Expand Up @@ -2003,6 +2005,19 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
// mark-delete to the upper bound of the first range segment
Range<PositionImpl> range = individualDeletedMessages.firstRange();

// If the upper bound is before the mark-delete position, we need to move ahead as these
// individualDeletedMessages are now irrelevant
if (range.upperEndpoint().compareTo(markDeletePosition) <= 0) {
individualDeletedMessages.removeAtMost(markDeletePosition.getLedgerId(),
markDeletePosition.getEntryId());
range = individualDeletedMessages.firstRange();
}

if (range == null) {
// The set was completely cleaned up now
return;
}

// If the lowerBound is ahead of MarkDelete, verify if there are any entries in-between
if (range.lowerEndpoint().compareTo(markDeletePosition) <= 0 || ledger
.getNumberOfEntries(Range.openClosed(markDeletePosition, range.lowerEndpoint())) <= 0) {
Expand Down
Expand Up @@ -88,6 +88,7 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
Expand Down Expand Up @@ -3558,6 +3559,31 @@ public void testFlushCursorAfterError() throws Exception {
});
}

@Test
public void testConsistencyOfIndividualMessages() throws Exception {
ManagedLedger ledger1 = factory.open("testConsistencyOfIndividualMessages");
ManagedCursorImpl c1 = (ManagedCursorImpl) ledger1.openCursor("c");

PositionImpl p1 = (PositionImpl) ledger1.addEntry(new byte[1024]);
c1.markDelete(p1);

// Artificially add a position that is before the current mark-delete position
LongPairRangeSet<PositionImpl> idm = c1.getIndividuallyDeletedMessagesSet();
idm.addOpenClosed(p1.getLedgerId() - 1, 0, p1.getLedgerId() - 1, 10);

List<Position> positions = new ArrayList<>();
for (int i = 0; i < 20; i++) {
positions.add(ledger1.addEntry(new byte[1024]));
}

for (int i = 0; i < 20; i++) {
c1.delete(positions.get(i));
}

assertEquals(c1.getTotalNonContiguousDeletedMessagesRange(), 0);
assertEquals(c1.getMarkDeletedPosition(), positions.get(positions.size() -1));
}

@Test
public void testCursorCheckReadPositionChanged() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());
Expand Down