Skip to content

Commit

Permalink
merge: #10074
Browse files Browse the repository at this point in the history
10074: Reject duplicate DeploymentDistribution Complete command r=korthout a=korthout

## Description

<!-- Please explain the changes you made here. -->
There existed a special case that could lead to a ZeebeDbInconsistentException:
- a pending deployment is distributed multiple times to another partition by the [DeploymentRedistributor](https://github.com/camunda/zeebe/blob/main/engine/src/main/java/io/camunda/zeebe/engine/processing/deployment/distribute/DeploymentRedistributor.java).
- the other partition processes the distribution twice and both times sends a `DeploymentDistribution:Complete` command to the deployment partition (i.e. `partitionId: 1`).
- the deployment partition processes the first complete command, and writes `DeploymentDistribution:Completed` event, which is applied to the state.
- applying the completed event results in deleting the Pending Deployment for that partition.
- when it processes the second complete command, there could still be a pending deployment for another partition open for the same deployment, if so, the error happens.
- the second command is not rejected, because there is still a pending deployment for the deployment key, so another completed event is written and applied.
- applying fails this time, because the pending deployment no longer exists.

This PR changes the behavior. It makes sure the second command is rejected because the specific pending deployment no longer exists. In that case, we don't write the completed event a second time.

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #10064



Co-authored-by: Nico Korthout <nico.korthout@camunda.com>
  • Loading branch information
zeebe-bors-camunda[bot] and korthout committed Aug 18, 2022
2 parents e5262e2 + 6db151e commit 8ff06e4
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public CompleteDeploymentDistributionProcessor(
public void processRecord(final TypedRecord<DeploymentDistributionRecord> record) {

final var deploymentKey = record.getKey();
if (!deploymentState.hasPendingDeploymentDistribution(deploymentKey)) {
final var partitionId = record.getValue().getPartitionId();
if (!deploymentState.hasPendingDeploymentDistribution(deploymentKey, partitionId)) {
rejectionWriter.appendRejection(
record,
RejectionType.NOT_FOUND,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ public boolean hasPendingDeploymentDistribution(final long deploymentKey) {
return hasPending.get();
}

@Override
public boolean hasPendingDeploymentDistribution(final long deploymentKey, final int partitionId) {
this.deploymentKey.wrapLong(deploymentKey);
partitionKey.wrapInt(partitionId);
return pendingDeploymentColumnFamily.exists(deploymentPartitionKey);
}

@Override
public DeploymentRecord getStoredDeploymentRecord(final long key) {
deploymentKey.wrapLong(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,24 @@

public interface DeploymentState {

/**
* Returns whether there are any deployment distributions pending for a deployment.
*
* @param deploymentKey the key of the deployment that may have a pending distribution
* @return {@code true} if a pending deployment for the deployment key exists, otherwise {@code
* false}.
*/
boolean hasPendingDeploymentDistribution(long deploymentKey);

/**
* Returns whether a specific deployment distribution for a specific partition is pending.
*
* @param deploymentKey the key of the deployment that may have a pending distribution
* @param partitionId the id of the partition to which the distribution might be pending
* @return {@code true} if the specific pending deployment exists, otherwise {@code false}.
*/
boolean hasPendingDeploymentDistribution(long deploymentKey, int partitionId);

DeploymentRecord getStoredDeploymentRecord(long deploymentKey);

void foreachPendingDeploymentDistribution(PendingDeploymentVisitor pendingDeploymentVisitor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,21 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.tuple;

import io.camunda.zeebe.engine.processing.deployment.distribute.DeploymentRedistributor;
import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.DeploymentDistributionIntent;
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessIntent;
import io.camunda.zeebe.protocol.record.value.DeploymentDistributionRecordValue;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import java.util.stream.Collectors;
import org.awaitility.Awaitility;
import org.junit.Rule;
import org.junit.Test;

Expand Down Expand Up @@ -122,4 +125,58 @@ public void shouldDistributeDmnResources() {
.describedAs("Expect that decisions are distributed")
.isNotEmpty();
}

@Test
public void shouldRejectCompleteDeploymentDistributionWhenAlreadyCompleted() {
// given
engine.pauseProcessing(2);
engine.pauseProcessing(3);

engine
.deployment()
.withXmlResource(
Bpmn.createExecutableProcess("shouldReDistributeAfterRecovery")
.startEvent()
.endEvent()
.done())
.expectCreated()
.deploy();

RecordingExporter.records()
.withPartitionId(2)
.withValueType(ValueType.DEPLOYMENT)
.withIntent(DeploymentIntent.DISTRIBUTE)
.await();

// first one is skipped
engine.getClock().addTime(DeploymentRedistributor.DEPLOYMENT_REDISTRIBUTION_INTERVAL);
Awaitility.await()
.untilAsserted(
() -> {
// continue to add time to the clock until the deployment is re-distributed
engine.getClock().addTime(DeploymentRedistributor.DEPLOYMENT_REDISTRIBUTION_INTERVAL);
// todo: could benefit from RecordingExporter without
assertThat(
RecordingExporter.records()
.withPartitionId(2)
.withValueType(ValueType.DEPLOYMENT)
.withIntent(DeploymentIntent.DISTRIBUTE)
.limit(2))
.hasSize(2);
});

// when
engine.resumeProcessing(2);

// then
assertThat(
RecordingExporter.deploymentDistributionRecords()
.withIntent(DeploymentDistributionIntent.COMPLETE)
.withPartitionId(2)
.limit(3))
.extracting(Record::getRecordType)
.describedAs("Expect second command to be rejected")
.containsExactlyInAnyOrder(
RecordType.COMMAND, RecordType.COMMAND, RecordType.COMMAND_REJECTION);
}
}

0 comments on commit 8ff06e4

Please sign in to comment.