Skip to content

Commit

Permalink
merge: #9121
Browse files Browse the repository at this point in the history
9121: Prevent duplicate key insertion for DMN r=remcowesterhoud a=remcowesterhoud

## Description

<!-- Please explain the changes you made here. -->
To make sure we keep our data consistent we should make sure we don't store duplicate values into the state. The DMN resources were missing the required checks to prevent this. We would always try to insert the resources, disregarding if it is a duplicate. This change filters out the duplicate records and guarantees we only store the non-duplicates.

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #9115 



Co-authored-by: Remco Westerhoud <remco@westerhoud.nl>
  • Loading branch information
zeebe-bors-camunda[bot] and remcowesterhoud committed Apr 13, 2022
2 parents 013e2cd + 96810c8 commit da40ce0
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@
package io.camunda.zeebe.engine.state.appliers;

import static io.camunda.zeebe.util.buffer.BufferUtil.wrapArray;
import static java.util.function.Predicate.not;

import io.camunda.zeebe.engine.state.TypedEventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableDecisionState;
import io.camunda.zeebe.engine.state.mutable.MutableProcessState;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DecisionRecord;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DecisionRequirementsMetadataRecord;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DecisionRequirementsRecord;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
Expand Down Expand Up @@ -40,8 +43,8 @@ public void applyState(final long key, final DeploymentRecord value) {
}

private void putDmnResourcesInState(final DeploymentRecord value) {
value
.decisionRequirementsMetadata()
value.decisionRequirementsMetadata().stream()
.filter(not(DecisionRequirementsMetadataRecord::isDuplicate))
.forEach(
drg -> {
final var resource = getResourceByName(value, drg.getResourceName());
Expand All @@ -50,7 +53,9 @@ private void putDmnResourcesInState(final DeploymentRecord value) {
decisionState.storeDecisionRequirements(decisionRequirementsRecord);
});

value.decisionsMetadata().forEach(decisionState::storeDecisionRecord);
value.decisionsMetadata().stream()
.filter(not(DecisionRecord::isDuplicate))
.forEach(decisionState::storeDecisionRecord);
}

private DirectBuffer getResourceByName(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
import io.camunda.zeebe.protocol.record.value.DeploymentDistributionRecordValue;
import io.camunda.zeebe.protocol.record.value.DeploymentRecordValue;
import io.camunda.zeebe.protocol.record.value.deployment.DecisionRecordValue;
import io.camunda.zeebe.protocol.record.value.deployment.DecisionRequirementsMetadataValue;
import io.camunda.zeebe.protocol.record.value.deployment.DeploymentResource;
import io.camunda.zeebe.protocol.record.value.deployment.ProcessMetadataValue;
import io.camunda.zeebe.test.util.record.RecordingExporter;
Expand All @@ -42,6 +44,8 @@ public final class CreateDeploymentMultiplePartitionsTest {
Bpmn.createExecutableProcess(PROCESS_ID).startEvent().endEvent().done();
private static final BpmnModelInstance PROCESS_2 =
Bpmn.createExecutableProcess("process2").startEvent().endEvent().done();
private static final String DMN_DECISION_TABLE = "/dmn/decision-table.dmn";
private static final String DMN_DECISION_TABLE_V2 = "/dmn/decision-table_v2.dmn";

@Rule
public final RecordingExporterTestWatcher recordingExporterTestWatcher =
Expand Down Expand Up @@ -300,7 +304,7 @@ public void shouldFilterDuplicateProcess() {
.collect(Collectors.toList());

assertThat(repeatedWfs.size()).isEqualTo(PARTITION_COUNT - 1);
repeatedWfs.forEach(repeatedWf -> assertSameResource(originalProcesses.get(0), repeatedWf));
repeatedWfs.forEach(repeatedWf -> assertSameProcess(originalProcesses.get(0), repeatedWf));
}

@Test
Expand All @@ -320,7 +324,7 @@ public void shouldNotFilterDifferentProcesses() {
final var repeatedProcesses = repeated.getValue().getProcessesMetadata();
assertThat(repeatedProcesses.size()).isEqualTo(originalProcesses.size()).isOne();

assertDifferentResources(originalProcesses.get(0), repeatedProcesses.get(0));
assertDifferentProcesses(originalProcesses.get(0), repeatedProcesses.get(0));

assertThat(
RecordingExporter.deploymentRecords(DeploymentIntent.DISTRIBUTE)
Expand All @@ -338,10 +342,106 @@ public void shouldNotFilterDifferentProcesses() {

assertThat(repeatedWfs.size()).isEqualTo(PARTITION_COUNT - 1);
repeatedWfs.forEach(
repeatedWf -> assertDifferentResources(originalProcesses.get(0), repeatedWf));
repeatedWf -> assertDifferentProcesses(originalProcesses.get(0), repeatedWf));
}

private void assertSameResource(
@Test
public void shouldFilterDuplicateDmnResource() {
// given
final Record<DeploymentRecordValue> original =
ENGINE.deployment().withXmlClasspathResource(DMN_DECISION_TABLE).deploy();

// when
final Record<DeploymentRecordValue> repeated =
ENGINE.deployment().withXmlClasspathResource(DMN_DECISION_TABLE).deploy();

// then
assertThat(repeated.getKey()).isGreaterThan(original.getKey());

final var originalDecision = original.getValue().getDecisionsMetadata();
final var originalDrg = original.getValue().getDecisionRequirementsMetadata();
final var repeatedDecision = repeated.getValue().getDecisionsMetadata();
final var repeatedDrg = repeated.getValue().getDecisionRequirementsMetadata();
assertThat(repeatedDecision.size()).isEqualTo(originalDecision.size()).isOne();
assertThat(repeatedDrg.size()).isEqualTo(originalDrg.size()).isOne();

assertThat(
RecordingExporter.deploymentRecords(DeploymentIntent.DISTRIBUTE)
.withRecordKey(repeated.getKey())
.limit(PARTITION_COUNT - 1)
.count())
.isEqualTo(PARTITION_COUNT - 1);

final var repeatedDecisions =
RecordingExporter.deploymentRecords(DeploymentIntent.DISTRIBUTED)
.withRecordKey(repeated.getKey())
.limit(PARTITION_COUNT - 1)
.map(r -> r.getValue().getDecisionsMetadata().get(0))
.collect(Collectors.toList());

assertThat(repeatedDecisions.size()).isEqualTo(PARTITION_COUNT - 1);
repeatedDecisions.forEach(r -> assertSameDecision(originalDecision.get(0), r));

final var repeatedDrgs =
RecordingExporter.deploymentRecords(DeploymentIntent.DISTRIBUTED)
.withRecordKey(repeated.getKey())
.limit(PARTITION_COUNT - 1)
.map(r -> r.getValue().getDecisionRequirementsMetadata().get(0))
.collect(Collectors.toList());

assertThat(repeatedDrgs.size()).isEqualTo(PARTITION_COUNT - 1);
repeatedDrgs.forEach(r -> assertSameDrg(originalDrg.get(0), r));
}

@Test
public void shouldNotFilterDifferentDmnResource() {
// given
final Record<DeploymentRecordValue> original =
ENGINE.deployment().withXmlClasspathResource(DMN_DECISION_TABLE).deploy();

// when
final Record<DeploymentRecordValue> repeated =
ENGINE.deployment().withXmlClasspathResource(DMN_DECISION_TABLE_V2).deploy();

// then
assertThat(repeated.getKey()).isGreaterThan(original.getKey());

final var originalDecision = original.getValue().getDecisionsMetadata();
final var originalDrg = original.getValue().getDecisionRequirementsMetadata();
final var repeatedDecision = repeated.getValue().getDecisionsMetadata();
final var repeatedDrg = repeated.getValue().getDecisionRequirementsMetadata();
assertThat(repeatedDecision.size()).isEqualTo(originalDecision.size()).isOne();
assertThat(repeatedDrg.size()).isEqualTo(originalDrg.size()).isOne();

assertThat(
RecordingExporter.deploymentRecords(DeploymentIntent.DISTRIBUTE)
.withRecordKey(repeated.getKey())
.limit(PARTITION_COUNT - 1)
.count())
.isEqualTo(PARTITION_COUNT - 1);

final var repeatedDecisions =
RecordingExporter.deploymentRecords(DeploymentIntent.DISTRIBUTED)
.withRecordKey(repeated.getKey())
.limit(PARTITION_COUNT - 1)
.map(r -> r.getValue().getDecisionsMetadata().get(0))
.collect(Collectors.toList());

assertThat(repeatedDecisions.size()).isEqualTo(PARTITION_COUNT - 1);
repeatedDecisions.forEach(r -> assertDifferentDecision(originalDecision.get(0), r));

final var repeatedDrgs =
RecordingExporter.deploymentRecords(DeploymentIntent.DISTRIBUTED)
.withRecordKey(repeated.getKey())
.limit(PARTITION_COUNT - 1)
.map(r -> r.getValue().getDecisionRequirementsMetadata().get(0))
.collect(Collectors.toList());

assertThat(repeatedDrgs.size()).isEqualTo(PARTITION_COUNT - 1);
repeatedDrgs.forEach(r -> assertDifferentDrg(originalDrg.get(0), r));
}

private void assertSameProcess(
final ProcessMetadataValue original, final ProcessMetadataValue repeated) {
Assertions.assertThat(repeated)
.hasVersion(original.getVersion())
Expand All @@ -350,12 +450,53 @@ private void assertSameResource(
.hasBpmnProcessId(original.getBpmnProcessId());
}

private void assertDifferentResources(
private void assertDifferentProcesses(
final ProcessMetadataValue original, final ProcessMetadataValue repeated) {
assertThat(original.getProcessDefinitionKey()).isLessThan(repeated.getProcessDefinitionKey());
assertThat(original.getVersion()).isLessThan(repeated.getVersion());
}

private void assertSameDecision(
final DecisionRecordValue original, final DecisionRecordValue repeated) {
Assertions.assertThat(repeated)
.hasDecisionId(original.getDecisionId())
.hasDecisionName(original.getDecisionName())
.hasVersion(original.getVersion())
.hasDecisionKey(original.getDecisionKey())
.hasDecisionRequirementsId(original.getDecisionRequirementsId())
.hasDecisionRequirementsKey(original.getDecisionRequirementsKey());
}

private void assertDifferentDecision(
final DecisionRecordValue original, final DecisionRecordValue repeated) {
assertThat(original.getVersion()).isLessThan(repeated.getVersion());
assertThat(original.getDecisionKey()).isLessThan(repeated.getDecisionKey());
assertThat(original.getDecisionRequirementsKey())
.isLessThan(repeated.getDecisionRequirementsKey());
}

private void assertSameDrg(
final DecisionRequirementsMetadataValue original,
final DecisionRequirementsMetadataValue repeated) {
Assertions.assertThat(repeated)
.hasDecisionRequirementsId(original.getDecisionRequirementsId())
.hasDecisionRequirementsName(original.getDecisionRequirementsName())
.hasDecisionRequirementsVersion(original.getDecisionRequirementsVersion())
.hasDecisionRequirementsKey(original.getDecisionRequirementsKey())
.hasNamespace(original.getNamespace())
.hasResourceName(original.getResourceName())
.hasChecksum(original.getChecksum());
}

private void assertDifferentDrg(
final DecisionRequirementsMetadataValue original,
final DecisionRequirementsMetadataValue repeated) {
assertThat(original.getDecisionRequirementsVersion())
.isLessThan(repeated.getDecisionRequirementsVersion());
assertThat(original.getDecisionRequirementsKey())
.isLessThan(repeated.getDecisionRequirementsKey());
}

private byte[] bpmnXml(final BpmnModelInstance definition) {
final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
Bpmn.writeModelToStream(outStream, definition);
Expand Down

0 comments on commit da40ce0

Please sign in to comment.