Skip to content

Commit

Permalink
merge: #10609 #10649
Browse files Browse the repository at this point in the history
10609: Remove interrupted state on event subprocess activation r=remcowesterhoud a=remcowesterhoud

## Description

<!-- Please explain the changes you made here. -->
When an interrupting event sub process gets triggered it will terminate all active element in its flow scope and mark the flow scope as interrupted. With process instance modification it should be possible to re-activate element within the interrupted scope. Because of the interrupted state any activate commands get rejected, making this currently impossible.

With this change we will check if any of the activated elements is currently in an interrupted state. If this is the case we will remove this state, allowing elements within to be activated through a modification.

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #10477 



10649: deps(maven): bump aws-java-sdk-core from 1.12.318 to 1.12.319 r=oleschoenburg a=dependabot[bot]

Bumps [aws-java-sdk-core](https://github.com/aws/aws-sdk-java) from 1.12.318 to 1.12.319.
<details>
<summary>Changelog</summary>
<p><em>Sourced from <a href="https://github.com/aws/aws-sdk-java/blob/master/CHANGELOG.md">aws-java-sdk-core's changelog</a>.</em></p>
<blockquote>
<h1><strong>1.12.319</strong> <strong>2022-10-07</strong></h1>
<h2><strong>AWS IoT Greengrass V2</strong></h2>
<ul>
<li>
<h3>Features</h3>
<ul>
<li>This release adds error status details for deployments and components that failed on a device and adds features to improve visibility into component installation.</li>
</ul>
</li>
</ul>
<h2><strong>Amazon CodeGuru Reviewer</strong></h2>
<ul>
<li>
<h3>Features</h3>
<ul>
<li>Documentation update to replace broken link.</li>
</ul>
</li>
</ul>
<h2><strong>Amazon QuickSight</strong></h2>
<ul>
<li>
<h3>Features</h3>
<ul>
<li>Amazon QuickSight now supports SecretsManager Secret ARN in place of CredentialPair for DataSource creation and update. This release also has some minor documentation updates and removes CountryCode as a required parameter in GeoSpatialColumnGroup</li>
</ul>
</li>
</ul>
<h2><strong>Elastic Load Balancing</strong></h2>
<ul>
<li>
<h3>Features</h3>
<ul>
<li>Gateway Load Balancer adds a new feature (target_failover) for customers to rebalance existing flows to a healthy target after marked unhealthy or deregistered. This allows graceful patching/upgrades of target appliances during maintenance windows, and helps reduce unhealthy target failover time.</li>
</ul>
</li>
</ul>
</blockquote>
</details>
<details>
<summary>Commits</summary>
<ul>
<li><a href="https://github.com/aws/aws-sdk-java/commit/e752d66f73b94f4c54d995b6bde56301ade507af"><code>e752d66</code></a> AWS SDK for Java 1.12.319</li>
<li><a href="https://github.com/aws/aws-sdk-java/commit/e176eb25613173ac59301716281018a837654589"><code>e176eb2</code></a> Update GitHub version number to 1.12.319-SNAPSHOT</li>
<li>See full diff in <a href="https://github.com/aws/aws-sdk-java/compare/1.12.318...1.12.319">compare view</a></li>
</ul>
</details>
<br />


[![Dependabot compatibility score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=com.amazonaws:aws-java-sdk-core&package-manager=maven&previous-version=1.12.318&new-version=1.12.319)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)

Dependabot will resolve any conflicts with this PR as long as you don't alter it yourself. You can also trigger a rebase manually by commenting ``@dependabot` rebase`.

[//]: # (dependabot-automerge-start)
[//]: # (dependabot-automerge-end)

---

<details>
<summary>Dependabot commands and options</summary>
<br />

You can trigger Dependabot actions by commenting on this PR:
- ``@dependabot` rebase` will rebase this PR
- ``@dependabot` recreate` will recreate this PR, overwriting any edits that have been made to it
- ``@dependabot` merge` will merge this PR after your CI passes on it
- ``@dependabot` squash and merge` will squash and merge this PR after your CI passes on it
- ``@dependabot` cancel merge` will cancel a previously requested merge and block automerging
- ``@dependabot` reopen` will reopen this PR if it is closed
- ``@dependabot` close` will close this PR and stop Dependabot recreating it. You can achieve the same result by closing it manually
- ``@dependabot` ignore this major version` will close this PR and stop Dependabot creating any more for this major version (unless you reopen the PR or upgrade to it yourself)
- ``@dependabot` ignore this minor version` will close this PR and stop Dependabot creating any more for this minor version (unless you reopen the PR or upgrade to it yourself)
- ``@dependabot` ignore this dependency` will close this PR and stop Dependabot creating any more for this dependency (unless you reopen the PR or upgrade to it yourself)


</details>

Co-authored-by: Remco Westerhoud <remco@westerhoud.nl>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
  • Loading branch information
3 people committed Oct 10, 2022
3 parents 56dd345 + f7cc7b2 + 7f9e9d5 commit 99bf19c
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,11 @@ public void processRecord(
processInstance,
process,
instruction));

activatedElementKeys
.getFlowScopeKeys()
.forEach(value::addActivatedElementInstanceKey);

return activatedElementKeys.getFlowScopeKeys().stream();
})
.collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.camunda.zeebe.protocol.record.intent.ProcessEventIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceCreationIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceModificationIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessMessageSubscriptionIntent;
import io.camunda.zeebe.protocol.record.intent.TimerIntent;
Expand All @@ -50,6 +51,7 @@ public final class EventAppliers implements EventApplier {
public EventAppliers(final MutableZeebeState state) {
registerProcessInstanceEventAppliers(state);
registerProcessInstanceCreationAppliers(state);
registerProcessInstanceModificationAppliers(state);

register(ProcessIntent.CREATED, new ProcessCreatedApplier(state));
register(ErrorIntent.CREATED, new ErrorCreatedApplier(state.getBlackListState()));
Expand Down Expand Up @@ -147,6 +149,12 @@ private void registerProcessInstanceCreationAppliers(final MutableZeebeState sta
new ProcessInstanceCreationCreatedApplier(processState, elementInstanceState));
}

private void registerProcessInstanceModificationAppliers(final MutableZeebeState state) {
register(
ProcessInstanceModificationIntent.MODIFIED,
new ProcessInstanceModifiedEventApplier(state.getElementInstanceState()));
}

private void registerJobIntentEventAppliers(final MutableZeebeState state) {
register(JobIntent.CANCELED, new JobCanceledApplier(state));
register(JobIntent.COMPLETED, new JobCompletedApplier(state));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.state.appliers;

import io.camunda.zeebe.engine.state.TypedEventApplier;
import io.camunda.zeebe.engine.state.instance.ElementInstance;
import io.camunda.zeebe.engine.state.mutable.MutableElementInstanceState;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceModificationRecord;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceModificationIntent;

final class ProcessInstanceModifiedEventApplier
implements TypedEventApplier<
ProcessInstanceModificationIntent, ProcessInstanceModificationRecord> {

private final MutableElementInstanceState elementInstanceState;

public ProcessInstanceModifiedEventApplier(
final MutableElementInstanceState elementInstanceState) {
this.elementInstanceState = elementInstanceState;
}

@Override
public void applyState(final long key, final ProcessInstanceModificationRecord value) {
value.getActivatedElementInstanceKeys().stream()
.map(elementInstanceState::getInstance)
.filter(ElementInstance::isInterrupted)
.forEach(
instance ->
elementInstanceState.updateInstance(
instance.getKey(), ElementInstance::clearInterruptedState));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ public boolean isInterrupted() {
return getInterruptingElementId().capacity() > 0;
}

public void clearInterruptedState() {
interruptingEventKeyProp.setValue("");
}

public long getParentKey() {
return parentKeyProp.getValue();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.builder.EventSubProcessBuilder;
import io.camunda.zeebe.model.bpmn.builder.SubProcessBuilder;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
Expand All @@ -26,6 +27,7 @@
import io.camunda.zeebe.protocol.record.value.VariableRecordValue;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import java.time.Duration;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -734,6 +736,79 @@ public void shouldTerminateAndActivateElementInTheSameScope() {
verifyThatProcessInstanceIsCompleted(processInstanceKey);
}

@Test
public void shouldActivateElementInInterruptedFlowScope() {
// given
final Consumer<EventSubProcessBuilder> eventSubProcess =
eventSubprocess ->
eventSubprocess
.startEvent()
.interrupting(true)
.message(message -> message.name("interrupt").zeebeCorrelationKeyExpression("key"))
.userTask("A")
.endEvent();

final Consumer<SubProcessBuilder> subProcess =
subprocess -> subprocess.embeddedSubProcess().startEvent().userTask("C").endEvent();

ENGINE
.deployment()
.withXmlResource(
Bpmn.createExecutableProcess(PROCESS_ID)
.eventSubProcess("event-subprocess", eventSubProcess)
.startEvent()
.userTask("B")
.subProcess("subprocess", subProcess)
.endEvent()
.done())
.deploy();

final var processInstanceKey =
ENGINE.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable("key", "key-1").create();

RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
.withProcessInstanceKey(processInstanceKey)
.withElementId("B")
.await();

ENGINE
.message()
.withName("interrupt")
.withCorrelationKey("key-1")
.withTimeToLive(Duration.ofMinutes(1))
.publish();

RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
.withProcessInstanceKey(processInstanceKey)
.withElementId("A")
.await();

// when
ENGINE
.processInstance()
.withInstanceKey(processInstanceKey)
.modification()
.activateElement("C")
.activateElement("subprocess")
.modify();

// then
Assertions.assertThat(
RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
.withProcessInstanceKey(processInstanceKey)
.withElementId("C")
.exists())
.isTrue();

Assertions.assertThat(
RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_ACTIVATED)
.withProcessInstanceKey(processInstanceKey)
.withElementId("subprocess")
.limit(2)
.count())
.isEqualTo(2);
}

private static void verifyThatRootElementIsActivated(
final long processInstanceKey, final String elementId, final BpmnElementType elementType) {
verifyThatElementIsActivated(processInstanceKey, elementId, elementType, processInstanceKey);
Expand Down
2 changes: 1 addition & 1 deletion parent/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,7 @@
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
<version>1.12.318</version>
<version>1.12.319</version>
<scope>test</scope>
<exclusions>
<exclusion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.camunda.zeebe.msgpack.property.ArrayProperty;
import io.camunda.zeebe.msgpack.property.LongProperty;
import io.camunda.zeebe.msgpack.value.LongValue;
import io.camunda.zeebe.msgpack.value.ObjectValue;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceModificationRecordValue;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public final class ProcessInstanceModificationRecord extends UnifiedRecordValue
implements ProcessInstanceModificationRecordValue {
Expand All @@ -27,11 +30,14 @@ public final class ProcessInstanceModificationRecord extends UnifiedRecordValue
activateInstructionsProperty =
new ArrayProperty<>(
"activateInstructions", new ProcessInstanceModificationActivateInstruction());
private final ArrayProperty<LongValue> activatedElementInstanceKeys =
new ArrayProperty<>("activatedElementInstanceKeys", new LongValue());

public ProcessInstanceModificationRecord() {
declareProperty(processInstanceKeyProperty)
.declareProperty(terminateInstructionsProperty)
.declareProperty(activateInstructionsProperty);
.declareProperty(activateInstructionsProperty)
.declareProperty(activatedElementInstanceKeys);
}

/**
Expand Down Expand Up @@ -98,6 +104,17 @@ public ProcessInstanceModificationRecord addActivateInstruction(
return this;
}

public Set<Long> getActivatedElementInstanceKeys() {
return activatedElementInstanceKeys.stream()
.map(LongValue::getValue)
.collect(Collectors.toSet());
}

public ProcessInstanceModificationRecord addActivatedElementInstanceKey(final long key) {
activatedElementInstanceKeys.add().setValue(key);
return this;
}

@Override
public long getProcessInstanceKey() {
return processInstanceKeyProperty.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,8 @@ final var record = new DeploymentDistributionRecord();
}
}],
"elementId": "activity"
}]
}],
"activatedElementInstanceKeys": []
}
"""
},
Expand All @@ -803,7 +804,8 @@ final var record = new DeploymentDistributionRecord();
{
"processInstanceKey": 1,
"terminateInstructions": [],
"activateInstructions": []
"activateInstructions": [],
"activatedElementInstanceKeys": []
}
"""
},
Expand Down

0 comments on commit 99bf19c

Please sign in to comment.