Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: take snapshot if nothing was exported since last snapshot #10611

Merged
merged 2 commits into from
Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* Used when there is no entry at the determined snapshot position while taking a transient
* snapshot.
*/
public class NoEntryAtSnapshotPosition extends RuntimeException {
public class NoEntryAtSnapshotPosition extends Exception {

public NoEntryAtSnapshotPosition(final String message) {
super(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.snapshots.ConstructableSnapshotStore;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.SnapshotException.StateClosedException;
import io.camunda.zeebe.snapshots.TransientSnapshot;
import io.camunda.zeebe.util.FileUtil;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Optional;
import java.util.function.ToLongFunction;
import org.slf4j.Logger;

Expand Down Expand Up @@ -144,46 +142,20 @@ private void takeTransientSnapshotInternal(
return;
}

long index = 0;
long term = 0;
long exportedPosition = exporterPositionSupplier.applyAsLong(db);

if (exportedPosition != -1) {

final long snapshotPosition =
determineSnapshotPosition(lowerBoundSnapshotPosition, exportedPosition);
final var optionalIndexed = entrySupplier.getPreviousIndexedEntry(snapshotPosition);

if (optionalIndexed.isEmpty()) {
future.completeExceptionally(
new NoEntryAtSnapshotPosition(
String.format(
"Failed to take snapshot. Expected to find an indexed entry for determined snapshot position %d (processedPosition = %d, exportedPosition=%d), but found no matching indexed entry which contains this position.",
snapshotPosition, lowerBoundSnapshotPosition, exportedPosition)));
return;
}

final var snapshotIndexedEntry = optionalIndexed.get();
index = snapshotIndexedEntry.index();
term = snapshotIndexedEntry.term();
} else {
final Optional<PersistedSnapshot> latestSnapshot =
constructableSnapshotStore.getLatestSnapshot();
exportedPosition = 0;

if (latestSnapshot.isPresent()) {
// re-use index and term from the latest snapshot
// to ensure that the records from there are not
// compacted until they get exported
final PersistedSnapshot persistedSnapshot = latestSnapshot.get();
index = persistedSnapshot.getIndex();
term = persistedSnapshot.getTerm();
} // otherwise index/term remains 0
final NextSnapshotId nextSnapshotId;
try {
nextSnapshotId = tryFindNextSnapshotId(lowerBoundSnapshotPosition);
} catch (final NoEntryAtSnapshotPosition e) {
future.completeExceptionally(e);
return;
}

final var transientSnapshot =
constructableSnapshotStore.newTransientSnapshot(
index, term, lowerBoundSnapshotPosition, exportedPosition);
nextSnapshotId.index,
nextSnapshotId.term,
nextSnapshotId.processedPosition,
nextSnapshotId.exportedPosition);

if (transientSnapshot.isLeft()) {
future.completeExceptionally(transientSnapshot.getLeft());
Expand All @@ -192,6 +164,50 @@ private void takeTransientSnapshotInternal(
}
}

private NextSnapshotId tryFindNextSnapshotId(final long lastProcessedPosition)
throws NoEntryAtSnapshotPosition {
final var exportedPosition = exporterPositionSupplier.applyAsLong(db);
if (exportedPosition == -1) {
final var latestSnapshot = constructableSnapshotStore.getLatestSnapshot();
if (latestSnapshot.isPresent()) {
// re-use index and term from the latest snapshot to ensure that the records from there are
// not compacted until they get exported.
final var persistedSnapshot = latestSnapshot.get();
return new NextSnapshotId(
persistedSnapshot.getIndex(), persistedSnapshot.getTerm(), lastProcessedPosition, 0);
}

return new NextSnapshotId(0, 0, lastProcessedPosition, 0);
}

final var snapshotPosition = Math.min(exportedPosition, lastProcessedPosition);
final var logEntry = entrySupplier.getPreviousIndexedEntry(snapshotPosition);

if (logEntry.isPresent()) {
return new NextSnapshotId(
logEntry.get().index(), logEntry.get().term(), lastProcessedPosition, exportedPosition);
}

// No log entry for snapshot position - try to use the index and term of the last snapshot to
// take new one
final var latestSnapshot = constructableSnapshotStore.getLatestSnapshot();
if (latestSnapshot.isPresent()) {
LOG.warn(
"No log entry for next snapshot position {}, using index and term from previous snapshot",
snapshotPosition);
return new NextSnapshotId(
oleschoenburg marked this conversation as resolved.
Show resolved Hide resolved
latestSnapshot.get().getIndex(),
latestSnapshot.get().getTerm(),
lastProcessedPosition,
exportedPosition);
}

throw new NoEntryAtSnapshotPosition(
String.format(
"Failed to take snapshot. Expected to find an indexed entry for determined snapshot position %d (processedPosition = %d, exportedPosition=%d) or previous snapshot, but found neither.",
snapshotPosition, lastProcessedPosition, exportedPosition));
}

@SuppressWarnings("rawtypes")
private void openDb(final ActorFuture<ZeebeDb> future) {
try {
Expand Down Expand Up @@ -247,14 +263,6 @@ private void takeSnapshot(
});
}

private long determineSnapshotPosition(
final long lowerBoundSnapshotPosition, final long exportedPosition) {
final long snapshotPosition = Math.min(exportedPosition, lowerBoundSnapshotPosition);
LOG.trace(
"Based on lowest exporter position '{}' and last processed position '{}', determined '{}' as snapshot position.",
exportedPosition,
lowerBoundSnapshotPosition,
snapshotPosition);
return snapshotPosition;
}
private record NextSnapshotId(
oleschoenburg marked this conversation as resolved.
Show resolved Hide resolved
long index, long term, long processedPosition, long exportedPosition) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,33 @@ public void shouldTakeSnapshotWhenExporterPositionNotChanged() {
assertThat(firstSnapshotId).isLessThan(newSnapshotId);
}

@Test
public void shouldTakeSnapshotWithoutIndexedEntryWhenProcessedPositionChanged() {
// given
final var snapshotPosition = 2;
exporterPosition.set(snapshotPosition - 1);
snapshotController.recover().join();
final var firstSnapshot =
snapshotController.takeTransientSnapshot(snapshotPosition).join().persist().join();
atomixRecordEntrySupplier.set(emptyEntrySupplier);

// when
final var snapshot =
snapshotController.takeTransientSnapshot(snapshotPosition + 1).join().persist().join();

// then
assertThat(snapshot)
.extracting(PersistedSnapshot::getCompactionBound)
.isEqualTo(firstSnapshot.getCompactionBound());
assertThat(snapshot.getId()).isNotEqualTo(firstSnapshot.getId());
final var newSnapshotId = FileBasedSnapshotId.ofFileName(snapshot.getId()).orElseThrow();
final var firstSnapshotId = FileBasedSnapshotId.ofFileName(firstSnapshot.getId()).orElseThrow();
assertThat(newSnapshotId.getExportedPosition())
.isEqualTo(firstSnapshotId.getExportedPosition());
assertThat(newSnapshotId.getProcessedPosition())
.isGreaterThan(firstSnapshotId.getProcessedPosition());
}

@Test
public void shouldTakeSnapshotWhenProcessorPositionNotChanged() {
// given
Expand Down