Skip to content

Commit

Permalink
merge: #9292
Browse files Browse the repository at this point in the history
9292: [Backport 8.0] Trigger boundary events only if the flow scope is active r=saig0 a=saig0

## Description

Backport of #9281

No additional changes compared to the origin PR.

## Related issues

relates to #9233


Co-authored-by: Philipp Ossler <philipp.ossler@gmail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and saig0 committed May 5, 2022
2 parents 0e25fd0 + 06a2656 commit 9276995
Show file tree
Hide file tree
Showing 8 changed files with 254 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,11 @@ public void onChildTerminated(

private void transitionToTerminated(
final ExecutableCallActivity element, final BpmnElementContext context) {
final var flowScopeInstance = stateBehavior.getFlowScopeInstance(context);

eventSubscriptionBehavior
.findEventTrigger(context)
.filter(eventTrigger -> flowScopeInstance.isActive())
.ifPresentOrElse(
eventTrigger -> {
final var terminated = stateTransitionBehavior.transitionToTerminated(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,10 +265,14 @@ private void activate(

private void terminate(
final ExecutableMultiInstanceBody element, final BpmnElementContext flowScopeContext) {

final var flowScopeInstance = stateBehavior.getFlowScopeInstance(flowScopeContext);

incidentBehavior.resolveIncidents(flowScopeContext);

eventSubscriptionBehavior
.findEventTrigger(flowScopeContext)
.filter(eventTrigger -> flowScopeInstance.isActive())
.ifPresentOrElse(
eventTrigger -> {
final var terminated =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public void onChildTerminated(
final ExecutableFlowElementContainer element,
final BpmnElementContext subProcessContext,
final BpmnElementContext childContext) {
final var flowScopeInstance = stateBehavior.getFlowScopeInstance(subProcessContext);

if (stateBehavior.isInterrupted(subProcessContext)) {
// an interrupting event subprocess was triggered
Expand All @@ -124,6 +125,7 @@ public void onChildTerminated(
// if we are able to terminate we try to trigger boundary events
eventSubscriptionBehavior
.findEventTrigger(subProcessContext)
.filter(eventTrigger -> flowScopeInstance.isActive())
.ifPresentOrElse(
eventTrigger -> {
final var terminated =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnDecisionBehavior;
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnEventSubscriptionBehavior;
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnIncidentBehavior;
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnStateBehavior;
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnStateTransitionBehavior;
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnVariableMappingBehavior;
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableBusinessRuleTask;
Expand Down Expand Up @@ -72,13 +73,15 @@ private static final class CalledDecisionBehavior implements BusinessRuleTaskBeh
private final BpmnIncidentBehavior incidentBehavior;
private final BpmnStateTransitionBehavior stateTransitionBehavior;
private final BpmnVariableMappingBehavior variableMappingBehavior;
private final BpmnStateBehavior stateBehavior;

public CalledDecisionBehavior(final BpmnBehaviors bpmnBehaviors) {
decisionBehavior = bpmnBehaviors.decisionBehavior();
eventSubscriptionBehavior = bpmnBehaviors.eventSubscriptionBehavior();
incidentBehavior = bpmnBehaviors.incidentBehavior();
stateTransitionBehavior = bpmnBehaviors.stateTransitionBehavior();
variableMappingBehavior = bpmnBehaviors.variableMappingBehavior();
stateBehavior = bpmnBehaviors.stateBehavior();
}

@Override
Expand Down Expand Up @@ -109,11 +112,13 @@ public void onComplete(
@Override
public void onTerminate(
final ExecutableBusinessRuleTask element, final BpmnElementContext context) {
final var flowScopeInstance = stateBehavior.getFlowScopeInstance(context);

incidentBehavior.resolveIncidents(context);

eventSubscriptionBehavior
.findEventTrigger(context)
.filter(eventTrigger -> flowScopeInstance.isActive())
.ifPresentOrElse(
eventTrigger -> {
final var terminated = stateTransitionBehavior.transitionToTerminated(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnEventSubscriptionBehavior;
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnIncidentBehavior;
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnJobBehavior;
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnStateBehavior;
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnStateTransitionBehavior;
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnVariableMappingBehavior;
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableJobWorkerTask;
Expand All @@ -28,13 +29,15 @@ public final class JobWorkerTaskProcessor implements BpmnElementProcessor<Execut
private final BpmnVariableMappingBehavior variableMappingBehavior;
private final BpmnEventSubscriptionBehavior eventSubscriptionBehavior;
private final BpmnJobBehavior jobBehavior;
private final BpmnStateBehavior stateBehavior;

public JobWorkerTaskProcessor(final BpmnBehaviors behaviors) {
eventSubscriptionBehavior = behaviors.eventSubscriptionBehavior();
incidentBehavior = behaviors.incidentBehavior();
stateTransitionBehavior = behaviors.stateTransitionBehavior();
variableMappingBehavior = behaviors.variableMappingBehavior();
jobBehavior = behaviors.jobBehavior();
stateBehavior = behaviors.stateBehavior();
}

@Override
Expand Down Expand Up @@ -69,13 +72,15 @@ public void onComplete(final ExecutableJobWorkerTask element, final BpmnElementC

@Override
public void onTerminate(final ExecutableJobWorkerTask element, final BpmnElementContext context) {
final var flowScopeInstance = stateBehavior.getFlowScopeInstance(context);

jobBehavior.cancelJob(context);
eventSubscriptionBehavior.unsubscribeFromEvents(context);
incidentBehavior.resolveIncidents(context);

eventSubscriptionBehavior
.findEventTrigger(context)
.filter(eventTrigger -> flowScopeInstance.isActive())
.ifPresentOrElse(
eventTrigger -> {
final var terminated = stateTransitionBehavior.transitionToTerminated(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnBehaviors;
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnEventSubscriptionBehavior;
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnIncidentBehavior;
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnStateBehavior;
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnStateTransitionBehavior;
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnVariableMappingBehavior;
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableReceiveTask;
Expand All @@ -22,12 +23,14 @@ public final class ReceiveTaskProcessor implements BpmnElementProcessor<Executab
private final BpmnStateTransitionBehavior stateTransitionBehavior;
private final BpmnVariableMappingBehavior variableMappingBehavior;
private final BpmnEventSubscriptionBehavior eventSubscriptionBehavior;
private final BpmnStateBehavior stateBehavior;

public ReceiveTaskProcessor(final BpmnBehaviors behaviors) {
eventSubscriptionBehavior = behaviors.eventSubscriptionBehavior();
incidentBehavior = behaviors.incidentBehavior();
stateTransitionBehavior = behaviors.stateTransitionBehavior();
variableMappingBehavior = behaviors.variableMappingBehavior();
stateBehavior = behaviors.stateBehavior();
}

@Override
Expand Down Expand Up @@ -63,12 +66,14 @@ public void onComplete(final ExecutableReceiveTask element, final BpmnElementCon

@Override
public void onTerminate(final ExecutableReceiveTask element, final BpmnElementContext context) {
final var flowScopeInstance = stateBehavior.getFlowScopeInstance(context);

eventSubscriptionBehavior.unsubscribeFromEvents(context);
incidentBehavior.resolveIncidents(context);

eventSubscriptionBehavior
.findEventTrigger(context)
.filter(eventTrigger -> flowScopeInstance.isActive())
.ifPresentOrElse(
eventTrigger -> {
final var terminated = stateTransitionBehavior.transitionToTerminated(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,63 @@ public void shouldCreateInstanceOfCalledElementAtNoneStartEvent() {
.containsOnly("none-start");
}

@Test
public void shouldTriggerBoundaryEventOnChildInstanceTermination() {
// given two processes with call activities that have an interrupting message boundary event
final var processLevel1 =
Bpmn.createExecutableProcess("level1")
.startEvent()
.callActivity("call-level2", c -> c.zeebeProcessId("level2"))
.boundaryEvent()
.message(m -> m.name("cancel").zeebeCorrelationKeyExpression("key"))
.endEvent()
.done();

final var processLevel2 =
Bpmn.createExecutableProcess("level2")
.startEvent()
.callActivity("call-level3", c -> c.zeebeProcessId("level3"))
.boundaryEvent()
.message(m -> m.name("cancel").zeebeCorrelationKeyExpression("key"))
.endEvent()
.done();

final var processLevel3 =
Bpmn.createExecutableProcess("level3")
.startEvent()
.serviceTask("task-level3", t -> t.zeebeJobType("task-level3"))
.endEvent()
.done();

ENGINE
.deployment()
.withXmlResource("level1.bpmn", processLevel1)
.withXmlResource("level2.bpmn", processLevel2)
.withXmlResource("level3.bpmn", processLevel3)
.deploy();

final var processInstanceKey =
ENGINE.processInstance().ofBpmnProcessId("level1").withVariable("key", "key-1").create();

RecordingExporter.jobRecords(JobIntent.CREATED).withType("task-level3").await();

// when publish a message to trigger the boundary events and terminate the call activities
ENGINE.message().withName("cancel").withCorrelationKey("key-1").publish();

// then
assertThat(
RecordingExporter.processInstanceRecords()
.withProcessInstanceKey(processInstanceKey)
.limitToProcessInstanceCompleted())
.extracting(r -> r.getValue().getBpmnElementType(), Record::getIntent)
.containsSubsequence(
tuple(BpmnElementType.CALL_ACTIVITY, ProcessInstanceIntent.ELEMENT_ACTIVATED),
tuple(BpmnElementType.CALL_ACTIVITY, ProcessInstanceIntent.ELEMENT_TERMINATED),
tuple(BpmnElementType.BOUNDARY_EVENT, ProcessInstanceIntent.ELEMENT_ACTIVATED),
tuple(BpmnElementType.BOUNDARY_EVENT, ProcessInstanceIntent.ELEMENT_COMPLETED),
tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED));
}

private void completeJobWith(final Map<String, Object> variables) {

RecordingExporter.jobRecords(JobIntent.CREATED).withType(jobType).getFirst().getValue();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.bpmn.boundary;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.tuple;

import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.engine.util.RecordToWrite;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;
import io.camunda.zeebe.model.bpmn.builder.AbstractActivityBuilder;
import io.camunda.zeebe.model.bpmn.builder.AbstractFlowNodeBuilder;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.TimerIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import java.util.Collection;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

@RunWith(Parameterized.class)
public final class BoundaryEventElementTest {

@ClassRule public static final EngineRule ENGINE = EngineRule.singlePartition();

private static final String PROCESS_ID = "process";

@Rule
public final RecordingExporterTestWatcher recordingExporterTestWatcher =
new RecordingExporterTestWatcher();

@Parameter public ElementWithBoundaryEventBuilder elementBuilder;

@Parameters(name = "{0}")
public static Collection<Object[]> parameters() {
return buildersAsParameters();
}

private BpmnModelInstance process(final ElementWithBoundaryEventBuilder elementBuilder) {
final var startEventBuilder = Bpmn.createExecutableProcess(PROCESS_ID).startEvent();

final var processWithElementBuilder = elementBuilder.build(startEventBuilder);

return processWithElementBuilder.boundaryEvent().timerWithDuration("PT1H").endEvent().done();
}

@Test
public void shouldNotActivateBoundaryEventIfScopeIsTerminating() {
// given
ENGINE.deployment().withXmlResource(process(elementBuilder)).deploy();

final var processInstanceKey = ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).create();

RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
.withProcessInstanceKey(processInstanceKey)
.withElementType(elementBuilder.elementType)
.await();

final var timerCreated =
RecordingExporter.timerRecords(TimerIntent.CREATED)
.withProcessInstanceKey(processInstanceKey)
.getFirst();

// when trigger the boundary event and cancel the process instance concurrently
ENGINE.writeRecords(
RecordToWrite.command()
.processInstance(ProcessInstanceIntent.CANCEL, new ProcessInstanceRecord())
.key(processInstanceKey),
RecordToWrite.command()
.timer(TimerIntent.TRIGGER, timerCreated.getValue())
.key(timerCreated.getKey()));

// then
assertThat(
RecordingExporter.processInstanceRecords()
.withProcessInstanceKey(processInstanceKey)
.limitToProcessInstanceTerminated())
.extracting(r -> r.getValue().getBpmnElementType(), Record::getIntent)
.containsSubsequence(
tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_TERMINATING),
tuple(elementBuilder.elementType, ProcessInstanceIntent.ELEMENT_TERMINATING),
tuple(elementBuilder.elementType, ProcessInstanceIntent.ELEMENT_TERMINATED),
tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_TERMINATED))
.doesNotContain(
tuple(BpmnElementType.BOUNDARY_EVENT, ProcessInstanceIntent.ELEMENT_ACTIVATING));
}

private static Collection<Object[]> buildersAsParameters() {
return builders().map(builder -> new Object[] {builder}).collect(Collectors.toList());
}

private static Stream<ElementWithBoundaryEventBuilder> builders() {
return Stream.of(
// -------------------------------
new ElementWithBoundaryEventBuilder(
BpmnElementType.SERVICE_TASK,
process -> process.serviceTask("task", t -> t.zeebeJobType("task"))),
// -------------------------------
new ElementWithBoundaryEventBuilder(
BpmnElementType.RECEIVE_TASK,
process ->
process.receiveTask(
"task",
r -> r.message(m -> m.name("wait").zeebeCorrelationKeyExpression("123")))),
// -------------------------------
new ElementWithBoundaryEventBuilder(
BpmnElementType.BUSINESS_RULE_TASK,
process -> process.businessRuleTask("task", b -> b.zeebeJobType("task"))),
// -------------------------------
new ElementWithBoundaryEventBuilder(
BpmnElementType.USER_TASK, process -> process.userTask("task")),
// -------------------------------
new ElementWithBoundaryEventBuilder(
BpmnElementType.SCRIPT_TASK,
process -> process.scriptTask("task", s -> s.zeebeJobType("task"))),
// -------------------------------
new ElementWithBoundaryEventBuilder(
BpmnElementType.SEND_TASK,
process -> process.sendTask("task", s -> s.zeebeJobType("task"))),
// -------------------------------
new ElementWithBoundaryEventBuilder(
BpmnElementType.SUB_PROCESS,
process ->
process.subProcess(
"subprocess",
s ->
s.embeddedSubProcess()
.startEvent()
.serviceTask("task", t -> t.zeebeJobType("task"))
.endEvent())),
// -------------------------------
new ElementWithBoundaryEventBuilder(
BpmnElementType.MULTI_INSTANCE_BODY,
process ->
process
.serviceTask("task", t -> t.zeebeJobType("task"))
.multiInstance(m -> m.parallel().zeebeInputCollectionExpression("[1,2,3]"))),
// -------------------------------
new ElementWithBoundaryEventBuilder(
BpmnElementType.CALL_ACTIVITY,
process -> process.callActivity("call", c -> c.zeebeProcessId(PROCESS_ID))));
}

private record ElementWithBoundaryEventBuilder(
BpmnElementType elementType,
Function<AbstractFlowNodeBuilder<?, ?>, AbstractActivityBuilder<?, ?>> builder) {

public AbstractActivityBuilder<?, ?> build(final AbstractFlowNodeBuilder<?, ?> process) {
return builder.apply(process);
}

@Override
public String toString() {
return elementType.name();
}
}
}

0 comments on commit 9276995

Please sign in to comment.