Skip to content

Commit

Permalink
merge: #9823
Browse files Browse the repository at this point in the history
9823: [Backport stable/8.0] Reject duplicate parallel gateway activate command r=remcowesterhoud a=backport-action

# Description
Backport of #9759 to `stable/8.0`.

relates to #6778

Co-authored-by: Remco Westerhoud <remco@westerhoud.nl>
  • Loading branch information
zeebe-bors-camunda[bot] and remcowesterhoud committed Jul 20, 2022
2 parents d317691 + 80b82b7 commit 1042802
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public void processRecord(
final ExecutableFlowElement element = getElement(recordValue, processor);

stateTransitionGuard
.isValidStateTransition(context)
.isValidStateTransition(context, element)
.ifRightOrLeft(
ok -> {
LOGGER.trace("Process process instance event [context: {}]", context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnStateBehavior;
import io.camunda.zeebe.engine.processing.common.Failure;
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableFlowElement;
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableFlowNode;
import io.camunda.zeebe.engine.state.instance.ElementInstance;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
Expand Down Expand Up @@ -37,37 +39,34 @@ public ProcessInstanceStateTransitionGuard(final BpmnStateBehavior stateBehavior
*
* @return {@code true} if the preconditions are met and the transition command is valid.
*/
public Either<Failure, ?> isValidStateTransition(final BpmnElementContext context) {
return checkStateTransition(context).mapLeft(Failure::new);
public Either<Failure, ?> isValidStateTransition(
final BpmnElementContext context, final ExecutableFlowElement element) {
return checkStateTransition(context, element).mapLeft(Failure::new);
}

private Either<String, ?> checkStateTransition(final BpmnElementContext context) {
switch (context.getIntent()) {
case ACTIVATE_ELEMENT:
return hasActiveFlowScopeInstance(context);

case COMPLETE_ELEMENT:
// an incident is resolved by writing a COMPLETE command when the element instance is in
// state COMPLETING
return hasElementInstanceWithState(
context,
ProcessInstanceIntent.ELEMENT_ACTIVATED,
ProcessInstanceIntent.ELEMENT_COMPLETING)
.flatMap(ok -> hasActiveFlowScopeInstance(context));

case TERMINATE_ELEMENT:
return hasElementInstanceWithState(
context,
ProcessInstanceIntent.ELEMENT_ACTIVATING,
ProcessInstanceIntent.ELEMENT_ACTIVATED,
ProcessInstanceIntent.ELEMENT_COMPLETING);

default:
return Either.left(
String.format(
"Expected the check of the preconditions of a command with intent [activate,complete,terminate] but the intent was '%s'",
context.getIntent()));
}
private Either<String, ?> checkStateTransition(
final BpmnElementContext context, final ExecutableFlowElement element) {
return switch (context.getIntent()) {
case ACTIVATE_ELEMENT -> hasActiveFlowScopeInstance(context)
.flatMap(ok -> canActivateParallelGateway(context, element));
case COMPLETE_ELEMENT ->
// an incident is resolved by writing a COMPLETE command when the element instance is in
// state COMPLETING
hasElementInstanceWithState(
context,
ProcessInstanceIntent.ELEMENT_ACTIVATED,
ProcessInstanceIntent.ELEMENT_COMPLETING)
.flatMap(ok -> hasActiveFlowScopeInstance(context));
case TERMINATE_ELEMENT -> hasElementInstanceWithState(
context,
ProcessInstanceIntent.ELEMENT_ACTIVATING,
ProcessInstanceIntent.ELEMENT_ACTIVATED,
ProcessInstanceIntent.ELEMENT_COMPLETING);
default -> Either.left(
String.format(
"Expected the check of the preconditions of a command with intent [activate,complete,terminate] but the intent was '%s'",
context.getIntent()));
};
}

private Either<String, ElementInstance> getElementInstance(final BpmnElementContext context) {
Expand Down Expand Up @@ -166,4 +165,23 @@ private Either<String, ElementInstance> hasNonInterruptedFlowScope(
.flatMap(flowScopeInstance -> hasNonInterruptedFlowScope(flowScopeInstance, context));
}
}

private Either<String, ?> canActivateParallelGateway(
final BpmnElementContext context, final ExecutableFlowElement executableFlowElement) {
if (context.getBpmnElementType() != BpmnElementType.PARALLEL_GATEWAY) {
return Either.right(null);
} else {
final var element = (ExecutableFlowNode) executableFlowElement;
final int numberOfIncomingSequenceFlows = element.getIncoming().size();
final int numberOfTakenSequenceFlows =
stateBehavior.getNumberOfTakenSequenceFlows(context.getFlowScopeKey(), element.getId());
return numberOfTakenSequenceFlows >= numberOfIncomingSequenceFlows
? Either.right(null)
: Either.left(
String.format(
"Expected to be able to activate parallel gateway '%s',"
+ " but not all sequence flows have been taken.",
BufferUtil.bufferAsString(element.getId())));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,9 @@ public boolean isInterrupted(final BpmnElementContext flowScopeContext) {
&& flowScopeInstance.isInterrupted()
&& flowScopeInstance.isActive();
}

public int getNumberOfTakenSequenceFlows(
final long flowScopeKey, final DirectBuffer gatewayElementId) {
return elementInstanceState.getNumberOfTakenSequenceFlows(flowScopeKey, gatewayElementId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -238,26 +238,7 @@ public void takeSequenceFlow(
context.copy(
sequenceFlowKey, followUpInstanceRecord, ProcessInstanceIntent.SEQUENCE_FLOW_TAKEN);

if (canActivateTargetElement(context, target)) {
activateElementInstanceInFlowScope(sequenceFlowTaken, target);
}
}

private boolean canActivateTargetElement(
final BpmnElementContext context, final ExecutableFlowNode targetElement) {

final int numberOfIncomingSequenceFlows = targetElement.getIncoming().size();

if (targetElement.getElementType() == BpmnElementType.PARALLEL_GATEWAY) {
// activate the parallel gateway only if all incoming sequence flows are taken at least once
final int numberOfTakenSequenceFlows =
elementInstanceState.getNumberOfTakenSequenceFlows(
context.getFlowScopeKey(), targetElement.getId());
return numberOfTakenSequenceFlows == numberOfIncomingSequenceFlows;

} else {
return true;
}
activateElementInstanceInFlowScope(sequenceFlowTaken, target);
}

public void completeElement(final BpmnElementContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.model.bpmn.instance.ServiceTask;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRecordValue;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import java.util.List;
import java.util.stream.Collectors;
import org.assertj.core.groups.Tuple;
import org.junit.Rule;
import org.junit.Test;

Expand Down Expand Up @@ -349,6 +352,59 @@ public void shouldSplitWithUncontrolledFlow() {
assertThat(taskEvents.get(0).getKey()).isNotEqualTo(taskEvents.get(1).getKey());
}

// Regression test for https://github.com/camunda/zeebe/issues/6778
@Test
public void shouldRejectActivateCommandWhenSequenceFlowIsTakenTwice() {
// given
final var process =
Bpmn.createExecutableProcess(PROCESS_ID)
.startEvent()
.parallelGateway("splitting")
.parallelGateway("joining")
.moveToNode("splitting")
.exclusiveGateway("exclusive")
.moveToNode("splitting")
.connectTo("exclusive")
.moveToNode("exclusive")
.connectTo("joining")
.moveToNode("joining")
.endEvent("endEvent")
.done();
engine.deployment().withXmlResource(process).deploy();

// when
final long key = engine.processInstance().ofBpmnProcessId(PROCESS_ID).create();

// then
final var records =
RecordingExporter.processInstanceRecords()
.withProcessInstanceKey(key)
.limit("endEvent", ProcessInstanceIntent.ELEMENT_COMPLETED)
.toList();

assertThat(
records.stream()
.filter(
r -> r.getValue().getBpmnElementType().equals(BpmnElementType.PARALLEL_GATEWAY))
.filter(r -> r.getIntent().equals(ProcessInstanceIntent.ACTIVATE_ELEMENT))
.filter(r -> r.getRecordType().equals(RecordType.COMMAND_REJECTION)))
.describedAs("activate command should be rejected twice")
.hasSize(2)
.extracting(Record::getRejectionType, Record::getRejectionReason)
.describedAs("rejection should contain correct rejection reason")
.containsOnly(
Tuple.tuple(
RejectionType.INVALID_STATE,
"Expected to be able to activate parallel gateway 'joining', but not all sequence flows have been taken."));

assertThat(
records.stream()
.filter(r -> r.getValue().getElementId().equals("joining"))
.filter(r -> r.getIntent().equals(ProcessInstanceIntent.ELEMENT_ACTIVATED)))
.describedAs("joining gateway should only be activated once")
.hasSize(1);
}

private static boolean isServiceTaskInProcess(
final String activityId, final BpmnModelInstance process) {
return process.getModelElementsByType(ServiceTask.class).stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ public ProcessInstanceRecordStream limitToProcessInstanceTerminated() {
&& r.getKey() == r.getValue().getProcessInstanceKey());
}

public ProcessInstanceRecordStream limit(
final String elementId, final ProcessInstanceIntent intent) {
return limit(
r -> r.getValue().getElementId().equals(elementId) && r.getIntent().equals(intent));
}

public ProcessInstanceRecordStream withElementType(final BpmnElementType elementType) {
return valueFilter(v -> v.getBpmnElementType() == elementType);
}
Expand Down

0 comments on commit 1042802

Please sign in to comment.