Skip to content

Commit

Permalink
merge: #10121
Browse files Browse the repository at this point in the history
10121: Persist snapshot metadata r=deepthidevaki a=deepthidevaki

## Description

- Renamed `FileBasedSnapshotMetadata` to `FileBasedSnapshotId`
- Created a new `FileBasedSnapshotMetadata` for the newly added metadata
- `AsyncSnapshotDirector` updates the metadata before persisting the snapshot
- While persisting the snapshot, metadata is written as a file in snapshot's directory and the checksum is updated to include this file
- The metadata file is replicated as a `SnapshotChunk`. So there is no change in replication protocol. On the receiver side, it can identify if a chunk is a metadata or not from the file name. It uses this information to create the metadata object of this snapshot. The receiver is backward compatible because the older version just write the metadata file to disk as the other SnapshotChunk and just ignore it later. If the snapshot is from the older version, the receiver at new version handles this case by creating an incomplete metadata with known information. This is safe to do.
- Added new tests and refactored existing test to make it work with the new semantics.

## Related issues

closes #10115 



Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@gmail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and deepthidevaki committed Aug 22, 2022
2 parents 0588f34 + 1c6d09d commit 976c14d
Show file tree
Hide file tree
Showing 26 changed files with 599 additions and 373 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.camunda.zeebe.snapshots.SnapshotChunk;
import io.camunda.zeebe.snapshots.SnapshotChunkReader;
import io.camunda.zeebe.snapshots.SnapshotId;
import io.camunda.zeebe.snapshots.SnapshotMetadata;
import io.camunda.zeebe.snapshots.SnapshotReservation;
import io.camunda.zeebe.util.StringUtil;
import io.camunda.zeebe.util.buffer.BufferUtil;
Expand Down Expand Up @@ -149,6 +150,11 @@ public long getChecksum() {
return 0;
}

@Override
public SnapshotMetadata getMetadata() {
return null;
}

