Skip to content

Commit

Permalink
merge: #10611
Browse files Browse the repository at this point in the history
10611: fix: take snapshot if nothing was exported since last snapshot r=oleschoenburg a=oleschoenburg

When figuring out where to take the next snapshot, we determine the snapshot position as the minimum of processing and exporter positions.

There was an edge case where a leader could process but not export.
In that case it'd use the exporter position as snapshot position and try to find a log entry at that position.
If the log starts with the exporter position, for example because the same broker previously received a snapshot and compacted the log, no entry could be found which led to a failed snapshot.

We now try and use the latest snapshot's term and index if the log entry could not be found. This ensures that new snapshots can be taken even if nothing was exported since the last snapshot.

closes #9761 

Co-authored-by: Ole Schönburg <ole.schoenburg@gmail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and oleschoenburg committed Oct 6, 2022
2 parents d9e3183 + ef090ab commit 64902ec
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 49 deletions.
Expand Up @@ -11,7 +11,7 @@
* Used when there is no entry at the determined snapshot position while taking a transient
* snapshot.
*/
public class NoEntryAtSnapshotPosition extends RuntimeException {
public class NoEntryAtSnapshotPosition extends Exception {

public NoEntryAtSnapshotPosition(final String message) {
super(message);
Expand Down
Expand Up @@ -16,13 +16,11 @@
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.snapshots.ConstructableSnapshotStore;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.SnapshotException.StateClosedException;
import io.camunda.zeebe.snapshots.TransientSnapshot;
import io.camunda.zeebe.util.FileUtil;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Optional;
import java.util.function.ToLongFunction;
import org.slf4j.Logger;

Expand Down Expand Up @@ -144,46 +142,20 @@ private void takeTransientSnapshotInternal(
return;
}

long index = 0;
long term = 0;
long exportedPosition = exporterPositionSupplier.applyAsLong(db);

if (exportedPosition != -1) {

final long snapshotPosition =
determineSnapshotPosition(lowerBoundSnapshotPosition, exportedPosition);
final var optionalIndexed = entrySupplier.getPreviousIndexedEntry(snapshotPosition);

if (optionalIndexed.isEmpty()) {
future.completeExceptionally(
new NoEntryAtSnapshotPosition(
String.format(
"Failed to take snapshot. Expected to find an indexed entry for determined snapshot position %d (processedPosition = %d, exportedPosition=%d), but found no matching indexed entry which contains this position.",
snapshotPosition, lowerBoundSnapshotPosition, exportedPosition)));
return;
}

final var snapshotIndexedEntry = optionalIndexed.get();
index = snapshotIndexedEntry.index();
term = snapshotIndexedEntry.term();
} else {
final Optional<PersistedSnapshot> latestSnapshot =
constructableSnapshotStore.getLatestSnapshot();
exportedPosition = 0;

if (latestSnapshot.isPresent()) {
// re-use index and term from the latest snapshot
// to ensure that the records from there are not
// compacted until they get exported
final PersistedSnapshot persistedSnapshot = latestSnapshot.get();
index = persistedSnapshot.getIndex();
term = persistedSnapshot.getTerm();
} // otherwise index/term remains 0
final NextSnapshotId nextSnapshotId;
try {
nextSnapshotId = tryFindNextSnapshotId(lowerBoundSnapshotPosition);
} catch (final NoEntryAtSnapshotPosition e) {
future.completeExceptionally(e);
return;
}

final var transientSnapshot =
constructableSnapshotStore.newTransientSnapshot(
index, term, lowerBoundSnapshotPosition, exportedPosition);
nextSnapshotId.index,
nextSnapshotId.term,
nextSnapshotId.processedPosition,
nextSnapshotId.exportedPosition);

if (transientSnapshot.isLeft()) {
future.completeExceptionally(transientSnapshot.getLeft());
Expand All @@ -192,6 +164,50 @@ private void takeTransientSnapshotInternal(
}
}

/**
 * Works out the index, term and positions that identify the next transient snapshot.
 *
 * <p>The lookup proceeds in this order:
 *
 * <ol>
 *   <li>If the exporter position supplier reports {@code -1} (presumably "nothing exported yet" -
 *       confirm against the supplier), index and term are taken from the latest snapshot when one
 *       exists, or are both {@code 0} otherwise; the exported position is recorded as {@code 0}.
 *   <li>Otherwise the indexed entry returned by the entry supplier for the minimum of exported and
 *       processed position provides index and term.
 *   <li>If no such entry is found, index and term of the latest snapshot are used instead and a
 *       warning is logged.
 * </ol>
 *
 * @param lastProcessedPosition the processed position to record in the snapshot id
 * @return the index, term, processed position and exported position for the next snapshot
 * @throws NoEntryAtSnapshotPosition if neither an indexed entry nor a previous snapshot is found
 */
private NextSnapshotId tryFindNextSnapshotId(final long lastProcessedPosition)
    throws NoEntryAtSnapshotPosition {
  final long exportedPosition = exporterPositionSupplier.applyAsLong(db);

  if (exportedPosition == -1) {
    // re-use index and term from the latest snapshot to ensure that the records from there are
    // not compacted until they get exported; without a previous snapshot both stay at 0.
    return constructableSnapshotStore
        .getLatestSnapshot()
        .map(
            previous ->
                new NextSnapshotId(
                    previous.getIndex(), previous.getTerm(), lastProcessedPosition, 0))
        .orElseGet(() -> new NextSnapshotId(0, 0, lastProcessedPosition, 0));
  }

  final long snapshotPosition = Math.min(exportedPosition, lastProcessedPosition);
  final var indexedEntry = entrySupplier.getPreviousIndexedEntry(snapshotPosition);
  if (indexedEntry.isPresent()) {
    final var entry = indexedEntry.get();
    return new NextSnapshotId(
        entry.index(), entry.term(), lastProcessedPosition, exportedPosition);
  }

  // No log entry for the snapshot position - the only remaining source for index and term is the
  // previous snapshot.
  final var previousSnapshot = constructableSnapshotStore.getLatestSnapshot();
  if (previousSnapshot.isEmpty()) {
    throw new NoEntryAtSnapshotPosition(
        String.format(
            "Failed to take snapshot. Expected to find an indexed entry for determined snapshot position %d (processedPosition = %d, exportedPosition=%d) or previous snapshot, but found neither.",
            snapshotPosition, lastProcessedPosition, exportedPosition));
  }

  LOG.warn(
      "No log entry for next snapshot position {}, using index and term from previous snapshot",
      snapshotPosition);
  final var previous = previousSnapshot.get();
  return new NextSnapshotId(
      previous.getIndex(), previous.getTerm(), lastProcessedPosition, exportedPosition);
}

@SuppressWarnings("rawtypes")
private void openDb(final ActorFuture<ZeebeDb> future) {
try {
Expand Down Expand Up @@ -247,14 +263,6 @@ private void takeSnapshot(
});
}

/**
 * Picks the smaller of the exported position and the lower-bound (last processed) position as
 * the snapshot position and traces the decision.
 *
 * @param lowerBoundSnapshotPosition the last processed position
 * @param exportedPosition the lowest exporter position
 * @return the minimum of the two positions
 */
private long determineSnapshotPosition(
    final long lowerBoundSnapshotPosition, final long exportedPosition) {
  final long lowestPosition =
      exportedPosition <= lowerBoundSnapshotPosition
          ? exportedPosition
          : lowerBoundSnapshotPosition;
  LOG.trace(
      "Based on lowest exporter position '{}' and last processed position '{}', determined '{}' as snapshot position.",
      exportedPosition,
      lowerBoundSnapshotPosition,
      lowestPosition);
  return lowestPosition;
}
/**
 * The four values that are handed to the snapshot store when a new transient snapshot is
 * created.
 *
 * @param index index taken from the indexed log entry, or from the latest snapshot when no entry
 *     is used; 0 if neither is available
 * @param term term taken from the same source as {@code index}
 * @param processedPosition the last processed position the snapshot is taken for
 * @param exportedPosition the exporter position; 0 when the exporter position supplier reported
 *     -1
 */
private record NextSnapshotId(
long index, long term, long processedPosition, long exportedPosition) {}
}
Expand Up @@ -198,6 +198,33 @@ public void shouldTakeSnapshotWhenExporterPositionNotChanged() {
assertThat(firstSnapshotId).isLessThan(newSnapshotId);
}

/**
 * Takes and persists a first snapshot, swaps in the empty entry supplier, and then checks that a
 * second snapshot with a higher processed position can still be taken. The second snapshot is
 * expected to keep the compaction bound and exported position of the first one.
 */
@Test
public void shouldTakeSnapshotWithoutIndexedEntryWhenProcessedPositionChanged() {
  // given - a persisted first snapshot, after which no indexed entry can be looked up anymore
  final int firstProcessedPosition = 2;
  exporterPosition.set(firstProcessedPosition - 1);
  snapshotController.recover().join();
  final var firstTransientSnapshot =
      snapshotController.takeTransientSnapshot(firstProcessedPosition).join();
  final var firstSnapshot = firstTransientSnapshot.persist().join();
  atomixRecordEntrySupplier.set(emptyEntrySupplier);

  // when - only the processed position has advanced
  final var secondTransientSnapshot =
      snapshotController.takeTransientSnapshot(firstProcessedPosition + 1).join();
  final var secondSnapshot = secondTransientSnapshot.persist().join();

  // then - same compaction bound, but a distinct snapshot id
  assertThat(secondSnapshot)
      .extracting(PersistedSnapshot::getCompactionBound)
      .isEqualTo(firstSnapshot.getCompactionBound());
  assertThat(secondSnapshot.getId()).isNotEqualTo(firstSnapshot.getId());

  // and - the ids differ in processed position only, not in exported position
  final var secondId = FileBasedSnapshotId.ofFileName(secondSnapshot.getId()).orElseThrow();
  final var firstId = FileBasedSnapshotId.ofFileName(firstSnapshot.getId()).orElseThrow();
  assertThat(secondId.getExportedPosition()).isEqualTo(firstId.getExportedPosition());
  assertThat(secondId.getProcessedPosition()).isGreaterThan(firstId.getProcessedPosition());
}

@Test
public void shouldTakeSnapshotWhenProcessorPositionNotChanged() {
// given
Expand Down

0 comments on commit 64902ec

Please sign in to comment.