Skip to content

Commit

Permalink
merge: #10609 #10618
Browse files Browse the repository at this point in the history
10609: Remove interrupted state on event subprocess activation r=remcowesterhoud a=remcowesterhoud

## Description

<!-- Please explain the changes you made here. -->
When an interrupting event sub process gets triggered it will terminate all active element in its flow scope and mark the flow scope as interrupted. With process instance modification it should be possible to re-activate element within the interrupted scope. Because of the interrupted state any activate commands get rejected, making this currently impossible.

With this change we will check if any of the activated elements is currently in an interrupted state. If this is the case we will remove this state, allowing elements within to be activated through a modification.

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #10477 



10618: feat(engine): Apply output mappings for none end events r=remcowesterhoud a=skayliu

## Description

<!-- Please explain the changes you made here. -->

Support none end event outputs.

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #10613 



Co-authored-by: Remco Westerhoud <remco@westerhoud.nl>
Co-authored-by: skayliu <skay463@163.com>
  • Loading branch information
3 people committed Oct 10, 2022
3 parents 56dd345 + f7cc7b2 + e0cf2d6 commit 61d8cea
Show file tree
Hide file tree
Showing 9 changed files with 375 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,13 @@ public boolean isSuitableForEvent(final ExecutableEndEvent element) {
public void onActivate(final ExecutableEndEvent element, final BpmnElementContext activating) {
final var activated = stateTransitionBehavior.transitionToActivated(activating);
final var completing = stateTransitionBehavior.transitionToCompleting(activated);
stateTransitionBehavior
.transitionToCompleted(element, completing)
.ifLeft(failure -> incidentBehavior.createIncident(failure, completing));

variableMappingBehavior
.applyOutputMappings(completing, element)
.flatMap(ok -> stateTransitionBehavior.transitionToCompleted(element, completing))
.ifRightOrLeft(
completed -> stateTransitionBehavior.takeOutgoingSequenceFlows(element, completed),
failure -> incidentBehavior.createIncident(failure, completing));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ public void processRecord(
processInstance,
process,
instruction));

activatedElementKeys
.getFlowScopeKeys()
.forEach(value::addActivatedElementInstanceKey);

return activatedElementKeys.getFlowScopeKeys().stream();
})
.collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.camunda.zeebe.protocol.record.intent.ProcessEventIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceCreationIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceModificationIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessMessageSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.TimerIntent;
Expand All @@ -50,6 +51,7 @@ public final class EventAppliers implements EventApplier {
public EventAppliers(final MutableZeebeState state) {
registerProcessInstanceEventAppliers(state);
registerProcessInstanceCreationAppliers(state);
registerProcessInstanceModificationAppliers(state);

register(ProcessIntent.CREATED, new ProcessCreatedApplier(state));
register(ErrorIntent.CREATED, new ErrorCreatedApplier(state.getBlackListState()));
Expand Down Expand Up @@ -147,6 +149,12 @@ private void registerProcessInstanceCreationAppliers(final MutableZeebeState sta
new ProcessInstanceCreationCreatedApplier(processState, elementInstanceState));
}

private void registerProcessInstanceModificationAppliers(final MutableZeebeState state) {
register(
ProcessInstanceModificationIntent.MODIFIED,
new ProcessInstanceModifiedEventApplier(state.getElementInstanceState()));
}

private void registerJobIntentEventAppliers(final MutableZeebeState state) {
register(JobIntent.CANCELED, new JobCanceledApplier(state));
register(JobIntent.COMPLETED, new JobCompletedApplier(state));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.state.appliers;

import io.camunda.zeebe.engine.state.TypedEventApplier;
import io.camunda.zeebe.engine.state.instance.ElementInstance;
import io.camunda.zeebe.engine.state.mutable.MutableElementInstanceState;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceModificationRecord;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceModificationIntent;

final class ProcessInstanceModifiedEventApplier
implements TypedEventApplier<
ProcessInstanceModificationIntent, ProcessInstanceModificationRecord> {

private final MutableElementInstanceState elementInstanceState;

public ProcessInstanceModifiedEventApplier(
final MutableElementInstanceState elementInstanceState) {
this.elementInstanceState = elementInstanceState;
}

@Override
public void applyState(final long key, final ProcessInstanceModificationRecord value) {
value.getActivatedElementInstanceKeys().stream()
.map(elementInstanceState::getInstance)
.filter(ElementInstance::isInterrupted)
.forEach(
instance ->
elementInstanceState.updateInstance(
instance.getKey(), ElementInstance::clearInterruptedState));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ public boolean isInterrupted() {
return getInterruptingElementId().capacity() > 0;
}

public void clearInterruptedState() {
interruptingEventKeyProp.setValue("");
}

public long getParentKey() {
return parentKeyProp.getValue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.builder.EventSubProcessBuilder;
import io.camunda.zeebe.model.bpmn.builder.SubProcessBuilder;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
Expand All @@ -26,6 +27,7 @@
import io.camunda.zeebe.protocol.record.value.VariableRecordValue;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import java.time.Duration;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -734,6 +736,79 @@ public void shouldTerminateAndActivateElementInTheSameScope() {
verifyThatProcessInstanceIsCompleted(processInstanceKey);
}

@Test
public void shouldActivateElementInInterruptedFlowScope() {
// given
final Consumer<EventSubProcessBuilder> eventSubProcess =
eventSubprocess ->
eventSubprocess
.startEvent()
.interrupting(true)
.message(message -> message.name("interrupt").zeebeCorrelationKeyExpression("key"))
.userTask("A")
.endEvent();

final Consumer<SubProcessBuilder> subProcess =
subprocess -> subprocess.embeddedSubProcess().startEvent().userTask("C").endEvent();

ENGINE
.deployment()
.withXmlResource(
Bpmn.createExecutableProcess(PROCESS_ID)
.eventSubProcess("event-subprocess", eventSubProcess)
.startEvent()
.userTask("B")
.subProcess("subprocess", subProcess)
.endEvent()
.done())
.deploy();

final var processInstanceKey =
ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable("key", "key-1").create();

RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
.withProcessInstanceKey(processInstanceKey)
.withElementId("B")
.await();

ENGINE
.message()
.withName("interrupt")
.withCorrelationKey("key-1")
.withTimeToLive(Duration.ofMinutes(1))
.publish();

RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
.withProcessInstanceKey(processInstanceKey)
.withElementId("A")
.await();

// when
ENGINE
.processInstance()
.withInstanceKey(processInstanceKey)
.modification()
.activateElement("C")
.activateElement("subprocess")
.modify();

// then
Assertions.assertThat(
RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
.withProcessInstanceKey(processInstanceKey)
.withElementId("C")
.exists())
.isTrue();

Assertions.assertThat(
RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
.withProcessInstanceKey(processInstanceKey)
.withElementId("subprocess")
.limit(2)
.count())
.isEqualTo(2);
}

private static void verifyThatRootElementIsActivated(
final long processInstanceKey, final String elementId, final BpmnElementType elementType) {
verifyThatElementIsActivated(processInstanceKey, elementId, elementType, processInstanceKey);
Expand Down

0 comments on commit 61d8cea

Please sign in to comment.