Skip to content

Commit

Permalink
merge: #9261
Browse files Browse the repository at this point in the history
9261: Interrupting event subprocess is activated more than once r=saig0 a=saig0

## Description

An interrupting event subprocess is activated more than once if
* the event subprocess is in an embedded subprocess
* more than one element instance is active in the same scope (i.e. the embedded subprocess)

The bug was fixed by aligning the child termination condition on the subprocess with the process. Now, it checks if the number of active child instances is zero. 

## Related issues

closes #9185



Co-authored-by: Philipp Ossler <philipp.ossler@gmail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and saig0 committed May 3, 2022
2 parents d1c7540 + bee505b commit 987889d
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ public void onChildTerminated(
final BpmnElementContext flowScopeContext,
final BpmnElementContext childContext) {

if (flowScopeContext.getIntent() != ProcessInstanceIntent.ELEMENT_TERMINATING
&& stateBehavior.isInterrupted(flowScopeContext)) {
if (stateBehavior.isInterrupted(flowScopeContext)) {
// an interrupting event subprocess was triggered
eventSubscriptionBehavior
.findEventTrigger(flowScopeContext)
.ifPresent(
Expand All @@ -144,6 +144,7 @@ public void onChildTerminated(
flowScopeContext.getElementInstanceKey(),
eventTrigger,
flowScopeContext));

} else if (stateBehavior.canBeTerminated(childContext)) {
transitionTo(
element,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnVariableMappingBehavior;
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableFlowElementContainer;
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableStartEvent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;

public final class SubProcessProcessor
implements BpmnElementContainerProcessor<ExecutableFlowElementContainer> {
Expand Down Expand Up @@ -108,33 +107,9 @@ public void onChildTerminated(
final ExecutableFlowElementContainer element,
final BpmnElementContext subProcessContext,
final BpmnElementContext childContext) {
if (subProcessContext.getIntent() == ProcessInstanceIntent.ELEMENT_TERMINATING) {

if (childContext == null || stateBehavior.canBeTerminated(childContext)) {
// if we are able to terminate we try to trigger boundary events
eventSubscriptionBehavior
.findEventTrigger(subProcessContext)
.ifPresentOrElse(
eventTrigger -> {
final var terminated =
stateTransitionBehavior.transitionToTerminated(subProcessContext);
eventSubscriptionBehavior.activateTriggeredEvent(
subProcessContext.getElementInstanceKey(),
subProcessContext.getFlowScopeKey(),
eventTrigger,
terminated);
},
() -> {
final var terminated =
stateTransitionBehavior.transitionToTerminated(subProcessContext);
stateTransitionBehavior.onElementTerminated(element, terminated);
});
}

} else {
// if the flow scope is not terminating we allow
// * interrupting event sub processes
// * non interrupting boundary events

if (stateBehavior.isInterrupted(subProcessContext)) {
// an interrupting event subprocess was triggered
eventSubscriptionBehavior
.findEventTrigger(subProcessContext)
.ifPresent(
Expand All @@ -144,6 +119,26 @@ public void onChildTerminated(
subProcessContext.getElementInstanceKey(),
eventTrigger,
subProcessContext));

} else if (childContext == null || stateBehavior.canBeTerminated(childContext)) {
// if we are able to terminate we try to trigger boundary events
eventSubscriptionBehavior
.findEventTrigger(subProcessContext)
.ifPresentOrElse(
eventTrigger -> {
final var terminated =
stateTransitionBehavior.transitionToTerminated(subProcessContext);
eventSubscriptionBehavior.activateTriggeredEvent(
subProcessContext.getElementInstanceKey(),
subProcessContext.getFlowScopeKey(),
eventTrigger,
terminated);
},
() -> {
final var terminated =
stateTransitionBehavior.transitionToTerminated(subProcessContext);
stateTransitionBehavior.onElementTerminated(element, terminated);
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,152 @@ public void shouldTriggerInterruptingEventSubprocessAndNonInterruptingBoundaryEv
tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED));
}

@Test
public void shouldTriggerEmbeddedInterruptingEventSubprocessOnlyOnce() {
// given
final Consumer<EventSubProcessBuilder> eventSubprocess =
eventSubProcess ->
builder
.apply(eventSubProcess.startEvent().interrupting(true))
.serviceTask("event_sub_task", t -> t.zeebeJobType("event_sub_task"));

final Consumer<SubProcessBuilder> embeddedSubprocess =
subProcess ->
subProcess
.embeddedSubProcess()
.eventSubProcess("event_sub_proc", eventSubprocess)
.startEvent()
.parallelGateway("fork")
.serviceTask("sub_task1", t -> t.zeebeJobType(JOB_TYPE))
.endEvent()
.moveToNode("fork")
.serviceTask("sub_task2", t -> t.zeebeJobType(JOB_TYPE))
.endEvent();

final BpmnModelInstance process =
Bpmn.createExecutableProcess(PROCESS_ID)
.startEvent()
.subProcess("sub_proc", embeddedSubprocess)
.endEvent()
.done();

ENGINE.deployment().withXmlResource(process).deploy();

final long processInstanceKey =
ENGINE
.processInstance()
.ofBpmnProcessId(PROCESS_ID)
.withVariable("key", MESSAGE_CORRELATION_KEY)
.create();

assertThat(
RecordingExporter.jobRecords(JobIntent.CREATED)
.withProcessInstanceKey(processInstanceKey)
.withType(JOB_TYPE)
.limit(2)
.count())
.describedAs("Await until both tasks are activated")
.isEqualTo(2);

// when
triggerEventSubprocess.accept(processInstanceKey);

ENGINE.job().ofInstance(processInstanceKey).withType("event_sub_task").complete();

// then
assertThat(
RecordingExporter.processInstanceRecords()
.withProcessInstanceKey(processInstanceKey)
.limitToProcessInstanceCompleted()
.withElementType(BpmnElementType.EVENT_SUB_PROCESS)
.withIntent(ProcessInstanceIntent.ELEMENT_ACTIVATED))
.describedAs("Expected to activate the event subprocess only once")
.hasSize(1);

assertThat(
RecordingExporter.processInstanceRecords()
.withProcessInstanceKey(processInstanceKey)
.limitToProcessInstanceCompleted())
.extracting(r -> r.getValue().getBpmnElementType(), Record::getIntent)
.containsSubsequence(
tuple(BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_ACTIVATED),
tuple(BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_ACTIVATED),
tuple(BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_TERMINATED),
tuple(BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_TERMINATED),
tuple(BpmnElementType.EVENT_SUB_PROCESS, ProcessInstanceIntent.ELEMENT_ACTIVATED),
tuple(BpmnElementType.EVENT_SUB_PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED),
tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED));
}

@Test
public void shouldTriggerRootInterruptingEventSubprocessOnlyOnce() {
// given
final Consumer<EventSubProcessBuilder> eventSubprocess =
eventSubProcess ->
builder
.apply(eventSubProcess.startEvent().interrupting(true))
.serviceTask("event_sub_task", t -> t.zeebeJobType("event_sub_task"));

final BpmnModelInstance process =
Bpmn.createExecutableProcess(PROCESS_ID)
.eventSubProcess("event_sub_proc", eventSubprocess)
.startEvent()
.parallelGateway("fork")
.serviceTask("sub_task1", t -> t.zeebeJobType(JOB_TYPE))
.endEvent()
.moveToNode("fork")
.serviceTask("sub_task2", t -> t.zeebeJobType(JOB_TYPE))
.endEvent()
.done();

ENGINE.deployment().withXmlResource(process).deploy();

final long processInstanceKey =
ENGINE
.processInstance()
.ofBpmnProcessId(PROCESS_ID)
.withVariable("key", MESSAGE_CORRELATION_KEY)
.create();

assertThat(
RecordingExporter.jobRecords(JobIntent.CREATED)
.withProcessInstanceKey(processInstanceKey)
.withType(JOB_TYPE)
.limit(2)
.count())
.describedAs("Await until both tasks are activated")
.isEqualTo(2);

// when
triggerEventSubprocess.accept(processInstanceKey);

ENGINE.job().ofInstance(processInstanceKey).withType("event_sub_task").complete();

// then
assertThat(
RecordingExporter.processInstanceRecords()
.withProcessInstanceKey(processInstanceKey)
.limitToProcessInstanceCompleted()
.withElementType(BpmnElementType.EVENT_SUB_PROCESS)
.withIntent(ProcessInstanceIntent.ELEMENT_ACTIVATED))
.describedAs("Expected to activate the event subprocess only once")
.hasSize(1);

assertThat(
RecordingExporter.processInstanceRecords()
.withProcessInstanceKey(processInstanceKey)
.limitToProcessInstanceCompleted())
.extracting(r -> r.getValue().getBpmnElementType(), Record::getIntent)
.containsSubsequence(
tuple(BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_ACTIVATED),
tuple(BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_ACTIVATED),
tuple(BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_TERMINATED),
tuple(BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_TERMINATED),
tuple(BpmnElementType.EVENT_SUB_PROCESS, ProcessInstanceIntent.ELEMENT_ACTIVATED),
tuple(BpmnElementType.EVENT_SUB_PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED),
tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED));
}

private static void assertEventSubprocessLifecycle(final long processInstanceKey) {
final List<Record<ProcessInstanceRecordValue>> events =
RecordingExporter.processInstanceRecords()
Expand Down

0 comments on commit 987889d

Please sign in to comment.