From 732395f1c550e701a7e806f828cc516e6dadb82a Mon Sep 17 00:00:00 2001 From: ran Date: Fri, 9 Jul 2021 08:56:07 +0800 Subject: [PATCH] [Branch-2.7] Fix ledger roll over scheduler task (#11226) The PR https://github.com/apache/pulsar/pull/11116 couldn't be cherry-picked to `branch-2.7`, because there are too many conflicts. In PR https://github.com/apache/pulsar/pull/8946 the ledger rollover task had been moved from `BrokerService` to `ManagedLedgerImpl`, and this PR is based on it. ### Motivation Currently, the ledger rollover scheduled task will execute before reach the ledger maximum rollover time, this will cause the ledger doesn't roll over in time. ### Modifications Only make the ledger rollover scheduled task after the ledger created successfully. If the scheduled task was executed when there is no entry in the current ledger, the scheduled task will not be re-executed, and if there is new entry is added the ledger will rollover. --- .../mledger/impl/ManagedLedgerImpl.java | 31 +++++++++++++++---- .../mledger/impl/ManagedLedgerTest.java | 19 ++++++++++++ .../pulsar/broker/service/BrokerService.java | 23 -------------- .../CurrentLedgerRolloverIfFullTest.java | 2 +- 4 files changed, 45 insertions(+), 30 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 04877575c2593..223443f606a8d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -180,6 +180,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { final EntryCache entryCache; private ScheduledFuture timeoutTask; + private ScheduledFuture checkLedgerRollTask; /** * This lock is held while the ledgers list or propertiesMap is updated asynchronously on the metadata store. Since we use the store @@ -199,7 +200,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private long lastLedgerCreationInitiationTimestamp = 0; private static final Random random = new Random(System.currentTimeMillis()); - private final long maximumRolloverTimeMs; protected final Supplier mlOwnershipChecker; volatile PositionImpl lastConfirmedEntry; @@ -293,8 +293,6 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper this.uninitializedCursors = Maps.newHashMap(); this.clock = config.getClock(); - // Get the next rollover time. Add a random value upto 5% to avoid rollover multiple ledgers at the same time - this.maximumRolloverTimeMs = (long) (config.getMaximumRolloverTimeMs() * (1 + random.nextDouble() * 5 / 100.0)); this.mlOwnershipChecker = mlOwnershipChecker; this.propertiesMap = Maps.newHashMap(); } @@ -437,7 +435,7 @@ public void operationFailed(MetaStoreException e) { log.info("[{}] Created ledger {}", name, lh.getId()); STATE_UPDATER.set(this, State.LedgerOpened); - lastLedgerCreatedTimestamp = clock.millis(); + updateLastLedgerCreatedTimeAndScheduleRolloverTask(); currentLedger = lh; lastConfirmedEntry = new PositionImpl(lh.getId(), -1); @@ -1215,6 +1213,10 @@ public synchronized void asyncClose(final CloseCallback callback, final Object c this.timeoutTask.cancel(false); } + if (this.checkLedgerRollTask != null) { + this.checkLedgerRollTask.cancel(false); + } + LedgerHandle lh = currentLedger; if (lh == null) { @@ -1382,7 +1384,7 @@ private void updateLedgersListAfterRollover(MetaStoreCallback callback) { public synchronized void updateLedgersIdsComplete(Stat stat) { STATE_UPDATER.set(this, State.LedgerOpened); - lastLedgerCreatedTimestamp = clock.millis(); + updateLastLedgerCreatedTimeAndScheduleRolloverTask(); if (log.isDebugEnabled()) { log.debug("[{}] Resending {} pending messages", name, pendingAddEntries.size()); @@ -3122,7 +3124,7 @@ private boolean currentLedgerIsFull() { || currentLedgerSize >= (config.getMaxSizePerLedgerMb() * MegaByte)); long timeSinceLedgerCreationMs = clock.millis() - lastLedgerCreatedTimestamp; - boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= maximumRolloverTimeMs; + boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= config.getMaximumRolloverTimeMs(); if (spaceQuotaReached || maxLedgerTimeReached) { if (config.getMinimumRolloverTimeMs() > 0) { @@ -3573,4 +3575,21 @@ public CompletableFuture> getEnsemblesAsync(long ledgerId) { return CompletableFuture.completedFuture(ensembles); }); } + + private void updateLastLedgerCreatedTimeAndScheduleRolloverTask() { + this.lastLedgerCreatedTimestamp = clock.millis(); + if (config.getMaximumRolloverTimeMs() > 0) { + if (checkLedgerRollTask != null && !checkLedgerRollTask.isDone()) { + // new ledger has been created successfully + // and the previous checkLedgerRollTask is not done, we could cancel it + checkLedgerRollTask.cancel(true); + } + // Add a random value upto 5% to avoid rollover multiple ledgers at the same time + long schedulerMaxRolloverTime = + (long) (config.getMaximumRolloverTimeMs() * (1 + random.nextDouble() * 5 / 100.0)); + this.checkLedgerRollTask = this.scheduledExecutor.schedule( + safeRun(this::rollCurrentLedgerIfFull), schedulerMaxRolloverTime, TimeUnit.MILLISECONDS); + } + } + } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 7898db1e8ae55..922af4374fdcf 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -108,6 +108,7 @@ import org.apache.zookeeper.MockZooKeeper; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; +import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -2871,4 +2872,22 @@ public void testOpEntryAdd_toString_doesNotThrowNPE(){ ", dataLength=" + dataLength + '}'; } + + @Test + public void testLedgerReachMaximumRolloverTime() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS); + config.setMaximumRolloverTime(10, TimeUnit.SECONDS); + + ManagedLedger ml = factory.open("ledger-reach-maximum-rollover-time", config); + long firstLedgerId = ((PositionImpl) ml.addEntry("test".getBytes())).getLedgerId(); + + // the ledger rollover scheduled time is between 1000 * 10 and 1000 * 10 + 500 ms, + // wait 1000 * 12 ms, the ledger should be rolled over. + Awaitility.await() + .atMost(12, TimeUnit.SECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .until(() -> firstLedgerId != ((PositionImpl) ml.addEntry("test".getBytes())).getLedgerId()); + } + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 4f41346625900..86716ade27589 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -212,7 +212,6 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener { - if (t instanceof PersistentTopic) { - Optional.ofNullable(((PersistentTopic) t).getManagedLedger()).ifPresent( - managedLedger -> { - managedLedger.rollCurrentLedgerIfFull(); - } - ); - } - }); - } - public void checkMessageDeduplicationInfo() { forEachTopic(Topic::checkMessageDeduplicationInfo); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java index 783eac59dc99b..228a0aa479e98 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java @@ -65,7 +65,7 @@ public void testCurrentLedgerRolloverIfFull() throws Exception { managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS); managedLedgerConfig.setMaxEntriesPerLedger(2); managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS); - managedLedgerConfig.setMaximumRolloverTime(5, TimeUnit.MILLISECONDS); + managedLedgerConfig.setMaximumRolloverTime(1, TimeUnit.SECONDS); int msgNum = 10; for (int i = 0; i < msgNum; i++) {