From 8928c3496a61c588b50461d6adaab089dd421619 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Sun, 13 Feb 2022 03:05:52 -0800 Subject: [PATCH] If mark-delete operation fails, mark the cursor as "dirty" (#14256) --- .../mledger/impl/ManagedCursorImpl.java | 1 + .../mledger/impl/ManagedCursorTest.java | 51 +++++++++++++++++++ 2 files changed, 52 insertions(+) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 70f0efe3f06b5..242315a6b981f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1833,6 +1833,7 @@ public void operationComplete() { @Override public void operationFailed(ManagedLedgerException exception) { + isDirty = true; log.warn("[{}] Failed to mark delete position for cursor={} position={}", ledger.getName(), ManagedCursorImpl.this, mdEntry.newPosition); if (log.isDebugEnabled()) { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 9a4d1177530bb..3281833befb3e 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -3507,6 +3507,57 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) { }); } + + + @Test + public void testFlushCursorAfterError() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setThrottleMarkDelete(1.0); + + ManagedLedgerFactoryConfig factoryConfig = new ManagedLedgerFactoryConfig(); + factoryConfig.setCursorPositionFlushSeconds(1); + + @Cleanup("shutdown") + ManagedLedgerFactory factory1 = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConfig); + ManagedLedger ledger1 = factory1.open("testFlushCursorAfterInactivity", config); + ManagedCursor c1 = ledger1.openCursor("c"); + List positions = new ArrayList<>(); + + for (int i = 0; i < 20; i++) { + positions.add(ledger1.addEntry(new byte[1024])); + } + + // Simulate BK write error + bkc.failNow(BKException.Code.NotEnoughBookiesException); + metadataStore.setAlwaysFail(new MetadataStoreException.BadVersionException("")); + + try { + c1.markDelete(positions.get(positions.size() - 1)); + fail("should have failed"); + } catch (ManagedLedgerException e) { + // Expected + } + + metadataStore.unsetAlwaysFail(); + + // In memory position is updated + assertEquals(c1.getMarkDeletedPosition(), positions.get(positions.size() - 1)); + + Awaitility.await() + // Give chance to the flush to be automatically triggered. + // NOTE: this can't be set too low, or it causes issues with ZK thread pool rejecting + .pollDelay(Duration.ofMillis(2000)) + .untilAsserted(() -> { + // Abruptly re-open the managed ledger without graceful close + @Cleanup("shutdown") + ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); + ManagedLedger ledger2 = factory2.open("testFlushCursorAfterInactivity", config); + ManagedCursor c2 = ledger2.openCursor("c"); + + assertEquals(c2.getMarkDeletedPosition(), positions.get(positions.size() - 1)); + }); + } + @Test public void testCursorCheckReadPositionChanged() throws Exception { ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());