Skip to content

Commit

Permalink
merge: #10442 #10443
Browse files Browse the repository at this point in the history
10442: feat: support terminate end events r=saig0 a=saig0

## Description

Add support for BPMN terminate end events. See #8789 (comment) on how the BPMN element should work.

The implementation doesn't follow the BPMN spec in one point: the flow scope that contains the terminate end event is not terminated but completed. Reasoning:
- The state of the flow scope is a detail that doesn't influence the core behavior. In both cases, the process instance should continue, for example, by taking the outgoing sequence flow. The difference is not visible to process participants but only when monitoring the process instance, for example, in Operate.
- It fits better with the existing implementation. It would be a bigger effort to continue the process instance (e.g. taking the outgoing sequence flow) when the flow scope is terminated. As a result, we would end up in more complex code.
- It aligns with the behavior of Camunda Platform 7. 

Side note: I implemented the major parts during a [Live Hacking session](https://www.twitch.tv/videos/1584245006). 🎥 

## Related issues

closes #8789



10443: Do not take a backup if it already exists r=deepthidevaki a=deepthidevaki

## Description

After restore, the log is truncated to the checkpoint position. So the checkpoint record is processed again and will trigger a new backup with the same Id of the backup it restored from. With this PR, `BackupService` handles this case gracefully. In addition, we also do not take a new backup if existing backup is failed or in progress. Alternatively, we can delete this backup and take a new one. But chances of it happening (i.e triggering a new backup when one already is in progress/failed) is very low. So we can keep this simple.

## Related issues

closes #10430 



Co-authored-by: Philipp Ossler <philipp.ossler@gmail.com>
Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@gmail.com>
  • Loading branch information
3 people committed Sep 26, 2022
3 parents c7baaf4 + 1668ad1 + 1ef7d3c commit b738be2
Show file tree
Hide file tree
Showing 18 changed files with 935 additions and 90 deletions.
Expand Up @@ -122,17 +122,20 @@ public CompletableFuture<Void> save(final Backup backup) {
status -> {
final var snapshot = saveSnapshotFiles(backup);
final var segments = saveSegmentFiles(backup);
return CompletableFuture.allOf(snapshot, segments);
return CompletableFuture.allOf(snapshot, segments)
.thenComposeAsync(
ignored ->
updateManifestObject(
backup.id(),
Manifest::expectInProgress,
InProgressBackupManifest::asCompleted))
.exceptionallyComposeAsync(
throwable ->
updateManifestObject(
backup.id(), manifest -> manifest.asFailed(throwable))
// Mark the returned future as failed.
.thenCompose(ignore -> CompletableFuture.failedStage(throwable)));
})
.thenComposeAsync(
ignored ->
updateManifestObject(
backup.id(), Manifest::expectInProgress, InProgressBackupManifest::asCompleted))
.exceptionallyComposeAsync(
throwable ->
updateManifestObject(backup.id(), manifest -> manifest.asFailed(throwable))
// Mark the returned future as failed.
.thenCompose(status -> CompletableFuture.failedStage(throwable)))
// Discard status, it's either COMPLETED or the future is completed exceptionally
.thenApply(ignored -> null);
}
Expand Down
Expand Up @@ -7,13 +7,18 @@
*/
package io.camunda.zeebe.backup.testkit;

import static org.assertj.core.api.Assertions.assertThat;

import io.camunda.zeebe.backup.api.Backup;
import io.camunda.zeebe.backup.api.BackupStatusCode;
import io.camunda.zeebe.backup.api.BackupStore;
import io.camunda.zeebe.backup.testkit.support.TestBackupProvider;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
Expand Down Expand Up @@ -56,4 +61,24 @@ default void backupFailsIfFilesAreMissing(final Backup backup) throws IOExceptio
.withRootCauseInstanceOf(NoSuchFileException.class)
.withMessageContaining(deletedFile.toString());
}

@ParameterizedTest
@ArgumentsSource(TestBackupProvider.class)
default void shouldNotOverwriteCompletedBackup(final Backup backup) {
// given
getStore().save(backup).join();
final var firstStatus = getStore().getStatus(backup.id()).join();

// when
final CompletableFuture<Void> saveAttempt = getStore().save(backup);

// then
assertThat(saveAttempt)
.failsWithin(Duration.ofSeconds(1))
.withThrowableOfType(ExecutionException.class);

final var status = getStore().getStatus(backup.id()).join();
assertThat(status.statusCode()).isEqualTo(BackupStatusCode.COMPLETED);
assertThat(status.lastModified()).isEqualTo(firstStatus.lastModified());
}
}
5 changes: 5 additions & 0 deletions backup/pom.xml
Expand Up @@ -101,6 +101,11 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

Expand Down
@@ -0,0 +1,18 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.backup.management;

import io.camunda.zeebe.backup.api.BackupIdentifier;
import io.camunda.zeebe.backup.api.BackupStatus;

