From eadebdcb06ce1b92e5e9ba9f0beb1c13d67db9da Mon Sep 17 00:00:00 2001 From: gaoran10 Date: Tue, 29 Jun 2021 19:01:33 +0800 Subject: [PATCH] cancel the previous undone ledger rollover schedule task if there is a new ledger has been created successfully. --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 13 +++++++++---- .../service/CurrentLedgerRolloverIfFullTest.java | 2 +- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index a05cae08146f8..e3148b9986f08 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -465,7 +465,7 @@ public void operationFailed(MetaStoreException e) { log.info("[{}] Created ledger {}", name, lh.getId()); STATE_UPDATER.set(this, State.LedgerOpened); - updateLastLedgerCreatedTime(); + updateLastLedgerCreatedTimeAndScheduleRolloverTask(); currentLedger = lh; lastConfirmedEntry = new PositionImpl(lh.getId(), -1); @@ -1486,7 +1486,7 @@ private void updateLedgersListAfterRollover(MetaStoreCallback callback) { public synchronized void updateLedgersIdsComplete(Stat stat) { STATE_UPDATER.set(this, State.LedgerOpened); - updateLastLedgerCreatedTime(); + updateLastLedgerCreatedTimeAndScheduleRolloverTask(); if (log.isDebugEnabled()) { log.debug("[{}] Resending {} pending messages", name, pendingAddEntries.size()); @@ -3919,11 +3919,16 @@ public CompletableFuture> getEnsemblesAsync(long ledgerId) { }); } - private void updateLastLedgerCreatedTime() { + private void updateLastLedgerCreatedTimeAndScheduleRolloverTask() { this.lastLedgerCreatedTimestamp = clock.millis(); if (config.getMaximumRolloverTimeMs() > 0) { + if (checkLedgerRollTask != null && !checkLedgerRollTask.isDone()) { + // new ledger has been created successfully + // and the previous checkLedgerRollTask is not done, we could cancel it + checkLedgerRollTask.cancel(true); + } this.checkLedgerRollTask = this.scheduledExecutor.schedule( - safeRun(this::rollCurrentLedgerIfFull), maximumRolloverTimeMs, TimeUnit.MILLISECONDS); + safeRun(this::rollCurrentLedgerIfFull), getMaximumRolloverTimeMs(config), TimeUnit.MILLISECONDS); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java index 1bb8dcb24e4d8..77ec229862e90 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java @@ -72,7 +72,7 @@ public void testCurrentLedgerRolloverIfFull() throws Exception { managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS); managedLedgerConfig.setMaxEntriesPerLedger(2); managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS); - managedLedgerConfig.setMaximumRolloverTime(5, TimeUnit.MILLISECONDS); + managedLedgerConfig.setMaximumRolloverTime(1, TimeUnit.SECONDS); int msgNum = 10; for (int i = 0; i < msgNum; i++) {