Skip to content

Commit

Permalink
fix: take snapshot if nothing was exported since last snapshot
Browse files Browse the repository at this point in the history
When figuring out where to take the next snapshot, we determine the
snapshot position as the minimum of processing and exporter positions.

There was an edge case where a leader could process but not export.
In that case it'd use the exporter position as the snapshot position and
try to find a log entry at that position.
If the log starts with the exporter position, for example
because the same broker previously received a snapshot and compacted the
log, no entry could be found which led to a failed snapshot.

We now try to use the latest snapshot's term and index if the log entry
could not be found. This ensures that new snapshots can be taken even if
nothing was exported since the last snapshot.
  • Loading branch information
oleschoenburg committed Oct 5, 2022
1 parent 42ee7c6 commit 52bef12
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 39 deletions.
Expand Up @@ -11,7 +11,7 @@
* Used when there is no entry at the determined snapshot position while taking a transient
* snapshot.
*/
public class NoEntryAtSnapshotPosition extends RuntimeException {
public class NoEntryAtSnapshotPosition extends Exception {

public NoEntryAtSnapshotPosition(final String message) {
super(message);
Expand Down
Expand Up @@ -16,13 +16,11 @@
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.snapshots.ConstructableSnapshotStore;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.SnapshotException.StateClosedException;
import io.camunda.zeebe.snapshots.TransientSnapshot;
import io.camunda.zeebe.util.FileUtil;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Optional;
import java.util.function.ToLongFunction;
import org.slf4j.Logger;

Expand Down Expand Up @@ -144,46 +142,20 @@ private void takeTransientSnapshotInternal(
return;
}

long index = 0;
long term = 0;
long exportedPosition = exporterPositionSupplier.applyAsLong(db);

if (exportedPosition != -1) {

final long snapshotPosition =
determineSnapshotPosition(lowerBoundSnapshotPosition, exportedPosition);
final var optionalIndexed = entrySupplier.getPreviousIndexedEntry(snapshotPosition);

if (optionalIndexed.isEmpty()) {
future.completeExceptionally(
new NoEntryAtSnapshotPosition(
String.format(
"Failed to take snapshot. Expected to find an indexed entry for determined snapshot position %d (processedPosition = %d, exportedPosition=%d), but found no matching indexed entry which contains this position.",
snapshotPosition, lowerBoundSnapshotPosition, exportedPosition)));
return;
}

final var snapshotIndexedEntry = optionalIndexed.get();
index = snapshotIndexedEntry.index();
term = snapshotIndexedEntry.term();
} else {
final Optional<PersistedSnapshot> latestSnapshot =
constructableSnapshotStore.getLatestSnapshot();
exportedPosition = 0;

if (latestSnapshot.isPresent()) {
// re-use index and term from the latest snapshot
// to ensure that the records from there are not
// compacted until they get exported
final PersistedSnapshot persistedSnapshot = latestSnapshot.get();
index = persistedSnapshot.getIndex();
term = persistedSnapshot.getTerm();
} // otherwise index/term remains 0
final NextSnapshotId nextSnapshotId;
try {
nextSnapshotId = tryFindNextSnapshotId(lowerBoundSnapshotPosition);
} catch (final NoEntryAtSnapshotPosition e) {
future.completeExceptionally(e);
return;
}

final var transientSnapshot =
constructableSnapshotStore.newTransientSnapshot(
index, term, lowerBoundSnapshotPosition, exportedPosition);
nextSnapshotId.index,
nextSnapshotId.term,
nextSnapshotId.processedPosition,
nextSnapshotId.exportedPosition);

if (transientSnapshot.isLeft()) {
future.completeExceptionally(transientSnapshot.getLeft());
Expand All @@ -192,6 +164,53 @@ private void takeTransientSnapshotInternal(
}
}

/**
 * Determines the index, term and positions that identify the next snapshot.
 *
 * <p>Resolution order:
 *
 * <ol>
 *   <li>If the exporter position supplier yields {@code -1}, the index and term of the latest
 *       persisted snapshot are re-used (or {@code 0}/{@code 0} when there is none) and the
 *       exported position is reported as {@code 0}.
 *   <li>Otherwise the snapshot position is the minimum of the exported and the last processed
 *       position, and the index and term of the log entry found for that position are used.
 *   <li>If no such log entry is found, the index and term of the latest persisted snapshot are
 *       used instead and a warning is logged.
 * </ol>
 *
 * @param lastProcessedPosition the last processed position; always carried over unchanged as the
 *     processed position of the result
 * @return the identifying values for the next snapshot
 * @throws NoEntryAtSnapshotPosition if neither a log entry for the snapshot position nor a
 *     previous snapshot is available
 */
private NextSnapshotId tryFindNextSnapshotId(final long lastProcessedPosition)
    throws NoEntryAtSnapshotPosition {
  final long exporterPosition = exporterPositionSupplier.applyAsLong(db);

  if (exporterPosition == -1) {
    final var previous = constructableSnapshotStore.getLatestSnapshot();
    if (previous.isEmpty()) {
      return new NextSnapshotId(0, 0, lastProcessedPosition, 0);
    }

    // Sticking to the index and term of the latest snapshot ensures that the records from there
    // on are not compacted until they get exported.
    final var snapshot = previous.get();
    return new NextSnapshotId(snapshot.getIndex(), snapshot.getTerm(), lastProcessedPosition, 0);
  }

  final long targetPosition = Math.min(exporterPosition, lastProcessedPosition);
  final var indexedEntry = entrySupplier.getPreviousIndexedEntry(targetPosition);
  if (indexedEntry.isPresent()) {
    final var entry = indexedEntry.get();
    return new NextSnapshotId(
        entry.index(), entry.term(), lastProcessedPosition, exporterPosition);
  }

  // The log has no entry for the snapshot position; the only remaining source for an index and
  // term is the previous snapshot.
  final var previous = constructableSnapshotStore.getLatestSnapshot();
  if (previous.isEmpty()) {
    throw new NoEntryAtSnapshotPosition(
        String.format(
            "Failed to take snapshot. Expected to find an indexed entry for determined snapshot position %d (processedPosition = %d, exportedPosition=%d) or previous snapshot, but found neither.",
            targetPosition, lastProcessedPosition, exporterPosition));
  }

  LOG.warn(
      "No log entry for next snapshot position {}, using index and term from previous snapshot",
      targetPosition);
  final var snapshot = previous.get();
  return new NextSnapshotId(
      snapshot.getIndex(), snapshot.getTerm(), lastProcessedPosition, exporterPosition);
}

/**
 * Immutable bundle of the four values that identify the next snapshot to take. The values are
 * handed, in this order, to {@code constructableSnapshotStore.newTransientSnapshot(...)}.
 *
 * @param index index to take the snapshot at; taken from a log entry or from the latest
 *     persisted snapshot, or {@code 0} when neither is consulted
 * @param term term belonging to {@code index}, obtained from the same source
 * @param processedPosition the last processed position at the time the snapshot is taken
 * @param exportedPosition the exported position, or {@code 0} when the exporter position
 *     supplier returned {@code -1}
 */
private record NextSnapshotId(
long index, long term, long processedPosition, long exportedPosition) {}

@SuppressWarnings("rawtypes")
private void openDb(final ActorFuture<ZeebeDb> future) {
try {
Expand Down

0 comments on commit 52bef12

Please sign in to comment.