public class BackupAlreadyExistsException extends RuntimeException {

public BackupAlreadyExistsException(final BackupIdentifier id, final BackupStatus status) {
super("Backup with id %s already exists, status of the backup is %s".formatted(id, status));
}
}
Expand Up @@ -44,34 +44,70 @@ ActorFuture<Void> takeBackup(

backupsInProgress.add(inProgressBackup);

final ActorFuture<Void> snapshotFound = concurrencyControl.createFuture();
final ActorFuture<Void> snapshotReserved = concurrencyControl.createFuture();
final ActorFuture<Void> snapshotFilesCollected = concurrencyControl.createFuture();
final var checkCurrentBackup = backupStore.getStatus(inProgressBackup.id());

final ActorFuture<Void> backupSaved = concurrencyControl.createFuture();

final ActorFuture<Void> segmentFilesCollected = inProgressBackup.findSegmentFiles();
checkCurrentBackup.whenCompleteAsync(
(status, error) -> {
if (error != null) {
backupSaved.completeExceptionally(error);
} else {
takeBackupIfDoesNotExist(status, inProgressBackup, concurrencyControl, backupSaved);
}
},
concurrencyControl::run);

backupSaved.onComplete((ignore, error) -> closeInProgressBackup(inProgressBackup));
return backupSaved;
}

segmentFilesCollected.onComplete(
proceed(
snapshotFound::completeExceptionally,
() -> inProgressBackup.findValidSnapshot().onComplete(snapshotFound)));
private void takeBackupIfDoesNotExist(
final BackupStatus status,
final InProgressBackup inProgressBackup,
final ConcurrencyControl concurrencyControl,
final ActorFuture<Void> backupSaved) {

snapshotFound.onComplete(
proceed(
snapshotReserved::completeExceptionally,
() -> inProgressBackup.reserveSnapshot().onComplete(snapshotReserved)));
switch (status.statusCode()) {
case COMPLETED -> {
LOG.debug("Backup {} is already completed, will not take a new one", inProgressBackup.id());
backupSaved.complete(null);
}
case FAILED, IN_PROGRESS -> {
LOG.error(
"Backup {} already exists with status {}, will not take a new one",
inProgressBackup.id(),
status);
backupSaved.completeExceptionally(
new BackupAlreadyExistsException(inProgressBackup.id(), status));
}
default -> {
final ActorFuture<Void> snapshotFound = concurrencyControl.createFuture();
final ActorFuture<Void> snapshotReserved = concurrencyControl.createFuture();
final ActorFuture<Void> snapshotFilesCollected = concurrencyControl.createFuture();
final ActorFuture<Void> segmentFilesCollected = inProgressBackup.findSegmentFiles();

snapshotReserved.onComplete(
proceed(
snapshotFilesCollected::completeExceptionally,
() -> inProgressBackup.findSnapshotFiles().onComplete(snapshotFilesCollected)));
segmentFilesCollected.onComplete(
proceed(
snapshotFound::completeExceptionally,
() -> inProgressBackup.findValidSnapshot().onComplete(snapshotFound)));

snapshotFilesCollected.onComplete(
proceed(
error -> failBackup(inProgressBackup, backupSaved, error),
() -> saveBackup(inProgressBackup, backupSaved)));
snapshotFound.onComplete(
proceed(
snapshotReserved::completeExceptionally,
() -> inProgressBackup.reserveSnapshot().onComplete(snapshotReserved)));

return backupSaved;
snapshotReserved.onComplete(
proceed(
snapshotFilesCollected::completeExceptionally,
() -> inProgressBackup.findSnapshotFiles().onComplete(snapshotFilesCollected)));

snapshotFilesCollected.onComplete(
proceed(
error -> failBackup(inProgressBackup, backupSaved, error),
() -> saveBackup(inProgressBackup, backupSaved)));
}
}
}

private void saveBackup(
Expand All @@ -80,10 +116,7 @@ private void saveBackup(
.onComplete(
proceed(
error -> failBackup(inProgressBackup, backupSaved, error),
() -> {
backupSaved.complete(null);
closeInProgressBackup(inProgressBackup);
}));
() -> backupSaved.complete(null)));
}

