Skip to content

Commit

Permalink
[Branch-2.7] Fix ledger roll over scheduler task (#11226)
Browse files Browse the repository at this point in the history
The PR #11116 couldn't be cherry-picked to `branch-2.7`, because there are too many conflicts.

In PR #8946 the ledger rollover task had been moved from `BrokerService` to `ManagedLedgerImpl`, and this PR is based on it.

### Motivation

Currently, the ledger rollover scheduled task will execute before reach the ledger maximum rollover time, this will cause the ledger doesn't roll over in time.

### Modifications

Only make the ledger rollover scheduled task after the ledger created successfully.
If the scheduled task was executed when there is no entry in the current ledger, the scheduled task will not be re-executed, and if there is new entry is added the ledger will rollover.
  • Loading branch information
gaoran10 committed Jul 9, 2021
1 parent d5a844f commit 732395f
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 30 deletions.
Expand Up @@ -180,6 +180,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
final EntryCache entryCache;

private ScheduledFuture<?> timeoutTask;
private ScheduledFuture<?> checkLedgerRollTask;

/**
* This lock is held while the ledgers list or propertiesMap is updated asynchronously on the metadata store. Since we use the store
Expand All @@ -199,7 +200,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private long lastLedgerCreationInitiationTimestamp = 0;

private static final Random random = new Random(System.currentTimeMillis());
private final long maximumRolloverTimeMs;
protected final Supplier<Boolean> mlOwnershipChecker;

volatile PositionImpl lastConfirmedEntry;
Expand Down Expand Up @@ -293,8 +293,6 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper
this.uninitializedCursors = Maps.newHashMap();
this.clock = config.getClock();

// Get the next rollover time. Add a random value upto 5% to avoid rollover multiple ledgers at the same time
this.maximumRolloverTimeMs = (long) (config.getMaximumRolloverTimeMs() * (1 + random.nextDouble() * 5 / 100.0));
this.mlOwnershipChecker = mlOwnershipChecker;
this.propertiesMap = Maps.newHashMap();
}
Expand Down Expand Up @@ -437,7 +435,7 @@ public void operationFailed(MetaStoreException e) {

log.info("[{}] Created ledger {}", name, lh.getId());
STATE_UPDATER.set(this, State.LedgerOpened);
lastLedgerCreatedTimestamp = clock.millis();
updateLastLedgerCreatedTimeAndScheduleRolloverTask();
currentLedger = lh;

lastConfirmedEntry = new PositionImpl(lh.getId(), -1);
Expand Down Expand Up @@ -1215,6 +1213,10 @@ public synchronized void asyncClose(final CloseCallback callback, final Object c
this.timeoutTask.cancel(false);
}

if (this.checkLedgerRollTask != null) {
this.checkLedgerRollTask.cancel(false);
}

LedgerHandle lh = currentLedger;

if (lh == null) {
Expand Down Expand Up @@ -1382,7 +1384,7 @@ private void updateLedgersListAfterRollover(MetaStoreCallback<Void> callback) {

public synchronized void updateLedgersIdsComplete(Stat stat) {
STATE_UPDATER.set(this, State.LedgerOpened);
lastLedgerCreatedTimestamp = clock.millis();
updateLastLedgerCreatedTimeAndScheduleRolloverTask();

if (log.isDebugEnabled()) {
log.debug("[{}] Resending {} pending messages", name, pendingAddEntries.size());
Expand Down Expand Up @@ -3122,7 +3124,7 @@ private boolean currentLedgerIsFull() {
|| currentLedgerSize >= (config.getMaxSizePerLedgerMb() * MegaByte));

long timeSinceLedgerCreationMs = clock.millis() - lastLedgerCreatedTimestamp;
boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= maximumRolloverTimeMs;
boolean maxLedgerTimeReached = timeSinceLedgerCreationMs >= config.getMaximumRolloverTimeMs();

if (spaceQuotaReached || maxLedgerTimeReached) {
if (config.getMinimumRolloverTimeMs() > 0) {
Expand Down Expand Up @@ -3573,4 +3575,21 @@ public CompletableFuture<Set<BookieId>> getEnsemblesAsync(long ledgerId) {
return CompletableFuture.completedFuture(ensembles);
});
}

private void updateLastLedgerCreatedTimeAndScheduleRolloverTask() {
this.lastLedgerCreatedTimestamp = clock.millis();
if (config.getMaximumRolloverTimeMs() > 0) {
if (checkLedgerRollTask != null && !checkLedgerRollTask.isDone()) {
// new ledger has been created successfully
// and the previous checkLedgerRollTask is not done, we could cancel it
checkLedgerRollTask.cancel(true);
}
// Add a random value upto 5% to avoid rollover multiple ledgers at the same time
long schedulerMaxRolloverTime =
(long) (config.getMaximumRolloverTimeMs() * (1 + random.nextDouble() * 5 / 100.0));
this.checkLedgerRollTask = this.scheduledExecutor.schedule(
safeRun(this::rollCurrentLedgerIfFull), schedulerMaxRolloverTime, TimeUnit.MILLISECONDS);
}
}

}
Expand Up @@ -108,6 +108,7 @@
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand Down Expand Up @@ -2871,4 +2872,22 @@ public void testOpEntryAdd_toString_doesNotThrowNPE(){
", dataLength=" + dataLength +
'}';
}

@Test
public void testLedgerReachMaximumRolloverTime() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
config.setMaximumRolloverTime(10, TimeUnit.SECONDS);

ManagedLedger ml = factory.open("ledger-reach-maximum-rollover-time", config);
long firstLedgerId = ((PositionImpl) ml.addEntry("test".getBytes())).getLedgerId();

// the ledger rollover scheduled time is between 1000 * 10 and 1000 * 10 + 500 ms,
// wait 1000 * 12 ms, the ledger should be rolled over.
Awaitility.await()
.atMost(12, TimeUnit.SECONDS)
.pollInterval(500, TimeUnit.MILLISECONDS)
.until(() -> firstLedgerId != ((PositionImpl) ml.addEntry("test".getBytes())).getLedgerId());
}

}
Expand Up @@ -212,7 +212,6 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private final ScheduledExecutorService compactionMonitor;
private final ScheduledExecutorService messagePublishBufferMonitor;
private final ScheduledExecutorService consumedLedgersMonitor;
private final ScheduledExecutorService ledgerFullMonitor;
private ScheduledExecutorService topicPublishRateLimiterMonitor;
private ScheduledExecutorService brokerPublishRateLimiterMonitor;
private ScheduledExecutorService deduplicationSnapshotMonitor;
Expand Down Expand Up @@ -297,8 +296,6 @@ public BrokerService(PulsarService pulsar) throws Exception {
Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-publish-buffer-monitor"));
this.consumedLedgersMonitor = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("consumed-Ledgers-monitor"));
this.ledgerFullMonitor =
Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("ledger-full-monitor"));

this.backlogQuotaManager = new BacklogQuotaManager(pulsar);
this.backlogQuotaChecker = Executors
Expand Down Expand Up @@ -440,7 +437,6 @@ public void start() throws Exception {
this.startCompactionMonitor();
this.startMessagePublishBufferMonitor();
this.startConsumedLedgersMonitor();
this.startLedgerFullMonitor();
this.startBacklogQuotaChecker();
this.updateBrokerPublisherThrottlingMaxRate();
this.startCheckReplicationPolicies();
Expand Down Expand Up @@ -528,12 +524,6 @@ protected void startConsumedLedgersMonitor() {
}
}

protected void startLedgerFullMonitor() {
int interval = pulsar().getConfiguration().getManagedLedgerMaxLedgerRolloverTimeMinutes();
ledgerFullMonitor.scheduleAtFixedRate(safeRun(this::checkLedgerFull),
interval, interval, TimeUnit.MINUTES);
}

protected void startBacklogQuotaChecker() {
if (pulsar().getConfiguration().isBacklogQuotaCheckEnabled()) {
final int interval = pulsar().getConfiguration().getBacklogQuotaCheckIntervalInSeconds();
Expand Down Expand Up @@ -672,7 +662,6 @@ public void close() throws IOException {
inactivityMonitor.shutdown();
messageExpiryMonitor.shutdown();
compactionMonitor.shutdown();
ledgerFullMonitor.shutdown();
messagePublishBufferMonitor.shutdown();
consumedLedgersMonitor.shutdown();
backlogQuotaChecker.shutdown();
Expand Down Expand Up @@ -1435,18 +1424,6 @@ private void checkConsumedLedgers() {
});
}

private void checkLedgerFull() {
forEachTopic((t) -> {
if (t instanceof PersistentTopic) {
Optional.ofNullable(((PersistentTopic) t).getManagedLedger()).ifPresent(
managedLedger -> {
managedLedger.rollCurrentLedgerIfFull();
}
);
}
});
}

public void checkMessageDeduplicationInfo() {
forEachTopic(Topic::checkMessageDeduplicationInfo);
}
Expand Down
Expand Up @@ -65,7 +65,7 @@ public void testCurrentLedgerRolloverIfFull() throws Exception {
managedLedgerConfig.setRetentionTime(1, TimeUnit.SECONDS);
managedLedgerConfig.setMaxEntriesPerLedger(2);
managedLedgerConfig.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
managedLedgerConfig.setMaximumRolloverTime(5, TimeUnit.MILLISECONDS);
managedLedgerConfig.setMaximumRolloverTime(1, TimeUnit.SECONDS);

int msgNum = 10;
for (int i = 0; i < msgNum; i++) {
Expand Down

0 comments on commit 732395f

Please sign in to comment.