Skip to content

Commit

Permalink
merge: #9759
Browse files Browse the repository at this point in the history
9759: Reject duplicate parallel gateway activate command r=remcowesterhoud a=remcowesterhoud

## Description

<!-- Please explain the changes you made here. -->
Parallel gateways get activated by taking a sequence flow and checking if the number of taken flows is greater or equal to the number of incoming sequence flows. If this is the case an activate command is sent. The number of taken sequence flows get rest upon activation of the parallel gateway.

This proves troublesome when a "bad" model causes one of the incoming sequence flows to be taken twice. This could result in the activation command being sent twice. Imagine there is a parallel gateway with 2 incoming flows. What would happen is:

1. First flow is taken
2. Second flow is taken. Incoming flows == taken flows so an activate command is sent.
3. Second flow is taken again. The first activate command has not been processed yet. The number of taken flows has not been reset. As a result incoming flows < taken flows. A second activate command is sent.

This is solved by always sending an activate command when a sequence flow is taken. Once the `BpmnStreamProcessor` tries to process the record it will check if the state is valid. Here a check has been added to verify that when we receive an activate command for a parallel gateway we will first check if all the incoming flows have been taken. If this is not the case we will reject the command.

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #6778 



Co-authored-by: Remco Westerhoud <remco@westerhoud.nl>
  • Loading branch information
