Skip to content

Commit

Permalink
merge: #9277
Browse files Browse the repository at this point in the history
9277: [Backport stable/1.3] Interrupting event subprocess is activated more than once r=saig0 a=github-actions[bot]

# Description
Backport of #9261 to `stable/1.3`.

relates to #9185

Co-authored-by: Philipp Ossler <philipp.ossler@gmail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and saig0 committed May 3, 2022
2 parents d7de841 + f06bc1a commit 08e1e62
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ public void onChildTerminated(
final BpmnElementContext flowScopeContext,
final BpmnElementContext childContext) {

if (flowScopeContext.getIntent() != ProcessInstanceIntent.ELEMENT_TERMINATING
&& stateBehavior.isInterrupted(flowScopeContext)) {
if (stateBehavior.isInterrupted(flowScopeContext)) {
// an interrupting event subprocess was triggered
eventSubscriptionBehavior
.findEventTrigger(flowScopeContext)
.ifPresent(
Expand All @@ -144,6 +144,7 @@ public void onChildTerminated(
flowScopeContext.getElementInstanceKey(),
eventTrigger,
flowScopeContext));

} else if (stateBehavior.canBeTerminated(childContext)) {
transitionTo(
element,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnVariableMappingBehavior;
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableFlowElementContainer;
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableStartEvent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;

public final class SubProcessProcessor
implements BpmnElementContainerProcessor<ExecutableFlowElementContainer> {
Expand Down Expand Up @@ -108,33 +107,9 @@ public void onChildTerminated(
final ExecutableFlowElementContainer element,
final BpmnElementContext subProcessContext,
final BpmnElementContext childContext) {
if (subProcessContext.getIntent() == ProcessInstanceIntent.ELEMENT_TERMINATING) {

if (childContext == null || stateBehavior.canBeTerminated(childContext)) {
// if we are able to terminate we try to trigger boundary events
eventSubscriptionBehavior
.findEventTrigger(subProcessContext)
.ifPresentOrElse(
eventTrigger -> {
final var terminated =
stateTransitionBehavior.transitionToTerminated(subProcessContext);
eventSubscriptionBehavior.activateTriggeredEvent(
subProcessContext.getElementInstanceKey(),
subProcessContext.getFlowScopeKey(),
eventTrigger,
terminated);
},
() -> {
final var terminated =
stateTransitionBehavior.transitionToTerminated(subProcessContext);
stateTransitionBehavior.onElementTerminated(element, terminated);
});
}

} else {
// if the flow scope is not terminating we allow
// * interrupting event sub processes
// * non interrupting boundary events

if (stateBehavior.isInterrupted(subProcessContext)) {
// an interrupting event subprocess was triggered
eventSubscriptionBehavior
.findEventTrigger(subProcessContext)
.ifPresent(
Expand All @@ -144,6 +119,26 @@ public void onChildTerminated(
subProcessContext.getElementInstanceKey(),
eventTrigger,
subProcessContext));

} else if (childContext == null || stateBehavior.canBeTerminated(childContext)) {
// if we are able to terminate we try to trigger boundary events
eventSubscriptionBehavior
.findEventTrigger(subProcessContext)
.ifPresentOrElse(
eventTrigger -> {
final var terminated =
stateTransitionBehavior.transitionToTerminated(subProcessContext);
eventSubscriptionBehavior.activateTriggeredEvent(
subProcessContext.getElementInstanceKey(),
subProcessContext.getFlowScopeKey(),
eventTrigger,
terminated);
},
() -> {
final var terminated =
stateTransitionBehavior.transitionToTerminated(subProcessContext);
stateTransitionBehavior.onElementTerminated(element, terminated);
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,152 @@ public void shouldTriggerInterruptingEventSubprocessAndNonInterruptingBoundaryEv
tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED));
}

@Test
public void shouldTriggerEmbeddedInterruptingEventSubprocessOnlyOnce() {
// given
final Consumer<EventSubProcessBuilder> eventSubprocess =
eventSubProcess ->
builder
.apply(eventSubProcess.startEvent().interrupting(true))
.serviceTask("event_sub_task", t -> t.zeebeJobType("event_sub_task"));

final Consumer<SubProcessBuilder> embeddedSubprocess =
subProcess ->
subProcess
.embeddedSubProcess()
.eventSubProcess("event_sub_proc", eventSubprocess)
.startEvent()
.parallelGateway("fork")
.serviceTask("sub_task1", t -> t.zeebeJobType(JOB_TYPE))
.endEvent()
.moveToNode("fork")
.serviceTask("sub_task2", t -> t.zeebeJobType(JOB_TYPE))
.endEvent();

final BpmnModelInstance process =
Bpmn.createExecutableProcess(PROCESS_ID)
.startEvent()
.subProcess("sub_proc", embeddedSubprocess)
.endEvent()
.done();

ENGINE.deployment().withXmlResource(process).deploy();

final long processInstanceKey =
ENGINE
.processInstance()
.ofBpmnProcessId(PROCESS_ID)
.withVariable("key", MESSAGE_CORRELATION_KEY)
.create();

assertThat(
RecordingExporter.jobRecords(JobIntent.CREATED)
.withProcessInstanceKey(processInstanceKey)
.withType(JOB_TYPE)
.limit(2)
.count())
.describedAs("Await until both tasks are activated")
.isEqualTo(2);

// when
triggerEventSubprocess.accept(processInstanceKey);

ENGINE.job().ofInstance(processInstanceKey).withType("event_sub_task").complete();

// then
assertThat(
RecordingExporter.processInstanceRecords()
.withProcessInstanceKey(processInstanceKey)
.limitToProcessInstanceCompleted()
.withElementType(BpmnElementType.EVENT_SUB_PROCESS)
.withIntent(ProcessInstanceIntent.ELEMENT_ACTIVATED))
.describedAs("Expected to activate the event subprocess only once")
.hasSize(1);

assertThat(
RecordingExporter.processInstanceRecords()
.withProcessInstanceKey(processInstanceKey)
.limitToProcessInstanceCompleted())
.extracting(r -> r.getValue().getBpmnElementType(), Record::getIntent)
.containsSubsequence(
tuple(BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_ACTIVATED),
tuple(BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_ACTIVATED),
tuple(BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_TERMINATED),
tuple(BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_TERMINATED),
tuple(BpmnElementType.EVENT_SUB_PROCESS, ProcessInstanceIntent.ELEMENT_ACTIVATED),
tuple(BpmnElementType.EVENT_SUB_PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED),
tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED));
}

@Test
public void shouldTriggerRootInterruptingEventSubprocessOnlyOnce() {
// given
final Consumer<EventSubProcessBuilder> eventSubprocess =
eventSubProcess ->
builder
.apply(eventSubProcess.startEvent().interrupting(true))
.serviceTask("event_sub_task", t -> t.zeebeJobType("event_sub_task"));

final BpmnModelInstance process =
Bpmn.createExecutableProcess(PROCESS_ID)
.eventSubProcess("event_sub_proc", eventSubprocess)
.startEvent()
.parallelGateway("fork")
.serviceTask("sub_task1", t -> t.zeebeJobType(JOB_TYPE))
.endEvent()
.moveToNode("fork")
.serviceTask("sub_task2", t -> t.zeebeJobType(JOB_TYPE))
.endEvent()
.done();

ENGINE.deployment().withXmlResource(process).deploy();

final long processInstanceKey =
ENGINE
.processInstance()
.ofBpmnProcessId(PROCESS_ID)
.withVariable("key", MESSAGE_CORRELATION_KEY)
.create();

assertThat(
RecordingExporter.jobRecords(JobIntent.CREATED)
.withProcessInstanceKey(processInstanceKey)
.withType(JOB_TYPE)
.limit(2)
.count())
.describedAs("Await until both tasks are activated")
.isEqualTo(2);

// when
triggerEventSubprocess.accept(processInstanceKey);

ENGINE.job().ofInstance(processInstanceKey).withType("event_sub_task").complete();

// then
assertThat(
RecordingExporter.processInstanceRecords()
.withProcessInstanceKey(processInstanceKey)
.limitToProcessInstanceCompleted()
.withElementType(BpmnElementType.EVENT_SUB_PROCESS)
.withIntent(ProcessInstanceIntent.ELEMENT_ACTIVATED))
.describedAs("Expected to activate the event subprocess only once")
.hasSize(1);

assertThat(
RecordingExporter.processInstanceRecords()
.withProcessInstanceKey(processInstanceKey)
.limitToProcessInstanceCompleted())
.extracting(r -> r.getValue().getBpmnElementType(), Record::getIntent)
.containsSubsequence(
tuple(BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_ACTIVATED),
tuple(BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_ACTIVATED),
tuple(BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_TERMINATED),
tuple(BpmnElementType.SERVICE_TASK, ProcessInstanceIntent.ELEMENT_TERMINATED),
tuple(BpmnElementType.EVENT_SUB_PROCESS, ProcessInstanceIntent.ELEMENT_ACTIVATED),
tuple(BpmnElementType.EVENT_SUB_PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED),
tuple(BpmnElementType.PROCESS, ProcessInstanceIntent.ELEMENT_COMPLETED));
}

private static void assertEventSubprocessLifecycle(final long processInstanceKey) {
final List<Record<ProcessInstanceRecordValue>> events =
RecordingExporter.processInstanceRecords()
Expand Down

0 comments on commit 08e1e62

Please sign in to comment.