diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 049b1c7e64792..e3148b9986f08 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -401,8 +401,6 @@ public void operationFailed(MetaStoreException e) { }); scheduleTimeoutTask(); - - scheduleRollOverLedgerTask(); } private synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedgerCallback callback) { @@ -467,7 +465,7 @@ public void operationFailed(MetaStoreException e) { log.info("[{}] Created ledger {}", name, lh.getId()); STATE_UPDATER.set(this, State.LedgerOpened); - lastLedgerCreatedTimestamp = clock.millis(); + updateLastLedgerCreatedTimeAndScheduleRolloverTask(); currentLedger = lh; lastConfirmedEntry = new PositionImpl(lh.getId(), -1); @@ -1488,7 +1486,7 @@ private void updateLedgersListAfterRollover(MetaStoreCallback callback) { public synchronized void updateLedgersIdsComplete(Stat stat) { STATE_UPDATER.set(this, State.LedgerOpened); - lastLedgerCreatedTimestamp = clock.millis(); + updateLastLedgerCreatedTimeAndScheduleRolloverTask(); if (log.isDebugEnabled()) { log.debug("[{}] Resending {} pending messages", name, pendingAddEntries.size()); @@ -3361,7 +3359,7 @@ private boolean currentLedgerIsFull() { || currentLedgerSize >= (config.getMaxSizePerLedgerMb() * MegaByte)); long timeSinceLedgerCreationMs = clock.millis() - lastLedgerCreatedTimestamp; - boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= maximumRolloverTimeMs; + boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= config.getMaximumRolloverTimeMs(); if (spaceQuotaReached || maxLedgerTimeReached) { if (config.getMinimumRolloverTimeMs() > 0) { @@ -3663,15 +3661,6 @@ private void scheduleTimeoutTask() { } } - private void scheduleRollOverLedgerTask() { - if (config.getMaximumRolloverTimeMs() > 0) { - long interval = config.getMaximumRolloverTimeMs(); - this.checkLedgerRollTask = this.scheduledExecutor.scheduleAtFixedRate(safeRun(() -> { - rollCurrentLedgerIfFull(); - }), interval, interval, TimeUnit.MILLISECONDS); - } - } - private void checkAddTimeout() { long timeoutSec = config.getAddEntryTimeoutSeconds(); if (timeoutSec < 1) { @@ -3929,4 +3918,18 @@ public CompletableFuture> getEnsemblesAsync(long ledgerId) { return CompletableFuture.completedFuture(ensembles); }); } + + private void updateLastLedgerCreatedTimeAndScheduleRolloverTask() { + this.lastLedgerCreatedTimestamp = clock.millis(); + if (config.getMaximumRolloverTimeMs() > 0) { + if (checkLedgerRollTask != null && !checkLedgerRollTask.isDone()) { + // new ledger has been created successfully + // and the previous checkLedgerRollTask is not done, we could cancel it + checkLedgerRollTask.cancel(true); + } + this.checkLedgerRollTask = this.scheduledExecutor.schedule( + safeRun(this::rollCurrentLedgerIfFull), getMaximumRolloverTimeMs(config), TimeUnit.MILLISECONDS); + } + } + } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 1e2390dad5cb1..62b44b8c09136 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -2926,6 +2926,23 @@ public void testManagedLedgerRollOverIfFull() throws Exception { Assert.assertEquals(ledger.getTotalSize(), 0); } + @Test + public void testLedgerReachMaximumRolloverTime() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS); + config.setMaximumRolloverTime(1, TimeUnit.SECONDS); + + ManagedLedger ml = factory.open("ledger-reach-maximum-rollover-time", config); + long firstLedgerId = ml.addEntry("test".getBytes()).getLedgerId(); + + // the ledger rollover scheduled time is between 1000 and 1050 ms, + // wait 1100 ms, the ledger should be rolled over. + Awaitility.await() + .atMost(1100, TimeUnit.MILLISECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> firstLedgerId != ml.addEntry("test".getBytes()).getLedgerId()); + } + @Test public void testExpiredLedgerDeletionAfterManagedLedgerRestart() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java index 1bb8dcb24e4d8..77ec229862e90 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java @@ -72,7 +72,7 @@ public void testCurrentLedgerRolloverIfFull() throws Exception { managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS); managedLedgerConfig.setMaxEntriesPerLedger(2); managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS); - managedLedgerConfig.setMaximumRolloverTime(5, TimeUnit.MILLISECONDS); + managedLedgerConfig.setMaximumRolloverTime(1, TimeUnit.SECONDS); int msgNum = 10; for (int i = 0; i < msgNum; i++) {