Skip to content

Commit

Permalink
merge: #9175
Browse files Browse the repository at this point in the history
9175: Trigger boundary events after an interrupting event subprocess is triggered r=saig0 a=saig0

## Description

Fixing the issue that boundary events can't be triggered after an interrupting event subprocess is triggered on the scope. 

Solve the issue by:
* avoid that event subscriptions of boundary event are removed when an interrupting event scope is triggered
  * introduce a filter when removing the event subscriptions
  * filter by event subprocesses and ignore other event subscriptions
* extend the event scope instance state to distinguish interrupting event subscriptions from boundary events
  * add the property `boundaryElementIds` for event scopes to distinguish boundary events from other events in the scope 
  * add the property `interrupted` for event scopes to mark if an interrupting event (subprocess) is triggered
  * if the event scope is interrupted then no other interrupting or non-interrupting events can be triggered
  * only boundary events can be triggered for an interrupted event scope
  * if an interrupting boundary event is triggered then the event scope doesn't accept any other event 

Maintenance:
* remove methods from the event scope state that are not used in production code
* migrate the event scope state test to JUnit 5       

## Related issues

closes #6874 



Co-authored-by: Philipp Ossler <philipp.ossler@gmail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and saig0 committed Apr 27, 2022
2 parents dd3e9fb + 7791618 commit 2f4d8c8
Show file tree
Hide file tree
Showing 21 changed files with 932 additions and 222 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public static TypedRecordProcessors createEngineProcessors(
final CatchEventBehavior catchEventBehavior =
new CatchEventBehavior(
zeebeState,
zeebeState.getKeyGenerator(),
expressionProcessor,
subscriptionCommandSender,
writers.state(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void throwErrorEvent(final CatchEventAnalyzer.CatchEventTuple catchEventT
final ElementInstance eventScopeInstance = catchEventTuple.getElementInstance();
final ExecutableCatchEvent catchEvent = catchEventTuple.getCatchEvent();

if (eventHandle.canTriggerElement(eventScopeInstance)) {
if (eventHandle.canTriggerElement(eventScopeInstance, catchEvent.getId())) {
eventHandle.activateElement(
catchEvent, eventScopeInstance.getKey(), eventScopeInstance.getValue());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.camunda.zeebe.engine.processing.bpmn.BpmnElementContext;
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableCatchEvent;
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableCatchEventSupplier;
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableFlowElement;
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableMessage;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffects;
Expand All @@ -21,10 +22,11 @@
import io.camunda.zeebe.engine.processing.timer.DueDateTimerChecker;
import io.camunda.zeebe.engine.state.KeyGenerator;
import io.camunda.zeebe.engine.state.immutable.ProcessMessageSubscriptionState;
import io.camunda.zeebe.engine.state.immutable.ProcessState;
import io.camunda.zeebe.engine.state.immutable.TimerInstanceState;
import io.camunda.zeebe.engine.state.immutable.ZeebeState;
import io.camunda.zeebe.engine.state.instance.TimerInstance;
import io.camunda.zeebe.engine.state.message.ProcessMessageSubscription;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.model.bpmn.util.time.Timer;
import io.camunda.zeebe.protocol.impl.SubscriptionUtil;
import io.camunda.zeebe.protocol.impl.record.value.message.ProcessMessageSubscriptionRecord;
Expand All @@ -36,6 +38,7 @@
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.camunda.zeebe.util.sched.clock.ActorClock;
import java.util.List;
import java.util.function.Predicate;
import org.agrona.DirectBuffer;

public final class CatchEventBehavior {
Expand All @@ -47,6 +50,7 @@ public final class CatchEventBehavior {

private final ProcessMessageSubscriptionState processMessageSubscriptionState;
private final TimerInstanceState timerInstanceState;
private final ProcessState processState;

private final ProcessMessageSubscriptionRecord subscription =
new ProcessMessageSubscriptionRecord();
Expand All @@ -55,7 +59,8 @@ public final class CatchEventBehavior {
private final KeyGenerator keyGenerator;

public CatchEventBehavior(
final MutableZeebeState zeebeState,
final ZeebeState zeebeState,
final KeyGenerator keyGenerator,
final ExpressionProcessor expressionProcessor,
final SubscriptionCommandSender subscriptionCommandSender,
final StateWriter stateWriter,
Expand All @@ -68,19 +73,70 @@ public CatchEventBehavior(

timerInstanceState = zeebeState.getTimerState();
processMessageSubscriptionState = zeebeState.getProcessMessageSubscriptionState();
processState = zeebeState.getProcessState();

keyGenerator = zeebeState.getKeyGenerator();

this.keyGenerator = keyGenerator;
this.timerChecker = timerChecker;
}

/**
* Unsubscribe from all events in the scope of the context.
*
* @param context the context to subscript from
* @param commandWriter the writer for unsubscribe commands
* @param sideEffects the side effects for unsubscribe actions
*/
public void unsubscribeFromEvents(
final BpmnElementContext context,
final TypedCommandWriter commandWriter,
final SideEffects sideEffects) {
unsubscribeFromEvents(context, commandWriter, sideEffects, elementId -> true);
}

/**
* Unsubscribe from all event subprocesses in the scope of the context. Ignores other event
* subscriptions in the scope.
*
* @param context the context to subscript from
* @param commandWriter the writer for unsubscribe commands
* @param sideEffects the side effects for unsubscribe actions
*/
public void unsubscribeEventSubprocesses(
final BpmnElementContext context,
final TypedCommandWriter commandWriter,
final SideEffects sideEffects) {
unsubscribeFromEvents(
context, commandWriter, sideEffects, elementId -> isEventSubprocess(context, elementId));
}

private boolean isEventSubprocess(
final BpmnElementContext context, final DirectBuffer elementId) {

final var element =
processState.getFlowElement(
context.getProcessDefinitionKey(), elementId, ExecutableFlowElement.class);

return element.getElementType() == BpmnElementType.START_EVENT
&& element.getFlowScope().getElementType() == BpmnElementType.EVENT_SUB_PROCESS;
}

unsubscribeFromTimerEvents(context, commandWriter);
unsubscribeFromMessageEvents(context, sideEffects);
/**
* Unsubscribe from all events in the scope of the context that matches the given filter. Ignore
* other event subscriptions that don't match the filter.
*
* @param context the context to subscript from
* @param commandWriter the writer for unsubscribe commands
* @param sideEffects the side effects for unsubscribe actions
* @param elementIdFilter the filter for events to unsubscribe
*/
private void unsubscribeFromEvents(
final BpmnElementContext context,
final TypedCommandWriter commandWriter,
final SideEffects sideEffects,
final Predicate<DirectBuffer> elementIdFilter) {

unsubscribeFromTimerEvents(context, commandWriter, elementIdFilter);
unsubscribeFromMessageEvents(context, sideEffects, elementIdFilter);
}

/**
Expand Down Expand Up @@ -267,9 +323,16 @@ public void subscribeToTimerEvent(
}

private void unsubscribeFromTimerEvents(
final BpmnElementContext context, final TypedCommandWriter commandWriter) {
final BpmnElementContext context,
final TypedCommandWriter commandWriter,
final Predicate<DirectBuffer> elementIdFilter) {
timerInstanceState.forEachTimerForElementInstance(
context.getElementInstanceKey(), t -> unsubscribeFromTimerEvent(t, commandWriter));
context.getElementInstanceKey(),
timer -> {
if (elementIdFilter.test(timer.getHandlerNodeId())) {
unsubscribeFromTimerEvent(timer, commandWriter);
}
});
}

public void unsubscribeFromTimerEvent(
Expand All @@ -287,13 +350,21 @@ public void unsubscribeFromTimerEvent(
}

private void unsubscribeFromMessageEvents(
final BpmnElementContext context, final SideEffects sideEffects) {
final BpmnElementContext context,
final SideEffects sideEffects,
final Predicate<DirectBuffer> elementIdFilter) {
processMessageSubscriptionState.visitElementSubscriptions(
context.getElementInstanceKey(),
subscription -> unsubscribeFromMessageEvent(subscription, sideEffects));
subscription -> {
final var elementId = subscription.getRecord().getElementIdBuffer();
if (elementIdFilter.test(elementId)) {
unsubscribeFromMessageEvent(subscription, sideEffects);
}
return true;
});
}

private boolean unsubscribeFromMessageEvent(
private void unsubscribeFromMessageEvent(
final ProcessMessageSubscription subscription, final SideEffects sideEffects) {

final DirectBuffer messageName = cloneBuffer(subscription.getRecord().getMessageNameBuffer());
Expand All @@ -307,8 +378,6 @@ private boolean unsubscribeFromMessageEvent(
() ->
sendCloseMessageSubscriptionCommand(
subscriptionPartitionId, processInstanceKey, elementInstanceKey, messageName));

return true;
}

private boolean sendCloseMessageSubscriptionCommand(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,11 @@ public EventHandle(
this.eventTriggerBehavior = eventTriggerBehavior;
}

public boolean canTriggerElement(final ElementInstance eventScopeInstance) {
public boolean canTriggerElement(
final ElementInstance eventScopeInstance, final DirectBuffer elementId) {
return eventScopeInstance != null
&& eventScopeInstance.isActive()
&& eventScopeInstanceState.isAcceptingEvent(eventScopeInstance.getKey());
&& eventScopeInstanceState.canTriggerEvent(eventScopeInstance.getKey(), elementId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ public EventTriggerBehavior(
new VariableBehavior(zeebeState.getVariableState(), writers.state(), keyGenerator);
}

public void unsubscribeFromEvents(final BpmnElementContext context) {
private void unsubscribeEventSubprocesses(final BpmnElementContext context) {
final var sideEffectQueue = new SideEffectQueue();
catchEventBehavior.unsubscribeFromEvents(context, commandWriter, sideEffectQueue);
catchEventBehavior.unsubscribeEventSubprocesses(context, commandWriter, sideEffectQueue);

// side effect can immediately executed, since on restart we not reprocess anymore the commands
sideEffectQueue.flush();
Expand Down Expand Up @@ -98,7 +98,7 @@ public void triggerEventSubProcess(
}

if (startEvent.interrupting()) {
unsubscribeFromEvents(flowScopeContext);
unsubscribeEventSubprocesses(flowScopeContext);

final var noActiveChildInstances = terminateChildInstances(flowScopeContext);
if (!noActiveChildInstances) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public class ExecutableActivity extends ExecutableFlowNode implements Executable

private final List<ExecutableCatchEvent> catchEvents = new ArrayList<>();
private final List<DirectBuffer> interruptingIds = new ArrayList<>();
private final List<DirectBuffer> boundaryElementIds = new ArrayList<>();

public ExecutableActivity(final String id) {
super(id);
Expand All @@ -28,8 +29,11 @@ public void attach(final ExecutableBoundaryEvent boundaryEvent) {
boundaryEvents.add(boundaryEvent);
catchEvents.add(boundaryEvent);

final var boundaryEventElementId = boundaryEvent.getId();
boundaryElementIds.add(boundaryEventElementId);

if (boundaryEvent.interrupting()) {
interruptingIds.add(boundaryEvent.getId());
interruptingIds.add(boundaryEventElementId);
}
}

Expand Down Expand Up @@ -57,6 +61,11 @@ public Collection<DirectBuffer> getInterruptingElementIds() {
return interruptingIds;
}

@Override
public Collection<DirectBuffer> getBoundaryElementIds() {
return boundaryElementIds;
}

public List<ExecutableBoundaryEvent> getBoundaryEvents() {
return boundaryEvents;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ public Collection<DirectBuffer> getInterruptingElementIds() {
return Collections.singleton(getId());
}

@Override
public Collection<DirectBuffer> getBoundaryElementIds() {
return Collections.emptySet();
}

public boolean interrupting() {
return interrupting;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,17 @@
public interface ExecutableCatchEventSupplier extends ExecutableFlowElement {
List<ExecutableCatchEvent> getEvents();

/**
* Returns the ids of the containing elements that interrupt the event scope (e.g. interrupting
* event subprocesses). An interrupted event scope can not be triggered by other interrupting or
* non-interrupting events. But the event scope can still be triggered by boundary events.
*/
Collection<DirectBuffer> getInterruptingElementIds();

/**
* Returns the ids of the boundary events. An interrupting boundary event must return its id also
* with {@link #getInterruptingElementIds()}. Otherwise, it is handled as non-interrupting
* boundary event.
*/
Collection<DirectBuffer> getBoundaryElementIds();
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.agrona.DirectBuffer;

Expand Down Expand Up @@ -40,4 +41,9 @@ public void setEvents(final List<ExecutableCatchEvent> events) {
public Collection<DirectBuffer> getInterruptingElementIds() {
return eventIds;
}

@Override
public Collection<DirectBuffer> getBoundaryElementIds() {
return Collections.emptySet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,9 @@ private void acceptCommand(
commandControl.reject(
RejectionType.INVALID_STATE,
"Expected to find active service task, but was " + serviceTaskInstance);
} else if (!eventScopeInstanceState.isAcceptingEvent(
foundCatchEvent.get().getElementInstance().getKey())) {
} else if (!eventScopeInstanceState.canTriggerEvent(
foundCatchEvent.get().getElementInstance().getKey(),
foundCatchEvent.get().getCatchEvent().getId())) {
commandControl.reject(
RejectionType.INVALID_STATE,
"Expected to find event scope that is accepting events, but was "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ final var record = command.getValue();

} else {
final var elementInstance = elementInstanceState.getInstance(elementInstanceKey);
final var canTriggerElement = eventHandle.canTriggerElement(elementInstance);
final var canTriggerElement =
eventHandle.canTriggerElement(
elementInstance, subscription.getRecord().getElementIdBuffer());

if (!canTriggerElement) {
rejectCommand(command, RejectionType.INVALID_STATE, NO_EVENT_OCCURRED_MESSAGE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void processRecord(
processDefinitionKey, processInstanceKey, timer.getTargetElementIdBuffer(), NO_VARIABLES);
} else {
final var elementInstance = elementInstanceState.getInstance(elementInstanceKey);
if (!eventHandle.canTriggerElement(elementInstance)) {
if (!eventHandle.canTriggerElement(elementInstance, timer.getTargetElementIdBuffer())) {
rejectNoActiveTimer(record);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,18 +239,21 @@ private void createEventScope(
elementRecord.getElementIdBuffer(),
flowElementClass);

if (flowElement instanceof ExecutableCatchEventSupplier) {
final var eventSupplier = (ExecutableCatchEventSupplier) flowElement;
if (flowElement instanceof final ExecutableCatchEventSupplier eventSupplier) {

final var hasEvents = !eventSupplier.getEvents().isEmpty();
if (hasEvents
|| flowElement instanceof ExecutableJobWorkerElement
|| flowElement instanceof ExecutableCallActivity) {
eventScopeInstanceState.createInstance(
elementInstanceKey, eventSupplier.getInterruptingElementIds());
elementInstanceKey,
eventSupplier.getInterruptingElementIds(),
eventSupplier.getBoundaryElementIds());
}
} else if (flowElement instanceof ExecutableJobWorkerElement) {
eventScopeInstanceState.createInstance(elementInstanceKey, Collections.emptyList());
// job worker elements without events (e.g. message throw events)
eventScopeInstanceState.createInstance(
elementInstanceKey, Collections.emptySet(), Collections.emptySet());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import io.camunda.zeebe.engine.state.instance.EventScopeInstance;
import io.camunda.zeebe.engine.state.instance.EventTrigger;
import org.agrona.DirectBuffer;

public interface EventScopeInstanceState {

Expand All @@ -30,8 +31,11 @@ public interface EventScopeInstanceState {
EventTrigger peekEventTrigger(long eventScopeKey);

/**
* Checks if the event scope can be triggered for the given event.
*
* @param eventScopeKey the key of the event scope the event is triggered in
* @return true if the event can be accepted
* @param elementId the element id of the event that is triggered
* @return {@code true} if the event can be triggered, otherwise {@code false}
*/
boolean isAcceptingEvent(long eventScopeKey);
boolean canTriggerEvent(long eventScopeKey, final DirectBuffer elementId);
}

0 comments on commit 2f4d8c8

Please sign in to comment.