Skip to content

Commit

Permalink
merge: #10609
Browse files Browse the repository at this point in the history
10609: Remove interrupted state on event subprocess activation r=remcowesterhoud a=remcowesterhoud

## Description

<!-- Please explain the changes you made here. -->
When an interrupting event sub process gets triggered it will terminate all active element in its flow scope and mark the flow scope as interrupted. With process instance modification it should be possible to re-activate element within the interrupted scope. Because of the interrupted state any activate commands get rejected, making this currently impossible.

With this change we will check if any of the activated elements is currently in an interrupted state. If this is the case we will remove this state, allowing elements within to be activated through a modification.

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #10477 



Co-authored-by: Remco Westerhoud <remco@westerhoud.nl>
  • Loading branch information
zeebe-bors-camunda[bot] and remcowesterhoud committed Oct 10, 2022
2 parents 56dd345 + f7cc7b2 commit 792919e
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 3 deletions.
Expand Up @@ -185,6 +185,11 @@ public void processRecord(
processInstance,
process,
instruction));

activatedElementKeys
.getFlowScopeKeys()
.forEach(value::addActivatedElementInstanceKey);

return activatedElementKeys.getFlowScopeKeys().stream();
})
.collect(Collectors.toSet());
Expand Down
Expand Up @@ -27,6 +27,7 @@
import io.camunda.zeebe.protocol.record.intent.ProcessEventIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceCreationIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceModificationIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessMessageSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.TimerIntent;
Expand All @@ -50,6 +51,7 @@ public final class EventAppliers implements EventApplier {
public EventAppliers(final MutableZeebeState state) {
registerProcessInstanceEventAppliers(state);
registerProcessInstanceCreationAppliers(state);
registerProcessInstanceModificationAppliers(state);

register(ProcessIntent.CREATED, new ProcessCreatedApplier(state));
register(ErrorIntent.CREATED, new ErrorCreatedApplier(state.getBlackListState()));
Expand Down Expand Up @@ -147,6 +149,12 @@ private void registerProcessInstanceCreationAppliers(final MutableZeebeState sta
new ProcessInstanceCreationCreatedApplier(processState, elementInstanceState));
}

private void registerProcessInstanceModificationAppliers(final MutableZeebeState state) {
register(
ProcessInstanceModificationIntent.MODIFIED,
new ProcessInstanceModifiedEventApplier(state.getElementInstanceState()));
}

private void registerJobIntentEventAppliers(final MutableZeebeState state) {
register(JobIntent.CANCELED, new JobCanceledApplier(state));
register(JobIntent.COMPLETED, new JobCompletedApplier(state));
Expand Down
@@ -0,0 +1,37 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.state.appliers;

import io.camunda.zeebe.engine.state.TypedEventApplier;
import io.camunda.zeebe.engine.state.instance.ElementInstance;
import io.camunda.zeebe.engine.state.mutable.MutableElementInstanceState;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceModificationRecord;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceModificationIntent;

final class ProcessInstanceModifiedEventApplier
implements TypedEventApplier<
ProcessInstanceModificationIntent, ProcessInstanceModificationRecord> {

private final MutableElementInstanceState elementInstanceState;

public ProcessInstanceModifiedEventApplier(
final MutableElementInstanceState elementInstanceState) {
this.elementInstanceState = elementInstanceState;
}

@Override
public void applyState(final long key, final ProcessInstanceModificationRecord value) {
value.getActivatedElementInstanceKeys().stream()
.map(elementInstanceState::getInstance)
.filter(ElementInstance::isInterrupted)
.forEach(
instance ->
elementInstanceState.updateInstance(
instance.getKey(), ElementInstance::clearInterruptedState));
}
}
Expand Up @@ -188,6 +188,10 @@ public boolean isInterrupted() {
return getInterruptingElementId().capacity() > 0;
}

public void clearInterruptedState() {
interruptingEventKeyProp.setValue("");
}

public long getParentKey() {
return parentKeyProp.getValue();
}
Expand Down
Expand Up @@ -13,6 +13,7 @@

import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.builder.EventSubProcessBuilder;
import io.camunda.zeebe.model.bpmn.builder.SubProcessBuilder;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
Expand All @@ -26,6 +27,7 @@
import io.camunda.zeebe.protocol.record.value.VariableRecordValue;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import java.time.Duration;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -734,6 +736,79 @@ public void shouldTerminateAndActivateElementInTheSameScope() {
verifyThatProcessInstanceIsCompleted(processInstanceKey);
}

@Test
public void shouldActivateElementInInterruptedFlowScope() {
// given
final Consumer<EventSubProcessBuilder> eventSubProcess =
eventSubprocess ->
eventSubprocess
.startEvent()
.interrupting(true)
.message(message -> message.name("interrupt").zeebeCorrelationKeyExpression("key"))
.userTask("A")
.endEvent();

final Consumer<SubProcessBuilder> subProcess =
subprocess -> subprocess.embeddedSubProcess().startEvent().userTask("C").endEvent();

ENGINE
.deployment()
.withXmlResource(
Bpmn.createExecutableProcess(PROCESS_ID)
.eventSubProcess("event-subprocess", eventSubProcess)
.startEvent()
.userTask("B")
.subProcess("subprocess", subProcess)
.endEvent()
.done())
.deploy();

final var processInstanceKey =
ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable("key", "key-1").create();

RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
.withProcessInstanceKey(processInstanceKey)
.withElementId("B")
.await();

ENGINE
.message()
.withName("interrupt")
.withCorrelationKey("key-1")
.withTimeToLive(Duration.ofMinutes(1))
.publish();

RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
.withProcessInstanceKey(processInstanceKey)
.withElementId("A")
.await();

// when
ENGINE
.processInstance()
.withInstanceKey(processInstanceKey)
.modification()
.activateElement("C")
.activateElement("subprocess")
.modify();

// then
Assertions.assertThat(
RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
.withProcessInstanceKey(processInstanceKey)
.withElementId("C")
.exists())
.isTrue();

Assertions.assertThat(
RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
.withProcessInstanceKey(processInstanceKey)
.withElementId("subprocess")
.limit(2)
.count())
.isEqualTo(2);
}

private static void verifyThatRootElementIsActivated(
final long processInstanceKey, final String elementId, final BpmnElementType elementType) {
verifyThatElementIsActivated(processInstanceKey, elementId, elementType, processInstanceKey);
Expand Down
Expand Up @@ -10,10 +10,13 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.camunda.zeebe.msgpack.property.ArrayProperty;
import io.camunda.zeebe.msgpack.property.LongProperty;
import io.camunda.zeebe.msgpack.value.LongValue;
import io.camunda.zeebe.msgpack.value.ObjectValue;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceModificationRecordValue;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public final class ProcessInstanceModificationRecord extends UnifiedRecordValue
implements ProcessInstanceModificationRecordValue {
Expand All @@ -27,11 +30,14 @@ public final class ProcessInstanceModificationRecord extends UnifiedRecordValue
activateInstructionsProperty =
new ArrayProperty<>(
"activateInstructions", new ProcessInstanceModificationActivateInstruction());
private final ArrayProperty<LongValue> activatedElementInstanceKeys =
new ArrayProperty<>("activatedElementInstanceKeys", new LongValue());

public ProcessInstanceModificationRecord() {
declareProperty(processInstanceKeyProperty)
.declareProperty(terminateInstructionsProperty)
.declareProperty(activateInstructionsProperty);
.declareProperty(activateInstructionsProperty)
.declareProperty(activatedElementInstanceKeys);
}

/**
Expand Down Expand Up @@ -98,6 +104,17 @@ public ProcessInstanceModificationRecord addActivateInstruction(
return this;
}

public Set<Long> getActivatedElementInstanceKeys() {
return activatedElementInstanceKeys.stream()
.map(LongValue::getValue)
.collect(Collectors.toSet());
}

public ProcessInstanceModificationRecord addActivatedElementInstanceKey(final long key) {
activatedElementInstanceKeys.add().setValue(key);
return this;
}

@Override
public long getProcessInstanceKey() {
return processInstanceKeyProperty.getValue();
Expand Down
Expand Up @@ -787,7 +787,8 @@ final var record = new DeploymentDistributionRecord();
}
}],
"elementId": "activity"
}]
}],
"activatedElementInstanceKeys": []
}
"""
},
Expand All @@ -803,7 +804,8 @@ final var record = new DeploymentDistributionRecord();
{
"processInstanceKey": 1,
"terminateInstructions": [],
"activateInstructions": []
"activateInstructions": [],
"activatedElementInstanceKeys": []
}
"""
},
Expand Down

0 comments on commit 792919e

Please sign in to comment.