Skip to content

Commit

Permalink
[ML] Fix race condition in updating lastMarkDeleteEntry field (apache…
Browse files Browse the repository at this point in the history
…#15031)

- missed updates can lead to the subscription and consuming getting stuck

(cherry picked from commit ad2f397)
(cherry picked from commit f657bf8)
  • Loading branch information
lhotari authored and eolivelli committed Apr 6, 2022
1 parent cfe5790 commit 397c910
Showing 1 changed file with 26 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1693,7 +1693,7 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
// Apply rate limiting to mark-delete operations
if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) {
isDirty = true;
lastMarkDeleteEntry = new MarkDeleteEntry(newPosition, properties, null, null);
updateLastMarkDeleteEntryToLatest(newPosition, properties);
callback.markDeleteComplete(ctx);
return;
}
Expand Down Expand Up @@ -1747,7 +1747,14 @@ void internalMarkDelete(final MarkDeleteEntry mdEntry) {
// ledger is postponed to when the counter goes to 0.
PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.incrementAndGet(this);

lastMarkDeleteEntry = mdEntry;
LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
if (last != null && last.newPosition.compareTo(mdEntry.newPosition) > 0) {
// keep the current value since it's later then the mdEntry.newPosition
return last;
} else {
return mdEntry;
}
});

persistPositionToLedger(cursorLedger, mdEntry, new VoidCallback() {
@Override
Expand Down Expand Up @@ -2009,9 +2016,7 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
// Apply rate limiting to mark-delete operations
if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) {
isDirty = true;
PositionImpl finalNewMarkDeletePosition = newMarkDeletePosition;
LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this,
last -> new MarkDeleteEntry(finalNewMarkDeletePosition, last.properties, null, null));
updateLastMarkDeleteEntryToLatest(newMarkDeletePosition, null);
callback.deleteComplete(ctx);
return;
}
Expand Down Expand Up @@ -2043,6 +2048,22 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
}
}

// update lastMarkDeleteEntry field if newPosition is later than the current lastMarkDeleteEntry.newPosition
private void updateLastMarkDeleteEntryToLatest(final PositionImpl newPosition,
final Map<String, Long> properties) {
LAST_MARK_DELETE_ENTRY_UPDATER.updateAndGet(this, last -> {
if (last != null && last.newPosition.compareTo(newPosition) > 0) {
// keep current value, don't update
return last;
} else {
// use given properties or when missing, use the properties from the previous field value
Map<String, Long> propertiesToUse =
properties != null ? properties : (last != null ? last.properties : Collections.emptyMap());
return new MarkDeleteEntry(newPosition, propertiesToUse, null, null);
}
});
}

/**
* Given a list of entries, filter out the entries that have already been individually deleted.
*
Expand Down

0 comments on commit 397c910

Please sign in to comment.