@Override
public ActorFuture<SnapshotReservation> reserve() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotMetadata;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotId;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -158,8 +158,8 @@ private void getFollowerPartitionStatus(
final var snapshotId = getSnapshotId(partition);
final var processedPositionInSnapshot =
snapshotId
.flatMap(FileBasedSnapshotMetadata::ofFileName)
.map(FileBasedSnapshotMetadata::getProcessedPosition)
.flatMap(FileBasedSnapshotId::ofFileName)
.map(FileBasedSnapshotId::getProcessedPosition)
.orElse(null);
final var status =
PartitionStatus.ofFollower(snapshotId.orElse(null), processedPositionInSnapshot);
Expand All @@ -179,8 +179,8 @@ private void getLeaderPartitionStatus(
final var snapshotId = getSnapshotId(partition);
final var processedPositionInSnapshot =
snapshotId
.flatMap(FileBasedSnapshotMetadata::ofFileName)
.map(FileBasedSnapshotMetadata::getProcessedPosition)
.flatMap(FileBasedSnapshotId::ofFileName)
.map(FileBasedSnapshotId::getProcessedPosition)
.orElse(null);

actor.runOnCompletion(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,8 @@ private void persistSnapshotIfLastWrittenPositionCommitted() {
commitPosition,
lastWrittenPosition,
pendingSnapshot);
final var snapshotPersistFuture = pendingSnapshot.persist();
final var snapshotPersistFuture =
pendingSnapshot.withLastFollowupEventPosition(lastWrittenPosition).persist();

snapshotPersistFuture.onComplete(
(snapshot, persistError) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.ZeebeClientBuilder;
import io.camunda.zeebe.snapshots.SnapshotId;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotMetadata;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotId;
import io.netty.util.NetUtil;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -100,6 +100,6 @@ private SnapshotId waitForSnapshotAtBroker(
.getProcessedPositionInSnapshot())
.isNotNull());
final PartitionStatus partitionStatus = brokerAdminService.getPartitionStatus().get(1);
return FileBasedSnapshotMetadata.ofFileName(partitionStatus.getSnapshotId()).get();
return FileBasedSnapshotId.ofFileName(partitionStatus.getSnapshotId()).get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import io.camunda.zeebe.scheduler.testing.ActorSchedulerRule;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.SnapshotException.StateClosedException;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotMetadata;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotId;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotStore;
import io.camunda.zeebe.snapshots.impl.SnapshotMetrics;
import io.camunda.zeebe.test.util.AutoCloseableRule;
Expand Down Expand Up @@ -193,9 +193,8 @@ public void shouldTakeSnapshotWhenExporterPositionNotChanged() {
.extracting(PersistedSnapshot::getCompactionBound)
.isEqualTo(firstSnapshot.getCompactionBound());
assertThat(snapshot.getId()).isNotEqualTo(firstSnapshot.getId());
final var newSnapshotId = FileBasedSnapshotMetadata.ofFileName(snapshot.getId()).orElseThrow();
final var firstSnapshotId =
FileBasedSnapshotMetadata.ofFileName(firstSnapshot.getId()).orElseThrow();
final var newSnapshotId = FileBasedSnapshotId.ofFileName(snapshot.getId()).orElseThrow();
final var firstSnapshotId = FileBasedSnapshotId.ofFileName(firstSnapshot.getId()).orElseThrow();
assertThat(firstSnapshotId).isLessThan(newSnapshotId);
}

Expand All @@ -218,9 +217,8 @@ public void shouldTakeSnapshotWhenProcessorPositionNotChanged() {
.extracting(PersistedSnapshot::getCompactionBound)
.isEqualTo(firstSnapshot.getCompactionBound());
assertThat(snapshot.getId()).isNotEqualTo(firstSnapshot.getId());
final var newSnapshotId = FileBasedSnapshotMetadata.ofFileName(snapshot.getId()).orElseThrow();
final var firstSnapshotId =
FileBasedSnapshotMetadata.ofFileName(firstSnapshot.getId()).orElseThrow();
final var newSnapshotId = FileBasedSnapshotId.ofFileName(snapshot.getId()).orElseThrow();
final var firstSnapshotId = FileBasedSnapshotId.ofFileName(firstSnapshot.getId()).orElseThrow();
assertThat(firstSnapshotId).isLessThan(newSnapshotId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.snapshots.SnapshotId;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotMetadata;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotId;
import io.camunda.zeebe.test.util.AutoCloseableRule;
import io.camunda.zeebe.test.util.asserts.TopologyAssert;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
Expand Down Expand Up @@ -813,7 +813,7 @@ private Optional<SnapshotId> getSnapshot(final Broker broker, final int partitio

return Optional.ofNullable(partitionStatus)
.map(PartitionStatus::getSnapshotId)
.flatMap(FileBasedSnapshotMetadata::ofFileName);
.flatMap(FileBasedSnapshotId::ofFileName);
}

public Optional<SnapshotId> getSnapshot(final int brokerId) {
Expand Down
10 changes: 10 additions & 0 deletions snapshot/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,16 @@
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>

<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-scheduler</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ public interface PersistedSnapshot extends CloseableSilently {
*/
long getChecksum();

/**
* SnapshotMetadata includes information related to a snapshot.
*
* @return the metadata of the snapshot.
*/
SnapshotMetadata getMetadata();
/**
* Reserves this snapshot. When the snapshot is reserved, it is not deleted until it is released.
* The reservation status is not persisted. After a restart the snapshot will be in state
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.snapshots;

public interface SnapshotMetadata {

/**
* @return version of the snapshot
*/
int version();

/**
* @return processed position in the snapshot (same as in SnapshotId)
*/
long processedPosition();

/**
* @return exported position in the snapshot (same as in SnapshotId)
*/
long exportedPosition();

/**
* A snapshot is only valid if the logstream consists of the events from the processedPosition up
* to the followup event position.
*
* @return position of the last followUpEvent that must be in the logstream to ensure that the
* system can recover from the snapshot and the logstream.
*/
long lastFollowupEventPosition();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,15 @@ public interface TransientSnapshot extends PersistableSnapshot {
* @return true on success, false otherwise
*/
ActorFuture<Void> take(Consumer<Path> takeSnapshot);

/**
* A snapshot is only valid if the accompanying logstream has events from processedPosition up to
* the last followup event position. The last followUp event position is the position of an event
* whose source position >= actual processed position in the state.
*
* @param followupEventPosition position of the followup event which must be in the logstream to
* ensure that the system can recover from the snapshot and the events in the logstream.
* @return transient snapshot.
*/
TransientSnapshot withLastFollowupEventPosition(long followupEventPosition);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,17 @@ public class FileBasedReceivedSnapshot implements ReceivedSnapshot {
private final ActorControl actor;
private final FileBasedSnapshotStore snapshotStore;

private final FileBasedSnapshotMetadata metadata;
private final FileBasedSnapshotId snapshotId;
private long expectedSnapshotChecksum;
private int expectedTotalCount;
private FileBasedSnapshotMetadata metadata;

FileBasedReceivedSnapshot(
final FileBasedSnapshotMetadata metadata,
final FileBasedSnapshotId snapshotId,
final Path directory,
final FileBasedSnapshotStore snapshotStore,
final ActorControl actor) {
this.metadata = metadata;
this.snapshotId = snapshotId;
this.snapshotStore = snapshotStore;
this.directory = directory;
this.actor = actor;
Expand All @@ -54,7 +55,7 @@ public class FileBasedReceivedSnapshot implements ReceivedSnapshot {

@Override
public long index() {
return metadata.getIndex();
return snapshotId.getIndex();
}

@Override
Expand Down Expand Up @@ -113,6 +114,18 @@ private void applyInternal(final SnapshotChunk snapshotChunk) throws SnapshotWri

LOGGER.trace("Consume snapshot snapshotChunk {} of snapshot {}", chunkName, snapshotId);
writeReceivedSnapshotChunk(snapshotChunk, snapshotFile);

if (snapshotChunk.getChunkName().equals(FileBasedSnapshotStore.METADATA_FILE_NAME)) {
try {
collectMetadata(snapshotChunk.getContent());
} catch (final IOException e) {
throw new SnapshotWriteException("Cannot decode snapshot metadata");
}
}
}

private void collectMetadata(final byte[] content) throws IOException {
metadata = FileBasedSnapshotMetadata.decode(content);
}

private void checkChunkChecksumIsValid(
Expand Down Expand Up @@ -157,18 +170,18 @@ private void checkTotalCountIsValid(final int currentTotalCount) throws Snapshot
}

private void checkSnapshotIdIsValid(final String snapshotId) throws SnapshotWriteException {
final var receivedSnapshotId = FileBasedSnapshotMetadata.ofFileName(snapshotId);
final var receivedSnapshotId = FileBasedSnapshotId.ofFileName(snapshotId);
if (receivedSnapshotId.isEmpty()) {
throw new SnapshotWriteException(
String.format("Snapshot file name '%s' has unexpected format", snapshotId));
}

final FileBasedSnapshotMetadata chunkMetadata = receivedSnapshotId.get();
if (metadata.compareTo(chunkMetadata) != 0) {
final FileBasedSnapshotId chunkSnapshotId = receivedSnapshotId.get();
if (this.snapshotId.compareTo(chunkSnapshotId) != 0) {
throw new SnapshotWriteException(
String.format(
"Expected snapshot chunk metadata to match metadata '%s' but was '%s' instead",
metadata, chunkMetadata));
"Expected snapshot id in chunk to be '%s' but was '%s' instead",
this.snapshotId, chunkSnapshotId));
}
}

Expand Down Expand Up @@ -213,7 +226,7 @@ public ActorFuture<PersistedSnapshot> persist() {

@Override
public SnapshotId snapshotId() {
return metadata;
return snapshotId;
}

@Override
Expand All @@ -233,7 +246,7 @@ private void abortInternal() {
}

private void persistInternal(final CompletableActorFuture<PersistedSnapshot> future) {
if (snapshotStore.hasSnapshotId(metadata.getSnapshotIdAsString())) {
if (snapshotStore.hasSnapshotId(snapshotId.getSnapshotIdAsString())) {
abortInternal();
future.complete(snapshotStore.getLatestSnapshot().orElseThrow());
return;
Expand All @@ -257,8 +270,17 @@ private void persistInternal(final CompletableActorFuture<PersistedSnapshot> fut
}

try {
if (metadata == null) {
// backward compatibility
metadata =
new FileBasedSnapshotMetadata(
FileBasedSnapshotStore.VERSION,
snapshotId.getProcessedPosition(),
snapshotId.getExportedPosition(),
Long.MAX_VALUE);
}
final PersistedSnapshot value =
snapshotStore.newSnapshot(metadata, directory, expectedSnapshotChecksum);
snapshotStore.newSnapshot(snapshotId, directory, expectedSnapshotChecksum, metadata);
future.complete(value);
} catch (final Exception e) {
future.completeExceptionally(e);
Expand All @@ -275,7 +297,7 @@ public String toString() {
+ ", snapshotStore="
+ snapshotStore.getName()
+ ", metadata="
+ metadata
+ snapshotId
+ '}';
}
}

0 comments on commit 976c14d

Please sign in to comment.