Skip to content

Commit

Permalink
merge: #10182
Browse files Browse the repository at this point in the history
10182: feat: s3 backup store can restore a backup r=deepthidevaki a=oleschoenburg

Restores a backup by downloading snapshot and segment objects and storing them as files in a given target folder.

closes #10179 

Co-authored-by: Ole Schönburg <ole.schoenburg@gmail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and oleschoenburg committed Aug 26, 2022
2 parents 82d6a8a + 08ba358 commit ced608b
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 4 deletions.
Expand Up @@ -16,7 +16,10 @@
import io.camunda.zeebe.backup.api.BackupStatus;
import io.camunda.zeebe.backup.api.BackupStatusCode;
import io.camunda.zeebe.backup.api.BackupStore;
import io.camunda.zeebe.backup.api.NamedFileSet;
import io.camunda.zeebe.backup.common.BackupImpl;
import io.camunda.zeebe.backup.common.BackupStatusImpl;
import io.camunda.zeebe.backup.common.NamedFileSetImpl;
import io.camunda.zeebe.backup.s3.S3BackupStoreException.BackupDeletionIncomplete;
import io.camunda.zeebe.backup.s3.S3BackupStoreException.BackupInInvalidStateException;
import io.camunda.zeebe.backup.s3.S3BackupStoreException.BackupReadException;
Expand All @@ -28,7 +31,9 @@
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.services.s3.S3AsyncClient;
Expand Down Expand Up @@ -131,8 +136,40 @@ public CompletableFuture<Void> delete(final BackupIdentifier id) {
}

