Skip to content

Commit

Permalink
merge: #10624
Browse files Browse the repository at this point in the history
10624: [Backport stable/8.0] fix: take snapshot if nothing was exported since last snapshot r=oleschoenburg a=backport-action

# Description
Backport of #10611 to `stable/8.0`.

relates to #9761

Co-authored-by: Ole Schönburg <ole.schoenburg@gmail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and oleschoenburg committed Oct 6, 2022
2 parents 6051592 + 0dccfc7 commit 912297e
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* Used when there is no entry at the determined snapshot position while taking a transient
* snapshot.
*/
public class NoEntryAtSnapshotPosition extends RuntimeException {
public class NoEntryAtSnapshotPosition extends Exception {

public NoEntryAtSnapshotPosition(final String message) {
super(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@
import io.camunda.zeebe.db.ZeebeDbFactory;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.snapshots.ConstructableSnapshotStore;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.SnapshotException.StateClosedException;
import io.camunda.zeebe.snapshots.TransientSnapshot;
import io.camunda.zeebe.util.FileUtil;
import io.camunda.zeebe.util.sched.ConcurrencyControl;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Optional;
import java.util.function.ToLongFunction;
import org.slf4j.Logger;

Expand Down Expand Up @@ -144,46 +142,20 @@ private void takeTransientSnapshotInternal(
return;
}

long index = 0;
long term = 0;
long exportedPosition = exporterPositionSupplier.applyAsLong(db);

if (exportedPosition != -1) {

final long snapshotPosition =
determineSnapshotPosition(lowerBoundSnapshotPosition, exportedPosition);
final var optionalIndexed = entrySupplier.getPreviousIndexedEntry(snapshotPosition);

if (optionalIndexed.isEmpty()) {
future.completeExceptionally(
new NoEntryAtSnapshotPosition(
String.format(
"Failed to take snapshot. Expected to find an indexed entry for determined snapshot position %d (processedPosition = %d, exportedPosition=%d), but found no matching indexed entry which contains this position.",
snapshotPosition, lowerBoundSnapshotPosition, exportedPosition)));
return;
}

final var snapshotIndexedEntry = optionalIndexed.get();
index = snapshotIndexedEntry.index();
term = snapshotIndexedEntry.term();
} else {
final Optional<PersistedSnapshot> latestSnapshot =
constructableSnapshotStore.getLatestSnapshot();
exportedPosition = 0;

if (latestSnapshot.isPresent()) {
// re-use index and term from the latest snapshot
// to ensure that the records from there are not
// compacted until they get exported
final PersistedSnapshot persistedSnapshot = latestSnapshot.get();
index = persistedSnapshot.getIndex();
term = persistedSnapshot.getTerm();
} // otherwise index/term remains 0
final NextSnapshotId nextSnapshotId;
try {
nextSnapshotId = tryFindNextSnapshotId(lowerBoundSnapshotPosition);
} catch (final NoEntryAtSnapshotPosition e) {
future.completeExceptionally(e);
return;
}

final var transientSnapshot =
constructableSnapshotStore.newTransientSnapshot(
index, term, lowerBoundSnapshotPosition, exportedPosition);
nextSnapshotId.index,
nextSnapshotId.term,
nextSnapshotId.processedPosition,
nextSnapshotId.exportedPosition);

if (transientSnapshot.isLeft()) {
future.completeExceptionally(transientSnapshot.getLeft());
Expand All @@ -192,6 +164,50 @@ private void takeTransientSnapshotInternal(
}
}

private NextSnapshotId tryFindNextSnapshotId(final long lastProcessedPosition)
throws NoEntryAtSnapshotPosition {
final var exportedPosition = exporterPositionSupplier.applyAsLong(db);
if (exportedPosition == -1) {
final var latestSnapshot = constructableSnapshotStore.getLatestSnapshot();
if (latestSnapshot.isPresent()) {
// re-use index and term from the latest snapshot to ensure that the records from there are
// not compacted until they get exported.
final var persistedSnapshot = latestSnapshot.get();
return new NextSnapshotId(
persistedSnapshot.getIndex(), persistedSnapshot.getTerm(), lastProcessedPosition, 0);
}

return new NextSnapshotId(0, 0, lastProcessedPosition, 0);
}

final var snapshotPosition = Math.min(exportedPosition, lastProcessedPosition);
final var logEntry = entrySupplier.getPreviousIndexedEntry(snapshotPosition);

if (logEntry.isPresent()) {
return new NextSnapshotId(
logEntry.get().index(), logEntry.get().term(), lastProcessedPosition, exportedPosition);
}

// No log entry for snapshot position - try to use the index and term of the last snapshot to
// take new one
final var latestSnapshot = constructableSnapshotStore.getLatestSnapshot();
if (latestSnapshot.isPresent()) {
LOG.warn(
"No log entry for next snapshot position {}, using index and term from previous snapshot",
snapshotPosition);
return new NextSnapshotId(
latestSnapshot.get().getIndex(),
latestSnapshot.get().getTerm(),
lastProcessedPosition,
exportedPosition);
}

throw new NoEntryAtSnapshotPosition(
String.format(
"Failed to take snapshot. Expected to find an indexed entry for determined snapshot position %d (processedPosition = %d, exportedPosition=%d) or previous snapshot, but found neither.",
snapshotPosition, lastProcessedPosition, exportedPosition));
}

@SuppressWarnings("rawtypes")
private void openDb(final ActorFuture<ZeebeDb> future) {
try {
Expand Down Expand Up @@ -247,14 +263,6 @@ private void takeSnapshot(
});
}

private long determineSnapshotPosition(
final long lowerBoundSnapshotPosition, final long exportedPosition) {
final long snapshotPosition = Math.min(exportedPosition, lowerBoundSnapshotPosition);
LOG.trace(
"Based on lowest exporter position '{}' and last processed position '{}', determined '{}' as snapshot position.",
exportedPosition,
lowerBoundSnapshotPosition,
snapshotPosition);
return snapshotPosition;
}
private record NextSnapshotId(
long index, long term, long processedPosition, long exportedPosition) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,33 @@ public void shouldTakeSnapshotWhenExporterPositionNotChanged() {
assertThat(firstSnapshotId).isLessThan(newSnapshotId);
}

@Test
public void shouldTakeSnapshotWithoutIndexedEntryWhenProcessedPositionChanged() {
// given
final var snapshotPosition = 2;
exporterPosition.set(snapshotPosition - 1);
snapshotController.recover().join();
final var firstSnapshot =
snapshotController.takeTransientSnapshot(snapshotPosition).join().persist().join();
atomixRecordEntrySupplier.set(emptyEntrySupplier);

// when
final var snapshot =
snapshotController.takeTransientSnapshot(snapshotPosition + 1).join().persist().join();

// then
assertThat(snapshot)
.extracting(PersistedSnapshot::getCompactionBound)
.isEqualTo(firstSnapshot.getCompactionBound());
assertThat(snapshot.getId()).isNotEqualTo(firstSnapshot.getId());
final var newSnapshotId = FileBasedSnapshotId.ofFileName(snapshot.getId()).orElseThrow();
final var firstSnapshotId = FileBasedSnapshotId.ofFileName(firstSnapshot.getId()).orElseThrow();
assertThat(newSnapshotId.getExportedPosition())
.isEqualTo(firstSnapshotId.getExportedPosition());
assertThat(newSnapshotId.getProcessedPosition())
.isGreaterThan(firstSnapshotId.getProcessedPosition());
}

@Test
public void shouldTakeSnapshotWhenProcessorPositionNotChanged() {
// given
Expand Down

0 comments on commit 912297e

Please sign in to comment.