diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 049b1c7e64792e..a05cae08146f85 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -401,8 +401,6 @@ public void operationFailed(MetaStoreException e) { }); scheduleTimeoutTask(); - - scheduleRollOverLedgerTask(); } private synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedgerCallback callback) { @@ -467,7 +465,7 @@ public void operationFailed(MetaStoreException e) { log.info("[{}] Created ledger {}", name, lh.getId()); STATE_UPDATER.set(this, State.LedgerOpened); - lastLedgerCreatedTimestamp = clock.millis(); + updateLastLedgerCreatedTime(); currentLedger = lh; lastConfirmedEntry = new PositionImpl(lh.getId(), -1); @@ -1488,7 +1486,7 @@ private void updateLedgersListAfterRollover(MetaStoreCallback callback) { public synchronized void updateLedgersIdsComplete(Stat stat) { STATE_UPDATER.set(this, State.LedgerOpened); - lastLedgerCreatedTimestamp = clock.millis(); + updateLastLedgerCreatedTime(); if (log.isDebugEnabled()) { log.debug("[{}] Resending {} pending messages", name, pendingAddEntries.size()); @@ -3361,7 +3359,7 @@ private boolean currentLedgerIsFull() { || currentLedgerSize >= (config.getMaxSizePerLedgerMb() * MegaByte)); long timeSinceLedgerCreationMs = clock.millis() - lastLedgerCreatedTimestamp; - boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= maximumRolloverTimeMs; + boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= config.getMaximumRolloverTimeMs(); if (spaceQuotaReached || maxLedgerTimeReached) { if (config.getMinimumRolloverTimeMs() > 0) { @@ -3663,15 +3661,6 @@ private void scheduleTimeoutTask() { } } - private void scheduleRollOverLedgerTask() { - if (config.getMaximumRolloverTimeMs() > 0) { - long interval = config.getMaximumRolloverTimeMs(); - this.checkLedgerRollTask = this.scheduledExecutor.scheduleAtFixedRate(safeRun(() -> { - rollCurrentLedgerIfFull(); - }), interval, interval, TimeUnit.MILLISECONDS); - } - } - private void checkAddTimeout() { long timeoutSec = config.getAddEntryTimeoutSeconds(); if (timeoutSec < 1) { @@ -3929,4 +3918,13 @@ public CompletableFuture> getEnsemblesAsync(long ledgerId) { return CompletableFuture.completedFuture(ensembles); }); } + + private void updateLastLedgerCreatedTime() { + this.lastLedgerCreatedTimestamp = clock.millis(); + if (config.getMaximumRolloverTimeMs() > 0) { + this.checkLedgerRollTask = this.scheduledExecutor.schedule( + safeRun(this::rollCurrentLedgerIfFull), maximumRolloverTimeMs, TimeUnit.MILLISECONDS); + } + } + } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index e0b15fe4aea9cc..d1ab5a7c2a6039 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -2922,6 +2922,23 @@ public void testManagedLedgerRollOverIfFull() throws Exception { Assert.assertEquals(ledger.getTotalSize(), 0); } + @Test + public void testLedgerReachMaximumRolloverTime() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setRetentionTime(1, TimeUnit.SECONDS); + config.setMaxEntriesPerLedger(2); + config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS); + config.setMaximumRolloverTime(1, TimeUnit.SECONDS); + + ManagedLedger ml = factory.open("ledger-reach-maximum-rollover-time", config); + Position position1 = ml.addEntry("test".getBytes()); + + Thread.sleep(1000); + Position position2 = ml.addEntry("test".getBytes()); + + Assert.assertNotEquals(position1.getLedgerId(), position2.getLedgerId()); + } + @Test public void testExpiredLedgerDeletionAfterManagedLedgerRestart() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig();