Skip to content

Commit

Permalink
merge: #9125 #9133
Browse files Browse the repository at this point in the history
9125: [Backport stable/8.0] fix(broker): do not log transition failure due to term mismatch as error r=deepthidevaki a=github-actions[bot]

# Description
Backport of #9122 to `stable/8.0`.

relates to #9040

9133: [Backport stable/8.0] Prevent duplicate key insertion for DMN r=remcowesterhoud a=github-actions[bot]

# Description
Backport of #9121 to `stable/8.0`.

relates to #9115

Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@gmail.com>
Co-authored-by: Remco Westerhoud <remco@westerhoud.nl>
  • Loading branch information
3 people committed Apr 13, 2022
3 parents 4bf32f2 + 179d0d1 + 15ea241 commit 7eeeffc
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,6 @@ private ActorFuture<Void> leaderTransition(final long newTerm) {
});
onRecoveredInternal();
} else {
LOG.error("Failed to install leader partition {}", context.getPartitionId(), error);
onInstallFailure(error);
}
});
Expand All @@ -274,7 +273,6 @@ private ActorFuture<Void> followerTransition(final long newTerm) {
});
onRecoveredInternal();
} else {
LOG.error("Failed to install follower partition {}", context.getPartitionId(), error);
onInstallFailure(error);
}
});
Expand Down Expand Up @@ -340,6 +338,7 @@ private void onInstallFailure(final Throwable error) {
context.getPartitionId(),
error.getMessage());
} else {
LOG.error("Failed to install partition {}", context.getPartitionId(), error);
handleRecoverableFailure();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,7 @@ private void proceedWithTransition(final ActorFuture<Void> future) {

private void onStepCompletion(final ActorFuture<Void> future, final Throwable error) {
if (error != null) {
LOG.error(error.getMessage(), error);
future.completeExceptionally(error);

return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@
package io.camunda.zeebe.engine.state.appliers;

import static io.camunda.zeebe.util.buffer.BufferUtil.wrapArray;
import static java.util.function.Predicate.not;

import io.camunda.zeebe.engine.state.TypedEventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableDecisionState;
import io.camunda.zeebe.engine.state.mutable.MutableProcessState;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DecisionRecord;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DecisionRequirementsMetadataRecord;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DecisionRequirementsRecord;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
Expand Down Expand Up @@ -40,8 +43,8 @@ public void applyState(final long key, final DeploymentRecord value) {
}

private void putDmnResourcesInState(final DeploymentRecord value) {
value
.decisionRequirementsMetadata()
value.decisionRequirementsMetadata().stream()
.filter(not(DecisionRequirementsMetadataRecord::isDuplicate))
.forEach(
drg -> {
final var resource = getResourceByName(value, drg.getResourceName());
Expand All @@ -50,7 +53,9 @@ private void putDmnResourcesInState(final DeploymentRecord value) {
decisionState.storeDecisionRequirements(decisionRequirementsRecord);
});

value.decisionsMetadata().forEach(decisionState::storeDecisionRecord);
value.decisionsMetadata().stream()
.filter(not(DecisionRecord::isDuplicate))
.forEach(decisionState::storeDecisionRecord);
}

private DirectBuffer getResourceByName(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
import io.camunda.zeebe.protocol.record.value.DeploymentDistributionRecordValue;
import io.camunda.zeebe.protocol.record.value.DeploymentRecordValue;
import io.camunda.zeebe.protocol.record.value.deployment.DecisionRecordValue;
import io.camunda.zeebe.protocol.record.value.deployment.DecisionRequirementsMetadataValue;
import io.camunda.zeebe.protocol.record.value.deployment.DeploymentResource;
import io.camunda.zeebe.protocol.record.value.deployment.ProcessMetadataValue;
import io.camunda.zeebe.test.util.record.RecordingExporter;
Expand All @@ -42,6 +44,8 @@ public final class CreateDeploymentMultiplePartitionsTest {
Bpmn.createExecutableProcess(PROCESS_ID).startEvent().endEvent().done();
private static final BpmnModelInstance PROCESS_2 =
Bpmn.createExecutableProcess("process2").startEvent().endEvent().done();
private static final String DMN_DECISION_TABLE = "/dmn/decision-table.dmn";
private static final String DMN_DECISION_TABLE_V2 = "/dmn/decision-table_v2.dmn";

@Rule
public final RecordingExporterTestWatcher recordingExporterTestWatcher =
Expand Down Expand Up @@ -300,7 +304,7 @@ public void shouldFilterDuplicateProcess() {
.collect(Collectors.toList());

assertThat(repeatedWfs.size()).isEqualTo(PARTITION_COUNT - 1);
repeatedWfs.forEach(repeatedWf -> assertSameResource(originalProcesses.get(0), repeatedWf));
repeatedWfs.forEach(repeatedWf -> assertSameProcess(originalProcesses.get(0), repeatedWf));
}

@Test
Expand All @@ -320,7 +324,7 @@ public void shouldNotFilterDifferentProcesses() {
final var repeatedProcesses = repeated.getValue().getProcessesMetadata();
assertThat(repeatedProcesses.size()).isEqualTo(originalProcesses.size()).isOne();

assertDifferentResources(originalProcesses.get(0), repeatedProcesses.get(0));
assertDifferentProcesses(originalProcesses.get(0), repeatedProcesses.get(0));

assertThat(
RecordingExporter.deploymentRecords(DeploymentIntent.DISTRIBUTE)
Expand All @@ -338,10 +342,106 @@ public void shouldNotFilterDifferentProcesses() {

assertThat(repeatedWfs.size()).isEqualTo(PARTITION_COUNT - 1);
repeatedWfs.forEach(
repeatedWf -> assertDifferentResources(originalProcesses.get(0), repeatedWf));
repeatedWf -> assertDifferentProcesses(originalProcesses.get(0), repeatedWf));
}

private void assertSameResource(
@Test
public void shouldFilterDuplicateDmnResource() {
// given
final Record<DeploymentRecordValue> original =
ENGINE.deployment().withXmlClasspathResource(DMN_DECISION_TABLE).deploy();

// when
final Record<DeploymentRecordValue> repeated =
ENGINE.deployment().withXmlClasspathResource(DMN_DECISION_TABLE).deploy();

// then
assertThat(repeated.getKey()).isGreaterThan(original.getKey());

final var originalDecision = original.getValue().getDecisionsMetadata();
final var originalDrg = original.getValue().getDecisionRequirementsMetadata();
final var repeatedDecision = repeated.getValue().getDecisionsMetadata();
final var repeatedDrg = repeated.getValue().getDecisionRequirementsMetadata();
assertThat(repeatedDecision.size()).isEqualTo(originalDecision.size()).isOne();
assertThat(repeatedDrg.size()).isEqualTo(originalDrg.size()).isOne();

assertThat(
RecordingExporter.deploymentRecords(DeploymentIntent.DISTRIBUTE)
.withRecordKey(repeated.getKey())
.limit(PARTITION_COUNT - 1)
.count())
.isEqualTo(PARTITION_COUNT - 1);

final var repeatedDecisions =
RecordingExporter.deploymentRecords(DeploymentIntent.DISTRIBUTED)
.withRecordKey(repeated.getKey())
.limit(PARTITION_COUNT - 1)
.map(r -> r.getValue().getDecisionsMetadata().get(0))
.collect(Collectors.toList());

assertThat(repeatedDecisions.size()).isEqualTo(PARTITION_COUNT - 1);
repeatedDecisions.forEach(r -> assertSameDecision(originalDecision.get(0), r));

final var repeatedDrgs =
RecordingExporter.deploymentRecords(DeploymentIntent.DISTRIBUTED)
.withRecordKey(repeated.getKey())
.limit(PARTITION_COUNT - 1)
.map(r -> r.getValue().getDecisionRequirementsMetadata().get(0))
.collect(Collectors.toList());

assertThat(repeatedDrgs.size()).isEqualTo(PARTITION_COUNT - 1);
repeatedDrgs.forEach(r -> assertSameDrg(originalDrg.get(0), r));
}

@Test
public void shouldNotFilterDifferentDmnResource() {
// given
final Record<DeploymentRecordValue> original =
ENGINE.deployment().withXmlClasspathResource(DMN_DECISION_TABLE).deploy();

// when
final Record<DeploymentRecordValue> repeated =
ENGINE.deployment().withXmlClasspathResource(DMN_DECISION_TABLE_V2).deploy();

// then
assertThat(repeated.getKey()).isGreaterThan(original.getKey());

final var originalDecision = original.getValue().getDecisionsMetadata();
final var originalDrg = original.getValue().getDecisionRequirementsMetadata();
final var repeatedDecision = repeated.getValue().getDecisionsMetadata();
final var repeatedDrg = repeated.getValue().getDecisionRequirementsMetadata();
assertThat(repeatedDecision.size()).isEqualTo(originalDecision.size()).isOne();
assertThat(repeatedDrg.size()).isEqualTo(originalDrg.size()).isOne();

assertThat(
RecordingExporter.deploymentRecords(DeploymentIntent.DISTRIBUTE)
.withRecordKey(repeated.getKey())
.limit(PARTITION_COUNT - 1)
.count())
.isEqualTo(PARTITION_COUNT - 1);

final var repeatedDecisions =
RecordingExporter.deploymentRecords(DeploymentIntent.DISTRIBUTED)
.withRecordKey(repeated.getKey())
.limit(PARTITION_COUNT - 1)
.map(r -> r.getValue().getDecisionsMetadata().get(0))
.collect(Collectors.toList());

assertThat(repeatedDecisions.size()).isEqualTo(PARTITION_COUNT - 1);
repeatedDecisions.forEach(r -> assertDifferentDecision(originalDecision.get(0), r));

final var repeatedDrgs =
RecordingExporter.deploymentRecords(DeploymentIntent.DISTRIBUTED)
.withRecordKey(repeated.getKey())
.limit(PARTITION_COUNT - 1)
.map(r -> r.getValue().getDecisionRequirementsMetadata().get(0))
.collect(Collectors.toList());

assertThat(repeatedDrgs.size()).isEqualTo(PARTITION_COUNT - 1);
repeatedDrgs.forEach(r -> assertDifferentDrg(originalDrg.get(0), r));
}

private void assertSameProcess(
final ProcessMetadataValue original, final ProcessMetadataValue repeated) {
Assertions.assertThat(repeated)
.hasVersion(original.getVersion())
Expand All @@ -350,12 +450,53 @@ private void assertSameResource(
.hasBpmnProcessId(original.getBpmnProcessId());
}

private void assertDifferentResources(
private void assertDifferentProcesses(
final ProcessMetadataValue original, final ProcessMetadataValue repeated) {
assertThat(original.getProcessDefinitionKey()).isLessThan(repeated.getProcessDefinitionKey());
assertThat(original.getVersion()).isLessThan(repeated.getVersion());
}

private void assertSameDecision(
final DecisionRecordValue original, final DecisionRecordValue repeated) {
Assertions.assertThat(repeated)
.hasDecisionId(original.getDecisionId())
.hasDecisionName(original.getDecisionName())
.hasVersion(original.getVersion())
.hasDecisionKey(original.getDecisionKey())
.hasDecisionRequirementsId(original.getDecisionRequirementsId())
.hasDecisionRequirementsKey(original.getDecisionRequirementsKey());
}

private void assertDifferentDecision(
final DecisionRecordValue original, final DecisionRecordValue repeated) {
assertThat(original.getVersion()).isLessThan(repeated.getVersion());
assertThat(original.getDecisionKey()).isLessThan(repeated.getDecisionKey());
assertThat(original.getDecisionRequirementsKey())
.isLessThan(repeated.getDecisionRequirementsKey());
}

private void assertSameDrg(
final DecisionRequirementsMetadataValue original,
final DecisionRequirementsMetadataValue repeated) {
Assertions.assertThat(repeated)
.hasDecisionRequirementsId(original.getDecisionRequirementsId())
.hasDecisionRequirementsName(original.getDecisionRequirementsName())
.hasDecisionRequirementsVersion(original.getDecisionRequirementsVersion())
.hasDecisionRequirementsKey(original.getDecisionRequirementsKey())
.hasNamespace(original.getNamespace())
.hasResourceName(original.getResourceName())
.hasChecksum(original.getChecksum());
}

private void assertDifferentDrg(
final DecisionRequirementsMetadataValue original,
final DecisionRequirementsMetadataValue repeated) {
assertThat(original.getDecisionRequirementsVersion())
.isLessThan(repeated.getDecisionRequirementsVersion());
assertThat(original.getDecisionRequirementsKey())
.isLessThan(repeated.getDecisionRequirementsKey());
}

private byte[] bpmnXml(final BpmnModelInstance definition) {
final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
Bpmn.writeModelToStream(outStream, definition);
Expand Down

0 comments on commit 7eeeffc

Please sign in to comment.