Skip to content

Commit

Permalink
merge: #10213
Browse files Browse the repository at this point in the history
10213: feat(backup): backup manager can query status r=oleschoenburg a=deepthidevaki

## Description

- Backup manager can query the status of backup in the given backup store

## Related issues

closes #9981 



Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@gmail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and deepthidevaki committed Aug 30, 2022
2 parents bbd0e5d + 21a5c37 commit 1480f7e
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void takeBackup(final long checkpointId, final long checkpointPosition) {
final InProgressBackupImpl inProgressBackup =
new InProgressBackupImpl(
snapshotStore,
new BackupIdentifierImpl(nodeId, partitionId, checkpointId),
getBackupId(checkpointId),
checkpointPosition,
numberOfPartitions,
actor,
Expand Down Expand Up @@ -113,13 +113,16 @@ public void takeBackup(final long checkpointId, final long checkpointPosition) {

@Override
public ActorFuture<BackupStatus> getBackupStatus(final long checkpointId) {
return CompletableActorFuture.completedExceptionally(
new UnsupportedOperationException("Not implemented"));
return internalBackupManager.getBackupStatus(getBackupId(checkpointId), actor);
}

@Override
public ActorFuture<Void> deleteBackup(final long checkpointId) {
return CompletableActorFuture.completedExceptionally(
new UnsupportedOperationException("Not implemented"));
}

private BackupIdentifierImpl getBackupId(final long checkpointId) {
return new BackupIdentifierImpl(nodeId, partitionId, checkpointId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
*/
package io.camunda.zeebe.backup.management;

import io.camunda.zeebe.backup.api.BackupIdentifier;
import io.camunda.zeebe.backup.api.BackupStatus;
import io.camunda.zeebe.backup.api.BackupStore;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import java.util.HashSet;
import java.util.Set;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -117,4 +120,22 @@ private BiConsumer<Void, Throwable> proceed(
}
};
}

ActorFuture<BackupStatus> getBackupStatus(
final BackupIdentifier backupId, final ConcurrencyControl executor) {
final var future = new CompletableActorFuture<BackupStatus>();
executor.run(
() ->
backupStore
.getStatus(backupId)
.whenComplete(
(status, error) -> {
if (error == null) {
future.complete(status);
} else {
future.completeExceptionally(error);
}
}));
return future;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.camunda.zeebe.backup.api.BackupStatus;
import io.camunda.zeebe.backup.api.BackupStore;
import io.camunda.zeebe.backup.common.BackupIdentifierImpl;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.testing.TestConcurrencyControl;
Expand Down Expand Up @@ -163,6 +165,37 @@ void shouldFailBackupIfStoringFailed() {
verifyInProgressBackupIsCleanedUpAfterFailure();
}

@Test
void shouldGetBackupStatus() {
// given
final BackupStatus status = mock(BackupStatus.class);
when(backupStore.getStatus(any())).thenReturn(CompletableFuture.completedFuture(status));

// when
final var result =
backupService.getBackupStatus(new BackupIdentifierImpl(1, 1, 1), concurrencyControl);

// then
assertThat(result).succeedsWithin(Duration.ofMillis(100)).isEqualTo(status);
}

@Test
void shouldCompleteFutureWhenBackupStatusFailed() {
// given
when(backupStore.getStatus(any()))
.thenReturn(CompletableFuture.failedFuture(new RuntimeException("Expected")));

// when
final var result =
backupService.getBackupStatus(new BackupIdentifierImpl(1, 1, 1), concurrencyControl);

// then
assertThat(result)
.failsWithin(Duration.ofMillis(100))
.withThrowableOfType(ExecutionException.class)
.withMessageContaining("Expected");
}

private ActorFuture<Void> failedFuture() {
final ActorFuture<Void> future = concurrencyControl.createFuture();
future.completeExceptionally(new RuntimeException("Expected"));
Expand Down

0 comments on commit 1480f7e

Please sign in to comment.