diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 7edc4a857b02c..f7eed46091ec9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1804,6 +1804,7 @@ public void operationComplete() { @Override public void operationFailed(ManagedLedgerException exception) { + isDirty = true; log.warn("[{}] Failed to mark delete position for cursor={} position={}", ledger.getName(), ManagedCursorImpl.this, mdEntry.newPosition); if (log.isDebugEnabled()) { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index c14681ac92568..fe21eedab9412 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -3495,6 +3495,57 @@ public void testConsistencyOfIndividualMessages() throws Exception { assertEquals(c1.getMarkDeletedPosition(), positions.get(positions.size() -1)); } + + + @Test + public void testFlushCursorAfterError() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setThrottleMarkDelete(1.0); + + ManagedLedgerFactoryConfig factoryConfig = new ManagedLedgerFactoryConfig(); + factoryConfig.setCursorPositionFlushSeconds(1); + + @Cleanup("shutdown") + ManagedLedgerFactory factory1 = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConfig); + ManagedLedger ledger1 = factory1.open("testFlushCursorAfterInactivity", config); + ManagedCursor c1 = ledger1.openCursor("c"); + List positions = new ArrayList<>(); + + for (int i = 0; i < 20; i++) { + positions.add(ledger1.addEntry(new byte[1024])); + } + + // Simulate BK write error + bkc.failNow(BKException.Code.NotEnoughBookiesException); + metadataStore.setAlwaysFail(new MetadataStoreException.BadVersionException("")); + + try { + c1.markDelete(positions.get(positions.size() - 1)); + fail("should have failed"); + } catch (ManagedLedgerException e) { + // Expected + } + + metadataStore.unsetAlwaysFail(); + + // In memory position is updated + assertEquals(c1.getMarkDeletedPosition(), positions.get(positions.size() - 1)); + + Awaitility.await() + // Give chance to the flush to be automatically triggered. + // NOTE: this can't be set too low, or it causes issues with ZK thread pool rejecting + .pollDelay(Duration.ofMillis(2000)) + .untilAsserted(() -> { + // Abruptly re-open the managed ledger without graceful close + @Cleanup("shutdown") + ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); + ManagedLedger ledger2 = factory2.open("testFlushCursorAfterInactivity", config); + ManagedCursor c2 = ledger2.openCursor("c"); + + assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1)); + }); + } + @Test public void testCursorCheckReadPositionChanged() throws Exception { ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());