@Override
public CompletableFuture<Backup> restore(final BackupIdentifier id) {
throw new UnsupportedOperationException();
public CompletableFuture<Backup> restore(final BackupIdentifier id, Path targetFolder) {
final var backupPrefix = objectPrefix(id);
return requireBackupStatus(id, EnumSet.of(BackupStatusCode.COMPLETED))
.thenComposeAsync(this::readMetadataObject)
.thenComposeAsync(
metadata ->
downloadNamedFileSet(
backupPrefix + SEGMENTS_PREFIX, metadata.segmentFileNames(), targetFolder)
.thenCombineAsync(
downloadNamedFileSet(
backupPrefix + SNAPSHOT_PREFIX,
metadata.snapshotFileNames(),
targetFolder),
(segments, snapshot) ->
new BackupImpl(id, metadata.descriptor(), snapshot, segments)));
}

private CompletableFuture<NamedFileSet> downloadNamedFileSet(
final String sourcePrefix, final Set<String> fileNames, Path targetFolder) {
final var downloadedFiles = new ConcurrentHashMap<String, Path>();
final CompletableFuture<?>[] futures =
fileNames.stream()
.map(
fileName -> {
final var path = targetFolder.resolve(fileName);
return client
.getObject(
req -> req.bucket(config.bucketName()).key(sourcePrefix + fileName), path)
.thenApply(response -> downloadedFiles.put(fileName, path));
})
.toArray(CompletableFuture[]::new);

return CompletableFuture.allOf(futures)
.thenApply(ignored -> new NamedFileSetImpl(downloadedFiles));
}

@Override
Expand Down
@@ -0,0 +1,47 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.backup.s3;

import static org.assertj.core.api.Assertions.assertThat;

import io.camunda.zeebe.backup.api.Backup;
import java.nio.file.Path;
import org.assertj.core.api.AbstractAssert;

final class BackupAssert extends AbstractAssert<BackupAssert, Backup> {

private BackupAssert(final Backup actual, final Class<?> selfType) {
super(actual, selfType);
}

public static BackupAssert assertThatBackup(Backup actual) {
return new BackupAssert(actual, BackupAssert.class);
}

@SuppressWarnings("UnusedReturnValue")
public BackupAssert hasSameContentsAs(Backup expected) {
assertThat(actual.id()).isEqualTo(expected.id());
assertThat(actual.descriptor()).isEqualTo(expected.descriptor());
assertThat(actual.snapshot().names()).isEqualTo(expected.snapshot().names());
assertThat(actual.segments().names()).isEqualTo(expected.segments().names());

NamedFileSetAssert.assertThatNamedFileSet(actual.snapshot())
.hasSameContentsAs(expected.snapshot());
NamedFileSetAssert.assertThatNamedFileSet(actual.segments())
.hasSameContentsAs(expected.segments());

return this;
}

@SuppressWarnings("UnusedReturnValue")
public BackupAssert residesInPath(Path expectedPath) {
NamedFileSetAssert.assertThatNamedFileSet(actual.snapshot()).residesInPath(expectedPath);
NamedFileSetAssert.assertThatNamedFileSet(actual.segments()).residesInPath(expectedPath);
return this;
}
}
@@ -0,0 +1,49 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.backup.s3;

import static org.assertj.core.api.Assertions.assertThat;

import io.camunda.zeebe.backup.api.NamedFileSet;
import java.nio.file.Path;
import org.assertj.core.api.AbstractAssert;

final class NamedFileSetAssert extends AbstractAssert<NamedFileSetAssert, NamedFileSet> {

private NamedFileSetAssert(final NamedFileSet namedFileSet, final Class<?> selfType) {
super(namedFileSet, selfType);
}

public static NamedFileSetAssert assertThatNamedFileSet(NamedFileSet actual) {
return new NamedFileSetAssert(actual, NamedFileSetAssert.class);
}

@SuppressWarnings("UnusedReturnValue")
public NamedFileSetAssert hasSameContentsAs(NamedFileSet expected) {
for (final var expectedEntry : expected.namedFiles().entrySet()) {
final var expectedName = expectedEntry.getKey();
final var expectedPath = expectedEntry.getValue();
final var actualNamedFiles = actual.namedFiles();

assertThat(actualNamedFiles).containsKey(expectedName);
final var actualPath = actualNamedFiles.get(expectedEntry.getKey());
assertThat(actualPath).hasSameBinaryContentAs(expectedPath);
}
return this;
}

@SuppressWarnings("UnusedReturnValue")
public NamedFileSetAssert residesInPath(Path expectedPath) {
assertThat(actual.files())
.allSatisfy(
actualPath -> {
assertThat(actualPath).hasParent(expectedPath);
});
return this;
}
}
Expand Up @@ -7,6 +7,7 @@
*/
package io.camunda.zeebe.backup.s3;

import static io.camunda.zeebe.backup.s3.BackupAssert.assertThatBackup;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.from;

Expand Down Expand Up @@ -442,6 +443,33 @@ void deletingInProgressBackupFails(@TempDir Path tempDir) throws IOException {
.withRootCauseInstanceOf(BackupInInvalidStateException.class);
}

@Test
void restoreIsSuccessful(@TempDir Path sourceDir, @TempDir Path targetDir) throws IOException {
// given
final var backup = prepareTestBackup(sourceDir);
store.save(backup).join();

// when
final var result = store.restore(backup.id(), targetDir);

// then
assertThat(result).succeedsWithin(Duration.ofSeconds(10));
}

@Test
void restoredBackupHasSameContents(@TempDir Path sourceDir, @TempDir Path targetDir)
throws IOException {
// given
final var originalBackup = prepareTestBackup(sourceDir);
store.save(originalBackup).join();

// when
final var restored = store.restore(originalBackup.id(), targetDir).join();

// then
assertThatBackup(restored).hasSameContentsAs(originalBackup).residesInPath(targetDir);
}

private Backup prepareTestBackup(Path tempDir) throws IOException {
Files.createDirectory(tempDir.resolve("segments/"));
final var seg1 = Files.createFile(tempDir.resolve("segments/segment-file-1"));
Expand Down
Expand Up @@ -7,6 +7,7 @@
*/
package io.camunda.zeebe.backup.api;

import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;

/** A store where the backup is stored * */
Expand All @@ -22,7 +23,7 @@ public interface BackupStore {
CompletableFuture<Void> delete(BackupIdentifier id);

/** Restores the backup */
CompletableFuture<Backup> restore(BackupIdentifier id);
CompletableFuture<Backup> restore(BackupIdentifier id, Path targetFolder);

/**
* Marks the backup as failed. If saving a backup failed, the backups store must mark it as
Expand Down
Expand Up @@ -12,6 +12,7 @@
import io.camunda.zeebe.backup.api.BackupStatus;
import io.camunda.zeebe.backup.api.BackupStatusCode;
import io.camunda.zeebe.backup.api.BackupStore;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;

// A placeholder backup store until a proper backup store is available
Expand All @@ -38,7 +39,7 @@ public CompletableFuture<Void> delete(final BackupIdentifier id) {
}

@Override
public CompletableFuture<Backup> restore(final BackupIdentifier id) {
public CompletableFuture<Backup> restore(final BackupIdentifier id, final Path targetFolder) {
return CompletableFuture.failedFuture(
new UnsupportedOperationException("No backup store configured"));
}
Expand Down

0 comments on commit ced608b

Please sign in to comment.