Skip to content

Commit

Permalink
merge: #9883
Browse files Browse the repository at this point in the history
9883: [stable/8.0] Allow retried DMN resource distribution r=korthout a=korthout

## Description

<!-- Please explain the changes you made here. -->
This patches a critical bug related to the retry mechanism of deployment distribution. If the retried deployment distribution contains a DMN resource, then this would trigger the consistency checks.

This patch is only available for `stable/8.0`, as we want to provide a different solution on `main`. See #9877 (comment).

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #9877



Co-authored-by: Ole Schönburg <ole.schoenburg@gmail.com>
Co-authored-by: Nico Korthout <nico.korthout@camunda.com>
  • Loading branch information
3 people committed Jul 26, 2022
2 parents 964807e + 0e60752 commit f22dcfb
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 6 deletions.
Expand Up @@ -146,11 +146,11 @@ private Optional<PersistedDecision> findDecisionByKey(final long decisionKey) {
public void storeDecisionRecord(final DecisionRecord record) {
dbDecisionKey.wrapLong(record.getDecisionKey());
dbPersistedDecision.wrap(record);
decisionsByKey.insert(dbDecisionKey, dbPersistedDecision);
decisionsByKey.upsert(dbDecisionKey, dbPersistedDecision);

dbDecisionKey.wrapLong(record.getDecisionKey());
dbDecisionRequirementsKey.wrapLong(record.getDecisionRequirementsKey());
decisionKeyByDecisionRequirementsKey.insert(
decisionKeyByDecisionRequirementsKey.upsert(
dbDecisionRequirementsKeyAndDecisionKey, DbNil.INSTANCE);

updateLatestDecisionVersion(record);
Expand All @@ -160,7 +160,7 @@ public void storeDecisionRecord(final DecisionRecord record) {
public void storeDecisionRequirements(final DecisionRequirementsRecord record) {
dbDecisionRequirementsKey.wrapLong(record.getDecisionRequirementsKey());
dbPersistedDecisionRequirements.wrap(record);
decisionRequirementsByKey.insert(dbDecisionRequirementsKey, dbPersistedDecisionRequirements);
decisionRequirementsByKey.upsert(dbDecisionRequirementsKey, dbPersistedDecisionRequirements);

updateLatestDecisionRequirementsVersion(record);
}
Expand All @@ -185,7 +185,7 @@ private void updateDecisionAsLatestVersion(final DecisionRecord record) {
private void insertDecisionAsLatestVersion(final DecisionRecord record) {
dbDecisionId.wrapBuffer(record.getDecisionIdBuffer());
dbDecisionKey.wrapLong(record.getDecisionKey());
latestDecisionKeysByDecisionId.insert(dbDecisionId, fkDecision);
latestDecisionKeysByDecisionId.upsert(dbDecisionId, fkDecision);
}

private void updateLatestDecisionRequirementsVersion(final DecisionRequirementsRecord record) {
Expand All @@ -209,6 +209,6 @@ private void updateDecisionRequirementsAsLatestVersion(final DecisionRequirement
private void insertDecisionRequirementsAsLatestVersion(final DecisionRequirementsRecord record) {
dbDecisionRequirementsId.wrapBuffer(record.getDecisionRequirementsIdBuffer());
dbDecisionRequirementsKey.wrapLong(record.getDecisionRequirementsKey());
latestDecisionRequirementsKeysById.insert(dbDecisionRequirementsId, fkDecisionRequirements);
latestDecisionRequirementsKeysById.upsert(dbDecisionRequirementsId, fkDecisionRequirements);
}
}
Expand Up @@ -14,10 +14,14 @@
import io.camunda.zeebe.it.util.GrpcClientRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.protocol.Protocol;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import java.time.Duration;
import java.util.stream.Collectors;
import org.awaitility.Awaitility;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
Expand All @@ -29,7 +33,13 @@ public final class DeploymentClusteredTest {
Bpmn.createExecutableProcess("process").startEvent().endEvent().done();

public final Timeout testTimeout = Timeout.seconds(120);
public final ClusteringRule clusteringRule = new ClusteringRule(3, 1, 3);
public final ClusteringRule clusteringRule =
new ClusteringRule(
3,
1,
3,
brokerCfg ->
brokerCfg.getExperimental().getConsistencyChecks().setEnablePreconditions(true));
public final GrpcClientRule clientRule = new GrpcClientRule(clusteringRule);

@Rule
Expand Down Expand Up @@ -90,4 +100,62 @@ public void shouldDistributedDeploymentWhenStoppedBrokerIsRestarted() {
// then
clientRule.waitUntilDeploymentIsDone(processDefinitionKey);
}

/**
* Regression test against https://github.com/camunda/zeebe/issues/9877
*
* <p>We expect that the DISTRIBUTE command is written a second time, after restart of the leader
* of the deployment partition. Both DISTRIBUTE commands should be processed and resulting in
* DISTRIBUTED events. These should be idempotently applied, for example using `upsert`.
*/
@Test
public void shouldDistributeDmnResourceOnRetry() {
final var leaderForDeploymentPartition =
clusteringRule.getLeaderForPartition(Protocol.DEPLOYMENT_PARTITION).getNodeId();
final var leaderForPartitionTwo = clusteringRule.getLeaderForPartition(2).getNodeId();
final var adminServiceLeaderTwo =
clusteringRule.getBroker(leaderForPartitionTwo).getBrokerContext().getBrokerAdminService();

// given
adminServiceLeaderTwo.pauseStreamProcessing();

clientRule
.getClient()
.newDeployResourceCommand()
.addResourceFromClasspath("dmn/decision-table.dmn")
.send()
.join();

Awaitility.await("until deployment distribution is send once to partition 2")
.atMost(Duration.ofMinutes(1))
.pollInterval(Duration.ofMillis(200))
.until(
() ->
RecordingExporter.deploymentRecords(DeploymentIntent.DISTRIBUTE)
.withPartitionId(2)
.withResourceName("dmn/decision-table.dmn")
.findAny()
.isPresent());

RecordingExporter.reset();

// when
clusteringRule.stopBroker(leaderForDeploymentPartition);
adminServiceLeaderTwo.resumeStreamProcessing();
clusteringRule.startBroker(leaderForDeploymentPartition);

// then
Awaitility.await("until partition 2 has processed distribute command twice")
.atMost(Duration.ofMinutes(1))
.pollInterval(Duration.ofMillis(200))
.untilAsserted(
() ->
assertThat(
RecordingExporter.deploymentRecords(DeploymentIntent.DISTRIBUTED)
.withPartitionId(2)
.withResourceName("dmn/decision-table.dmn")
.limit(2))
.describedAs("expect that deployment is distributed twice")
.hasSize(2));
}
}
48 changes: 48 additions & 0 deletions qa/integration-tests/src/test/resources/dmn/decision-table.dmn
@@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<definitions xmlns="https://www.omg.org/spec/DMN/20191111/MODEL/" xmlns:dmndi="https://www.omg.org/spec/DMN/20191111/DMNDI/" xmlns:dc="http://www.omg.org/spec/DMN/20180521/DC/" xmlns:biodi="http://bpmn.io/schema/dmn/biodi/2.0" id="force_users" name="Force Users" namespace="http://camunda.org/schema/1.0/dmn" exporter="Camunda Modeler" exporterVersion="4.11.0">
<decision id="jedi_or_sith" name="Jedi or Sith">
<decisionTable id="DecisionTable_14n3bxx">
<input id="Input_1" label="Lightsaber color" biodi:width="192">
<inputExpression id="InputExpression_1" typeRef="string">
<text>lightsaberColor</text>
</inputExpression>
</input>
<output id="Output_1" label="Jedi or Sith" name="jedi_or_sith" typeRef="string" biodi:width="192">
<outputValues id="UnaryTests_0hj346a">
<text>"Jedi","Sith"</text>
</outputValues>
</output>
<rule id="DecisionRule_0zumznl">
<inputEntry id="UnaryTests_0leuxqi">
<text>"blue"</text>
</inputEntry>
<outputEntry id="LiteralExpression_0c9vpz8">
<text>"Jedi"</text>
</outputEntry>
</rule>
<rule id="DecisionRule_1utwb1e">
<inputEntry id="UnaryTests_1v3sd4m">
<text>"green"</text>
</inputEntry>
<outputEntry id="LiteralExpression_0tgh8k1">
<text>"Jedi"</text>
</outputEntry>
</rule>
<rule id="DecisionRule_1bwgcym">
<inputEntry id="UnaryTests_0n1ewm3">
<text>"red"</text>
</inputEntry>
<outputEntry id="LiteralExpression_19xnlkw">
<text>"Sith"</text>
</outputEntry>
</rule>
</decisionTable>
</decision>
<dmndi:DMNDI>
<dmndi:DMNDiagram>
<dmndi:DMNShape dmnElementRef="jedi_or_sith">
<dc:Bounds height="80" width="180" x="160" y="100" />
</dmndi:DMNShape>
</dmndi:DMNDiagram>
</dmndi:DMNDI>
</definitions>
Expand Up @@ -42,4 +42,12 @@ public DeploymentRecordStream withDeployedProcesses(final List<Process> processe
public DeploymentRecordStream withDeployedProcess(final Process process) {
return valueFilter(v -> v.getProcessesMetadata().contains(process));
}

public DeploymentRecordStream withResourceName(final String resourceName) {
return valueFilter(
v ->
v.getResources().stream()
.map(DeploymentResource::getResourceName)
.anyMatch(resourceName::equals));
}
}

0 comments on commit f22dcfb

Please sign in to comment.