Skip to content

Commit

Permalink
merge: #9241
Browse files Browse the repository at this point in the history
9241: [Backport 8.0] Trigger boundary events after an interrupting event subprocess is triggered r=saig0 a=saig0

## Description

Backport of #9175

## Related issues

relates to #6874


Co-authored-by: Philipp Ossler <philipp.ossler@gmail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and saig0 committed Apr 28, 2022
2 parents 6ff034c + 4b506ad commit 6e559d1
Show file tree
Hide file tree
Showing 21 changed files with 932 additions and 222 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public static TypedRecordProcessors createEngineProcessors(
final CatchEventBehavior catchEventBehavior =
new CatchEventBehavior(
zeebeState,
zeebeState.getKeyGenerator(),
expressionProcessor,
subscriptionCommandSender,
writers.state(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void throwErrorEvent(final CatchEventAnalyzer.CatchEventTuple catchEventT
final ElementInstance eventScopeInstance = catchEventTuple.getElementInstance();
final ExecutableCatchEvent catchEvent = catchEventTuple.getCatchEvent();

if (eventHandle.canTriggerElement(eventScopeInstance)) {
if (eventHandle.canTriggerElement(eventScopeInstance, catchEvent.getId())) {
eventHandle.activateElement(
catchEvent, eventScopeInstance.getKey(), eventScopeInstance.getValue());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.camunda.zeebe.engine.processing.bpmn.BpmnElementContext;
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableCatchEvent;
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableCatchEventSupplier;
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableFlowElement;
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableMessage;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffects;
Expand All @@ -21,10 +22,11 @@
import io.camunda.zeebe.engine.processing.timer.DueDateTimerChecker;
import io.camunda.zeebe.engine.state.KeyGenerator;
import io.camunda.zeebe.engine.state.immutable.ProcessMessageSubscriptionState;
import io.camunda.zeebe.engine.state.immutable.ProcessState;
import io.camunda.zeebe.engine.state.immutable.TimerInstanceState;
import io.camunda.zeebe.engine.state.immutable.ZeebeState;
import io.camunda.zeebe.engine.state.instance.TimerInstance;
import io.camunda.zeebe.engine.state.message.ProcessMessageSubscription;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.model.bpmn.util.time.Timer;
import io.camunda.zeebe.protocol.impl.SubscriptionUtil;
import io.camunda.zeebe.protocol.impl.record.value.message.ProcessMessageSubscriptionRecord;
Expand All @@ -36,6 +38,7 @@
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.camunda.zeebe.util.sched.clock.ActorClock;
import java.util.List;
import java.util.function.Predicate;
import org.agrona.DirectBuffer;

public final class CatchEventBehavior {
Expand All @@ -47,6 +50,7 @@ public final class CatchEventBehavior {

private final ProcessMessageSubscriptionState processMessageSubscriptionState;
private final TimerInstanceState timerInstanceState;
private final ProcessState processState;

private final ProcessMessageSubscriptionRecord subscription =
new ProcessMessageSubscriptionRecord();
Expand All @@ -55,7 +59,8 @@ public final class CatchEventBehavior {
private final KeyGenerator keyGenerator;

public CatchEventBehavior(
final MutableZeebeState zeebeState,
final ZeebeState zeebeState,
final KeyGenerator keyGenerator,
final ExpressionProcessor expressionProcessor,
final SubscriptionCommandSender subscriptionCommandSender,
final StateWriter stateWriter,
Expand All @@ -68,19 +73,70 @@ public CatchEventBehavior(

timerInstanceState = zeebeState.getTimerState();
processMessageSubscriptionState = zeebeState.getProcessMessageSubscriptionState();
processState = zeebeState.getProcessState();

keyGenerator = zeebeState.getKeyGenerator();

this.keyGenerator = keyGenerator;
this.timerChecker = timerChecker;
}

/**
* Unsubscribe from all events in the scope of the context.
*
* @param context the context to subscript from
* @param commandWriter the writer for unsubscribe commands
* @param sideEffects the side effects for unsubscribe actions
*/
public void unsubscribeFromEvents(
final BpmnElementContext context,
final TypedCommandWriter commandWriter,
final SideEffects sideEffects) {
unsubscribeFromEvents(context, commandWriter, sideEffects, elementId -> true);
}

/**
* Unsubscribe from all event subprocesses in the scope of the context. Ignores other event
* subscriptions in the scope.
*
* @param context the context to subscript from
* @param commandWriter the writer for unsubscribe commands
* @param sideEffects the side effects for unsubscribe actions
*/
public void unsubscribeEventSubprocesses(
final BpmnElementContext context,
final TypedCommandWriter commandWriter,
final SideEffects sideEffects) {
unsubscribeFromEvents(
context, commandWriter, sideEffects, elementId -> isEventSubprocess(context, elementId));
}

private boolean isEventSubprocess(
final BpmnElementContext context, final DirectBuffer elementId) {

final var element =
processState.getFlowElement(
context.getProcessDefinitionKey(), elementId, ExecutableFlowElement.class);

return element.getElementType() == BpmnElementType.START_EVENT
&& element.getFlowScope().getElementType() == BpmnElementType.EVENT_SUB_PROCESS;
}

/**
* Unsubscribe from all events in the scope of the context that matches the given filter. Ignore
* other event subscriptions that don't match the filter.
*
* @param context the context to subscript from
* @param commandWriter the writer for unsubscribe commands
* @param sideEffects the side effects for unsubscribe actions
* @param elementIdFilter the filter for events to unsubscribe
*/
private void unsubscribeFromEvents(
final BpmnElementContext context,
final TypedCommandWriter commandWriter,
final SideEffects sideEffects,
final Predicate<DirectBuffer> elementIdFilter) {

unsubscribeFromTimerEvents(context, commandWriter);
unsubscribeFromMessageEvents(context, sideEffects);
unsubscribeFromTimerEvents(context, commandWriter, elementIdFilter);
unsubscribeFromMessageEvents(context, sideEffects, elementIdFilter);
}

/** @return either a failure or nothing */
Expand Down Expand Up @@ -249,9 +305,16 @@ public void subscribeToTimerEvent(
}

private void unsubscribeFromTimerEvents(
final BpmnElementContext context, final TypedCommandWriter commandWriter) {
final BpmnElementContext context,
final TypedCommandWriter commandWriter,
final Predicate<DirectBuffer> elementIdFilter) {
timerInstanceState.forEachTimerForElementInstance(
context.getElementInstanceKey(), t -> unsubscribeFromTimerEvent(t, commandWriter));
context.getElementInstanceKey(),
timer -> {
if (elementIdFilter.test(timer.getHandlerNodeId())) {
unsubscribeFromTimerEvent(timer, commandWriter);
}
});
}

public void unsubscribeFromTimerEvent(
Expand All @@ -269,13 +332,21 @@ public void unsubscribeFromTimerEvent(
}

private void unsubscribeFromMessageEvents(
final BpmnElementContext context, final SideEffects sideEffects) {
final BpmnElementContext context,
final SideEffects sideEffects,
final Predicate<DirectBuffer> elementIdFilter) {
processMessageSubscriptionState.visitElementSubscriptions(
context.getElementInstanceKey(),
subscription -> unsubscribeFromMessageEvent(subscription, sideEffects));
subscription -> {
final var elementId = subscription.getRecord().getElementIdBuffer();
if (elementIdFilter.test(elementId)) {
unsubscribeFromMessageEvent(subscription, sideEffects);
}
return true;
});
}

private boolean unsubscribeFromMessageEvent(
private void unsubscribeFromMessageEvent(
final ProcessMessageSubscription subscription, final SideEffects sideEffects) {

final DirectBuffer messageName = cloneBuffer(subscription.getRecord().getMessageNameBuffer());
Expand All @@ -289,8 +360,6 @@ private boolean unsubscribeFromMessageEvent(
() ->
sendCloseMessageSubscriptionCommand(
subscriptionPartitionId, processInstanceKey, elementInstanceKey, messageName));

return true;
}

private boolean sendCloseMessageSubscriptionCommand(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ public EventHandle(
this.eventTriggerBehavior = eventTriggerBehavior;
}

public boolean canTriggerElement(final ElementInstance eventScopeInstance) {
public boolean canTriggerElement(
final ElementInstance eventScopeInstance, final DirectBuffer elementId) {
return eventScopeInstance != null
&& eventScopeInstance.isActive()
&& eventScopeInstanceState.isAcceptingEvent(eventScopeInstance.getKey());
&& eventScopeInstanceState.canTriggerEvent(eventScopeInstance.getKey(), elementId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ public EventTriggerBehavior(
new VariableBehavior(zeebeState.getVariableState(), writers.state(), keyGenerator);
}

public void unsubscribeFromEvents(final BpmnElementContext context) {
private void unsubscribeEventSubprocesses(final BpmnElementContext context) {
final var sideEffectQueue = new SideEffectQueue();
catchEventBehavior.unsubscribeFromEvents(context, commandWriter, sideEffectQueue);
catchEventBehavior.unsubscribeEventSubprocesses(context, commandWriter, sideEffectQueue);

// side effect can immediately executed, since on restart we not reprocess anymore the commands
sideEffectQueue.flush();
Expand Down Expand Up @@ -98,7 +98,7 @@ public void triggerEventSubProcess(
}

if (startEvent.interrupting()) {
unsubscribeFromEvents(flowScopeContext);
unsubscribeEventSubprocesses(flowScopeContext);

final var noActiveChildInstances = terminateChildInstances(flowScopeContext);
if (!noActiveChildInstances) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public class ExecutableActivity extends ExecutableFlowNode implements Executable

private final List<ExecutableCatchEvent> catchEvents = new ArrayList<>();
private final List<DirectBuffer> interruptingIds = new ArrayList<>();
private final List<DirectBuffer> boundaryElementIds = new ArrayList<>();

public ExecutableActivity(final String id) {
super(id);
Expand All @@ -28,8 +29,11 @@ public void attach(final ExecutableBoundaryEvent boundaryEvent) {
boundaryEvents.add(boundaryEvent);
catchEvents.add(boundaryEvent);

final var boundaryEventElementId = boundaryEvent.getId();
boundaryElementIds.add(boundaryEventElementId);

if (boundaryEvent.interrupting()) {
interruptingIds.add(boundaryEvent.getId());
interruptingIds.add(boundaryEventElementId);
}
}

Expand Down Expand Up @@ -57,6 +61,11 @@ public Collection<DirectBuffer> getInterruptingElementIds() {
return interruptingIds;
}

@Override
public Collection<DirectBuffer> getBoundaryElementIds() {
return boundaryElementIds;
}

public List<ExecutableBoundaryEvent> getBoundaryEvents() {
return boundaryEvents;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ public Collection<DirectBuffer> getInterruptingElementIds() {
return Collections.singleton(getId());
}

@Override
public Collection<DirectBuffer> getBoundaryElementIds() {
return Collections.emptySet();
}

public boolean interrupting() {
return interrupting;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,17 @@
public interface ExecutableCatchEventSupplier extends ExecutableFlowElement {
List<ExecutableCatchEvent> getEvents();

/**
* Returns the ids of the containing elements that interrupt the event scope (e.g. interrupting
* event subprocesses). An interrupted event scope can not be triggered by other interrupting or
* non-interrupting events. But the event scope can still be triggered by boundary events.
*/
Collection<DirectBuffer> getInterruptingElementIds();

/**
* Returns the ids of the boundary events. An interrupting boundary event must return its id also
* with {@link #getInterruptingElementIds()}. Otherwise, it is handled as non-interrupting
* boundary event.
*/
Collection<DirectBuffer> getBoundaryElementIds();
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.agrona.DirectBuffer;

Expand Down Expand Up @@ -40,4 +41,9 @@ public void setEvents(final List<ExecutableCatchEvent> events) {
public Collection<DirectBuffer> getInterruptingElementIds() {
return eventIds;
}

@Override
public Collection<DirectBuffer> getBoundaryElementIds() {
return Collections.emptySet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,9 @@ private void acceptCommand(
commandControl.reject(
RejectionType.INVALID_STATE,
"Expected to find active service task, but was " + serviceTaskInstance);
} else if (!eventScopeInstanceState.isAcceptingEvent(
foundCatchEvent.get().getElementInstance().getKey())) {
} else if (!eventScopeInstanceState.canTriggerEvent(
foundCatchEvent.get().getElementInstance().getKey(),
foundCatchEvent.get().getCatchEvent().getId())) {
commandControl.reject(
RejectionType.INVALID_STATE,
"Expected to find event scope that is accepting events, but was "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ final var record = command.getValue();

} else {
final var elementInstance = elementInstanceState.getInstance(elementInstanceKey);
final var canTriggerElement = eventHandle.canTriggerElement(elementInstance);
final var canTriggerElement =
eventHandle.canTriggerElement(
elementInstance, subscription.getRecord().getElementIdBuffer());

if (!canTriggerElement) {
rejectCommand(command, RejectionType.INVALID_STATE, NO_EVENT_OCCURRED_MESSAGE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void processRecord(
processDefinitionKey, processInstanceKey, timer.getTargetElementIdBuffer(), NO_VARIABLES);
} else {
final var elementInstance = elementInstanceState.getInstance(elementInstanceKey);
if (!eventHandle.canTriggerElement(elementInstance)) {
if (!eventHandle.canTriggerElement(elementInstance, timer.getTargetElementIdBuffer())) {
rejectNoActiveTimer(record);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,18 +239,21 @@ private void createEventScope(
elementRecord.getElementIdBuffer(),
flowElementClass);

if (flowElement instanceof ExecutableCatchEventSupplier) {
final var eventSupplier = (ExecutableCatchEventSupplier) flowElement;
if (flowElement instanceof final ExecutableCatchEventSupplier eventSupplier) {

final var hasEvents = !eventSupplier.getEvents().isEmpty();
if (hasEvents
|| flowElement instanceof ExecutableJobWorkerElement
|| flowElement instanceof ExecutableCallActivity) {
eventScopeInstanceState.createInstance(
elementInstanceKey, eventSupplier.getInterruptingElementIds());
elementInstanceKey,
eventSupplier.getInterruptingElementIds(),
eventSupplier.getBoundaryElementIds());
}
} else if (flowElement instanceof ExecutableJobWorkerElement) {
eventScopeInstanceState.createInstance(elementInstanceKey, Collections.emptyList());
// job worker elements without events (e.g. message throw events)
eventScopeInstanceState.createInstance(
elementInstanceKey, Collections.emptySet(), Collections.emptySet());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import io.camunda.zeebe.engine.state.instance.EventScopeInstance;
import io.camunda.zeebe.engine.state.instance.EventTrigger;
import org.agrona.DirectBuffer;

public interface EventScopeInstanceState {

Expand All @@ -30,8 +31,11 @@ public interface EventScopeInstanceState {
EventTrigger peekEventTrigger(long eventScopeKey);

/**
* Checks if the event scope can be triggered for the given event.
*
* @param eventScopeKey the key of the event scope the event is triggered in
* @return true if the event can be accepted
* @param elementId the element id of the event that is triggered
* @return {@code true} if the event can be triggered, otherwise {@code false}
*/
boolean isAcceptingEvent(long eventScopeKey);
boolean canTriggerEvent(long eventScopeKey, final DirectBuffer elementId);
}

0 comments on commit 6e559d1

Please sign in to comment.