From cb66036bee0335fd83bd873e9e7df1c69148724f Mon Sep 17 00:00:00 2001 From: ran Date: Sat, 3 Jul 2021 01:40:37 +0800 Subject: [PATCH] [Ledger] Fix ledger rollover scheduled task (#11116) ### Motivation Currently, the ledger rollover scheduled task will execute before reach the ledger maximum rollover time, this will cause the ledger doesn't roll over in time. ### Modifications Only make the ledger rollover scheduled task after the ledger created successfully. If the scheduled task was executed when there is no entry in the current ledger, the scheduled task will not be re-executed, and if there is new entry is added the ledger will rollover. ### Verifying this change Add a unit test to verify the ledger could be rolled over in time. --- .../mledger/impl/ManagedLedgerImpl.java | 31 ++++++++++--------- .../mledger/impl/ManagedLedgerTest.java | 17 ++++++++++ .../CurrentLedgerRolloverIfFullTest.java | 2 +- 3 files changed, 35 insertions(+), 15 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 049b1c7e64792e..e3148b9986f086 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -401,8 +401,6 @@ public void operationFailed(MetaStoreException e) { }); scheduleTimeoutTask(); - - scheduleRollOverLedgerTask(); } private synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedgerCallback callback) { @@ -467,7 +465,7 @@ public void operationFailed(MetaStoreException e) { log.info("[{}] Created ledger {}", name, lh.getId()); STATE_UPDATER.set(this, State.LedgerOpened); - lastLedgerCreatedTimestamp = clock.millis(); + updateLastLedgerCreatedTimeAndScheduleRolloverTask(); currentLedger = lh; lastConfirmedEntry = new PositionImpl(lh.getId(), -1); @@ -1488,7 +1486,7 @@ private void updateLedgersListAfterRollover(MetaStoreCallback callback) { public synchronized void updateLedgersIdsComplete(Stat stat) { STATE_UPDATER.set(this, State.LedgerOpened); - lastLedgerCreatedTimestamp = clock.millis(); + updateLastLedgerCreatedTimeAndScheduleRolloverTask(); if (log.isDebugEnabled()) { log.debug("[{}] Resending {} pending messages", name, pendingAddEntries.size()); @@ -3361,7 +3359,7 @@ private boolean currentLedgerIsFull() { || currentLedgerSize >= (config.getMaxSizePerLedgerMb() * MegaByte)); long timeSinceLedgerCreationMs = clock.millis() - lastLedgerCreatedTimestamp; - boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= maximumRolloverTimeMs; + boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= config.getMaximumRolloverTimeMs(); if (spaceQuotaReached || maxLedgerTimeReached) { if (config.getMinimumRolloverTimeMs() > 0) { @@ -3663,15 +3661,6 @@ private void scheduleTimeoutTask() { } } - private void scheduleRollOverLedgerTask() { - if (config.getMaximumRolloverTimeMs() > 0) { - long interval = config.getMaximumRolloverTimeMs(); - this.checkLedgerRollTask = this.scheduledExecutor.scheduleAtFixedRate(safeRun(() -> { - rollCurrentLedgerIfFull(); - }), interval, interval, TimeUnit.MILLISECONDS); - } - } - private void checkAddTimeout() { long timeoutSec = config.getAddEntryTimeoutSeconds(); if (timeoutSec < 1) { @@ -3929,4 +3918,18 @@ public CompletableFuture> getEnsemblesAsync(long ledgerId) { return CompletableFuture.completedFuture(ensembles); }); } + + private void updateLastLedgerCreatedTimeAndScheduleRolloverTask() { + this.lastLedgerCreatedTimestamp = clock.millis(); + if (config.getMaximumRolloverTimeMs() > 0) { + if (checkLedgerRollTask != null && !checkLedgerRollTask.isDone()) { + // new ledger has been created successfully + // and the previous checkLedgerRollTask is not done, we could cancel it + checkLedgerRollTask.cancel(true); + } + this.checkLedgerRollTask = this.scheduledExecutor.schedule( + safeRun(this::rollCurrentLedgerIfFull), getMaximumRolloverTimeMs(config), TimeUnit.MILLISECONDS); + } + } + } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 1e2390dad5cb1a..62b44b8c091365 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -2926,6 +2926,23 @@ public void testManagedLedgerRollOverIfFull() throws Exception { Assert.assertEquals(ledger.getTotalSize(), 0); } + @Test + public void testLedgerReachMaximumRolloverTime() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS); + config.setMaximumRolloverTime(1, TimeUnit.SECONDS); + + ManagedLedger ml = factory.open("ledger-reach-maximum-rollover-time", config); + long firstLedgerId = ml.addEntry("test".getBytes()).getLedgerId(); + + // the ledger rollover scheduled time is between 1000 and 1050 ms, + // wait 1100 ms, the ledger should be rolled over. + Awaitility.await() + .atMost(1100, TimeUnit.MILLISECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> firstLedgerId != ml.addEntry("test".getBytes()).getLedgerId()); + } + @Test public void testExpiredLedgerDeletionAfterManagedLedgerRestart() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java index 1bb8dcb24e4d88..77ec229862e90b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java @@ -72,7 +72,7 @@ public void testCurrentLedgerRolloverIfFull() throws Exception { managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS); managedLedgerConfig.setMaxEntriesPerLedger(2); managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS); - managedLedgerConfig.setMaximumRolloverTime(5, TimeUnit.MILLISECONDS); + managedLedgerConfig.setMaximumRolloverTime(1, TimeUnit.SECONDS); int msgNum = 10; for (int i = 0; i < msgNum; i++) {