private ActorFuture<Void> saveBackup(final InProgressBackup inProgressBackup) {
Expand All @@ -108,7 +141,6 @@ private void failBackup(
final Throwable error) {
backupSaved.completeExceptionally(error);
backupStore.markFailed(inProgressBackup.id(), error.getMessage());
closeInProgressBackup(inProgressBackup);
}

private void closeInProgressBackup(final InProgressBackup inProgressBackup) {
Expand Down
Expand Up @@ -9,6 +9,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.timeout;
Expand All @@ -31,6 +32,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

Expand All @@ -40,12 +43,21 @@ class BackupServiceImplTest {
@Mock InProgressBackup inProgressBackup;
@Mock BackupStore backupStore;

@Mock BackupStatus notExistingBackupStatus;

private BackupServiceImpl backupService;
private final ConcurrencyControl concurrencyControl = new TestConcurrencyControl();

@BeforeEach
void setup() {
backupService = new BackupServiceImpl(backupStore);

lenient()
.when(notExistingBackupStatus.statusCode())
.thenReturn(BackupStatusCode.DOES_NOT_EXIST);
lenient()
.when(backupStore.getStatus(any()))
.thenReturn(CompletableFuture.completedFuture(notExistingBackupStatus));
}

@Test
Expand Down Expand Up @@ -275,6 +287,40 @@ void shouldMarkRemainingBackupsAsFailedWhenThrowsError() {
.markFailed(inProgressBackup, "Backup is cancelled due to leader change.");
}

@Test
void shouldNotTakeNewBackupIfBackupAlreadyCompleted() {
// given
final BackupStatus status = mock(BackupStatus.class);
when(status.statusCode()).thenReturn(BackupStatusCode.COMPLETED);
when(backupStore.getStatus(any())).thenReturn(CompletableFuture.completedFuture(status));

// when
backupService.takeBackup(inProgressBackup, concurrencyControl).join();

// then
verify(backupStore, never()).save(any());
}

@ParameterizedTest
@EnumSource(
value = BackupStatusCode.class,
names = {"IN_PROGRESS", "FAILED"})
void shouldNotTakeNewBackupIfBackupAlreadyExists(final BackupStatusCode statusCode) {
// given
final BackupStatus status = mock(BackupStatus.class);
when(status.statusCode()).thenReturn(statusCode);
when(backupStore.getStatus(any())).thenReturn(CompletableFuture.completedFuture(status));

// when
assertThat(backupService.takeBackup(inProgressBackup, concurrencyControl))
.failsWithin(Duration.ofMillis(100))
.withThrowableOfType(ExecutionException.class)
.withCauseInstanceOf(BackupAlreadyExistsException.class);

// then
verify(backupStore, never()).save(any());
}

private ActorFuture<Void> failedFuture() {
final ActorFuture<Void> future = concurrencyControl.createFuture();
future.completeExceptionally(new RuntimeException("Expected"));
Expand Down
Expand Up @@ -19,6 +19,7 @@
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.model.bpmn.instance.EndEvent;
import io.camunda.zeebe.model.bpmn.instance.ErrorEventDefinition;
import io.camunda.zeebe.model.bpmn.instance.TerminateEventDefinition;

/**
* @author Sebastian Menski
Expand Down Expand Up @@ -71,4 +72,17 @@ public ErrorEventDefinitionBuilder errorEventDefinition() {
element.getEventDefinitions().add(errorEventDefinition);
return new ErrorEventDefinitionBuilder(modelInstance, errorEventDefinition);
}

/**
* Creates a terminate event definition and add it to the end event. It morphs the end event into
* a terminate end event.
*
* @return the builder object
*/
public B terminate() {
final TerminateEventDefinition terminateEventDefinition =
createInstance(TerminateEventDefinition.class);
element.getEventDefinitions().add(terminateEventDefinition);
return myself;
}
}
Expand Up @@ -19,6 +19,7 @@
import io.camunda.zeebe.model.bpmn.instance.ErrorEventDefinition;
import io.camunda.zeebe.model.bpmn.instance.EventDefinition;
import io.camunda.zeebe.model.bpmn.instance.MessageEventDefinition;
import io.camunda.zeebe.model.bpmn.instance.TerminateEventDefinition;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
Expand All @@ -28,7 +29,8 @@
public class EndEventValidator implements ModelElementValidator<EndEvent> {

private static final List<Class<? extends EventDefinition>> SUPPORTED_EVENT_DEFINITIONS =
Arrays.asList(ErrorEventDefinition.class, MessageEventDefinition.class);
Arrays.asList(
ErrorEventDefinition.class, MessageEventDefinition.class, TerminateEventDefinition.class);

@Override
public Class<EndEvent> getElementType() {
Expand Down Expand Up @@ -59,7 +61,7 @@ private void validateEventDefinition(
def -> {
if (SUPPORTED_EVENT_DEFINITIONS.stream().noneMatch(type -> type.isInstance(def))) {
validationResultCollector.addError(
0, "End events must be one of: none, error or message");
0, "End events must be one of: none, error, message, or terminate");
}
});
}
Expand Down
Expand Up @@ -18,6 +18,7 @@
import io.camunda.zeebe.model.bpmn.instance.ErrorEventDefinition;
import io.camunda.zeebe.model.bpmn.instance.EventDefinition;
import io.camunda.zeebe.model.bpmn.instance.MessageEventDefinition;
import io.camunda.zeebe.model.bpmn.instance.TerminateEventDefinition;
import io.camunda.zeebe.model.bpmn.instance.TimerEventDefinition;
import java.util.Arrays;
import java.util.List;
Expand All @@ -28,7 +29,10 @@ public class EventDefinitionValidator implements ModelElementValidator<EventDefi

private static final List<Class<? extends EventDefinition>> SUPPORTED_EVENT_DEFINITIONS =
Arrays.asList(
MessageEventDefinition.class, TimerEventDefinition.class, ErrorEventDefinition.class);
MessageEventDefinition.class,
TimerEventDefinition.class,
ErrorEventDefinition.class,
TerminateEventDefinition.class);

@Override
public Class<EventDefinition> getElementType() {
Expand Down

0 comments on commit b738be2

Please sign in to comment.