zeebe-bors-camunda[bot] and remcowesterhoud committed Jul 15, 2022
2 parents 275a7d2 + aa9fab1 commit 2c5304e
Show file tree
Hide file tree
Showing 9 changed files with 235 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void processRecord(
final ExecutableFlowElement element = getElement(recordValue, processor);

stateTransitionGuard
.isValidStateTransition(context)
.isValidStateTransition(context, element)
.ifRightOrLeft(
ok -> {
LOGGER.trace("Process process instance event [context: {}]", context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnStateBehavior;
import io.camunda.zeebe.engine.processing.common.Failure;
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableFlowElement;
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableFlowNode;
import io.camunda.zeebe.engine.state.instance.ElementInstance;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
Expand Down Expand Up @@ -37,37 +39,34 @@ public ProcessInstanceStateTransitionGuard(final BpmnStateBehavior stateBehavior
*
* @return {@code true} if the preconditions are met and the transition command is valid.
*/
public Either<Failure, ?> isValidStateTransition(final BpmnElementContext context) {
return checkStateTransition(context).mapLeft(Failure::new);
public Either<Failure, ?> isValidStateTransition(
final BpmnElementContext context, final ExecutableFlowElement element) {
return checkStateTransition(context, element).mapLeft(Failure::new);
}

private Either<String, ?> checkStateTransition(final BpmnElementContext context) {
switch (context.getIntent()) {
case ACTIVATE_ELEMENT:
return hasActiveFlowScopeInstance(context);

case COMPLETE_ELEMENT:
// an incident is resolved by writing a COMPLETE command when the element instance is in
// state COMPLETING
return hasElementInstanceWithState(
context,
ProcessInstanceIntent.ELEMENT_ACTIVATED,
ProcessInstanceIntent.ELEMENT_COMPLETING)
.flatMap(ok -> hasActiveFlowScopeInstance(context));

case TERMINATE_ELEMENT:
return hasElementInstanceWithState(
context,
ProcessInstanceIntent.ELEMENT_ACTIVATING,
ProcessInstanceIntent.ELEMENT_ACTIVATED,
ProcessInstanceIntent.ELEMENT_COMPLETING);

default:
return Either.left(
String.format(
"Expected the check of the preconditions of a command with intent [activate,complete,terminate] but the intent was '%s'",
context.getIntent()));
}
private Either<String, ?> checkStateTransition(
final BpmnElementContext context, final ExecutableFlowElement element) {
return switch (context.getIntent()) {
case ACTIVATE_ELEMENT -> hasActiveFlowScopeInstance(context)
.flatMap(ok -> canActivateParallelGateway(context, element));
case COMPLETE_ELEMENT ->
// an incident is resolved by writing a COMPLETE command when the element instance is in
// state COMPLETING
hasElementInstanceWithState(
context,
ProcessInstanceIntent.ELEMENT_ACTIVATED,
ProcessInstanceIntent.ELEMENT_COMPLETING)
.flatMap(ok -> hasActiveFlowScopeInstance(context));
case TERMINATE_ELEMENT -> hasElementInstanceWithState(
context,
ProcessInstanceIntent.ELEMENT_ACTIVATING,
ProcessInstanceIntent.ELEMENT_ACTIVATED,
ProcessInstanceIntent.ELEMENT_COMPLETING);
default -> Either.left(
String.format(
"Expected the check of the preconditions of a command with intent [activate,complete,terminate] but the intent was '%s'",
context.getIntent()));
};
}

private Either<String, ElementInstance> getElementInstance(final BpmnElementContext context) {
Expand Down Expand Up @@ -166,4 +165,23 @@ private Either<String, ElementInstance> hasNonInterruptedFlowScope(
.flatMap(flowScopeInstance -> hasNonInterruptedFlowScope(flowScopeInstance, context));
}
}

private Either<String, ?> canActivateParallelGateway(
final BpmnElementContext context, final ExecutableFlowElement executableFlowElement) {
if (context.getBpmnElementType() != BpmnElementType.PARALLEL_GATEWAY) {
return Either.right(null);
} else {
final var element = (ExecutableFlowNode) executableFlowElement;
final int numberOfIncomingSequenceFlows = element.getIncoming().size();
final int numberOfTakenSequenceFlows =
stateBehavior.getNumberOfTakenSequenceFlows(context.getFlowScopeKey(), element.getId());
return numberOfTakenSequenceFlows >= numberOfIncomingSequenceFlows
? Either.right(null)
: Either.left(
String.format(
"Expected to be able to activate parallel gateway '%s',"
+ " but not all sequence flows have been taken.",
BufferUtil.bufferAsString(element.getId())));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,9 @@ public boolean isInterrupted(final BpmnElementContext flowScopeContext) {
&& flowScopeInstance.isInterrupted()
&& flowScopeInstance.isActive();
}

public int getNumberOfTakenSequenceFlows(
final long flowScopeKey, final DirectBuffer gatewayElementId) {
return elementInstanceState.getNumberOfTakenSequenceFlows(flowScopeKey, gatewayElementId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -250,26 +250,7 @@ public void takeSequenceFlow(
context.copy(
sequenceFlowKey, followUpInstanceRecord, ProcessInstanceIntent.SEQUENCE_FLOW_TAKEN);

if (canActivateTargetElement(context, target)) {
activateElementInstanceInFlowScope(sequenceFlowTaken, target);
}
}

private boolean canActivateTargetElement(
final BpmnElementContext context, final ExecutableFlowNode targetElement) {

final int numberOfIncomingSequenceFlows = targetElement.getIncoming().size();

if (targetElement.getElementType() == BpmnElementType.PARALLEL_GATEWAY) {
// activate the parallel gateway only if all incoming sequence flows are taken at least once
final int numberOfTakenSequenceFlows =
elementInstanceState.getNumberOfTakenSequenceFlows(
context.getFlowScopeKey(), targetElement.getId());
return numberOfTakenSequenceFlows == numberOfIncomingSequenceFlows;

} else {
return true;
}
activateElementInstanceInFlowScope(sequenceFlowTaken, target);
}

public void completeElement(final BpmnElementContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ public AbstractFlowElement getElementById(final DirectBuffer id) {
return flowElements.get(id);
}

public AbstractFlowElement getElementById(final String id) {
return flowElements.get(wrapString(id));
}

/** convenience function for transformation */
public <T extends ExecutableFlowElement> T getElementById(
final String id, final Class<T> expectedType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.camunda.zeebe.protocol.record.intent.MessageStartEventSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.MessageSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessEventIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceCreationIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessMessageSubscriptionIntent;
Expand All @@ -48,6 +49,7 @@ public final class EventAppliers implements EventApplier {

public EventAppliers(final MutableZeebeState state) {
registerProcessInstanceEventAppliers(state);
registerProcessInstanceCreationAppliers(state);

register(ProcessIntent.CREATED, new ProcessCreatedApplier(state));
register(ErrorIntent.CREATED, new ErrorCreatedApplier(state.getBlackListState()));
Expand Down Expand Up @@ -136,6 +138,15 @@ private void registerProcessInstanceEventAppliers(final MutableZeebeState state)
new ProcessInstanceSequenceFlowTakenApplier(elementInstanceState, processState));
}

private void registerProcessInstanceCreationAppliers(final MutableZeebeState state) {
final var processState = state.getProcessState();
final var elementInstanceState = state.getElementInstanceState();

register(
ProcessInstanceCreationIntent.CREATED,
new ProcessInstanceCreationCreatedApplier(processState, elementInstanceState));
}

private void registerJobIntentEventAppliers(final MutableZeebeState state) {
register(JobIntent.CANCELED, new JobCanceledApplier(state));
register(JobIntent.COMPLETED, new JobCompletedApplier(state));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.state.appliers;

import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableFlowNode;
import io.camunda.zeebe.engine.state.TypedEventApplier;
import io.camunda.zeebe.engine.state.immutable.ProcessState;
import io.camunda.zeebe.engine.state.instance.ElementInstance;
import io.camunda.zeebe.engine.state.mutable.MutableElementInstanceState;
import io.camunda.zeebe.engine.state.mutable.MutableProcessState;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceCreationRecord;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceCreationIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import java.util.List;
import org.agrona.DirectBuffer;

final class ProcessInstanceCreationCreatedApplier
implements TypedEventApplier<ProcessInstanceCreationIntent, ProcessInstanceCreationRecord> {

private final ProcessState processState;
private final MutableElementInstanceState elementInstanceState;

public ProcessInstanceCreationCreatedApplier(
final MutableProcessState processState,
final MutableElementInstanceState elementInstanceState) {
this.processState = processState;
this.elementInstanceState = elementInstanceState;
}

@Override
public void applyState(final long key, final ProcessInstanceCreationRecord value) {
if (value.hasStartInstructions()) {
final var process =
processState.getProcessByKey(value.getProcessDefinitionKey()).getProcess();
final ElementInstance processInstance =
elementInstanceState.getInstance(value.getProcessInstanceKey());

value.getStartInstructions().stream()
.map(instruction -> process.getElementById(instruction.getElementId()))
.filter(element -> element.getElementType().equals(BpmnElementType.PARALLEL_GATEWAY))
.map(ExecutableFlowNode.class::cast)
.forEach(
element -> {
final var parentElementId = element.getFlowScope().getId();
final ElementInstance flowScope =
findParentFlowScope(processInstance, parentElementId);
incrementNumberOfTakenSequenceFlows(element, flowScope);
});
}
}

/**
* Traverses the element instances to find one that matches the parent element id. If this is the
* process instance, it is returned immediately. This will work because we will always activate
* the flow scope of a start instruction once.
*
* @param processInstance the highest element instance of a process
* @param targetElementId the id we are looking for
* @return the element instance which matches the targetElementId
*/
private ElementInstance findParentFlowScope(
final ElementInstance processInstance, final DirectBuffer targetElementId) {
if (processInstance.getValue().getElementIdBuffer().equals(targetElementId)) {
return processInstance;
}
return findFlowScopeInChildren(processInstance, targetElementId);
}

private ElementInstance findFlowScopeInChildren(
final ElementInstance processInstance, final DirectBuffer targetElementId) {
ElementInstance found = null;
final List<ElementInstance> children =
elementInstanceState.getChildren(processInstance.getKey());

for (final ElementInstance childInstance : children) {
if (childInstance.getValue().getElementIdBuffer().equals(targetElementId)) {
found = childInstance;
break;
} else {
found = findFlowScopeInChildren(childInstance, targetElementId);
if (found != null) {
break;
}
}
}

return found;
}

private void incrementNumberOfTakenSequenceFlows(
final ExecutableFlowNode element, final ElementInstance flowScope) {
element
.getIncoming()
.forEach(
incoming ->
elementInstanceState.incrementNumberOfTakenSequenceFlows(
flowScope.getKey(), element.getId(), incoming.getId()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.model.bpmn.instance.ServiceTask;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRecordValue;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import java.util.List;
import java.util.stream.Collectors;
import org.assertj.core.groups.Tuple;
import org.junit.Rule;
import org.junit.Test;

Expand Down Expand Up @@ -349,6 +352,59 @@ public void shouldSplitWithUncontrolledFlow() {
assertThat(taskEvents.get(0).getKey()).isNotEqualTo(taskEvents.get(1).getKey());
}

// Regression test for https://github.com/camunda/zeebe/issues/6778
@Test
public void shouldRejectActivateCommandWhenSequenceFlowIsTakenTwice() {
// given
final var process =
Bpmn.createExecutableProcess(PROCESS_ID)
.startEvent()
.parallelGateway("splitting")
.parallelGateway("joining")
.moveToNode("splitting")
.exclusiveGateway("exclusive")
.moveToNode("splitting")
.connectTo("exclusive")
.moveToNode("exclusive")
.connectTo("joining")
.moveToNode("joining")
.endEvent("endEvent")
.done();
engine.deployment().withXmlResource(process).deploy();

// when
final long key = engine.processInstance().ofBpmnProcessId(PROCESS_ID).create();

// then
final var records =
RecordingExporter.processInstanceRecords()
.withProcessInstanceKey(key)
.limit("endEvent", ProcessInstanceIntent.ELEMENT_COMPLETED)
.toList();

assertThat(
records.stream()
.filter(
r -> r.getValue().getBpmnElementType().equals(BpmnElementType.PARALLEL_GATEWAY))
.filter(r -> r.getIntent().equals(ProcessInstanceIntent.ACTIVATE_ELEMENT))
.filter(r -> r.getRecordType().equals(RecordType.COMMAND_REJECTION)))
.describedAs("activate command should be rejected twice")
.hasSize(2)
.extracting(Record::getRejectionType, Record::getRejectionReason)
.describedAs("rejection should contain correct rejection reason")
.containsOnly(
Tuple.tuple(
RejectionType.INVALID_STATE,
"Expected to be able to activate parallel gateway 'joining', but not all sequence flows have been taken."));

assertThat(
records.stream()
.filter(r -> r.getValue().getElementId().equals("joining"))
.filter(r -> r.getIntent().equals(ProcessInstanceIntent.ELEMENT_ACTIVATED)))
.describedAs("joining gateway should only be activated once")
.hasSize(1);
}

private static boolean isServiceTaskInProcess(
final String activityId, final BpmnModelInstance process) {
return process.getModelElementsByType(ServiceTask.class).stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ public ProcessInstanceRecordStream limitToProcessInstanceTerminated() {
&& r.getKey() == r.getValue().getProcessInstanceKey());
}

public ProcessInstanceRecordStream limit(
final String elementId, final ProcessInstanceIntent intent) {
return limit(
r -> r.getValue().getElementId().equals(elementId) && r.getIntent().equals(intent));
}

public ProcessInstanceRecordStream withElementType(final BpmnElementType elementType) {
return valueFilter(v -> v.getBpmnElementType() == elementType);
}
Expand Down

0 comments on commit 2c5304e

Please sign in to comment.