9753: Document events and event scopes r=korthout a=korthout

## Description

This provides an initial description of events and event scopes. It uses both the BPMN specification as well as the terminology used in Zeebe to describe events, triggers, and event scopes.

## Related issues


NA


10021: Engine abstraction code cleanup r=pihme a=pihme

## Description
* Removes unnecessary references
* Moves `LegacyTypedResponseWriter` to stream processor, as it is only used in this package
* Renames `LegacyTypedResponseWriter`

## Related issues

related to #9727 



10028: Revert "Merge pull request #9985 from camunda/zell-clean-up-abstracti… r=Zelldon a=Zelldon

## Description

This reverts commit 155de10, reversing
changes made to 47c3d3a.

## Related issues


related to #10027



Co-authored-by: Nico Korthout <nico.korthout@camunda.com>
Co-authored-by: pihme <pihme@users.noreply.github.com>
Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
4 people committed Aug 8, 2022
4 parents 155de10 + e0fa79d + 349bb3b + 7a8db74 commit 26c8cca
Showing 18 changed files with 154 additions and 101 deletions.
@@ -66,7 +66,7 @@ void setup() {

private RecordProcessorContextImpl createContext(
final ProcessingScheduleService executor, final ZeebeDb zeebeDb) {
return new RecordProcessorContextImpl(1, executor, zeebeDb, zeebeDb.createContext());
return new RecordProcessorContextImpl(1, executor, zeebeDb, zeebeDb.createContext(), null);
}

@AfterEach
@@ -11,6 +11,7 @@
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionContext;
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionStep;
import io.camunda.zeebe.engine.Engine;
import io.camunda.zeebe.engine.state.appliers.EventAppliers;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.streamprocessor.StreamProcessorMode;
@@ -128,6 +129,7 @@ private static StreamProcessor createStreamProcessor(
.actorSchedulingService(context.getActorSchedulingService())
.zeebeDb(context.getZeebeDb())
.recordProcessor(new Engine(context.getTypedRecordProcessorFactory()))
.eventApplierFactory(EventAppliers::new)
.nodeId(context.getNodeId())
.commandResponseWriter(context.getCommandResponseWriter())
.listener(processedCommand -> context.getOnProcessedListener().accept(processedCommand))
84 changes: 84 additions & 0 deletions docs/events.md
@@ -0,0 +1,84 @@
# Events

This is a description of events and event scopes. It combines a summary of the BPMN 2.0 specification with how they work in Zeebe.

In BPMN, **events** are flow nodes that indicate something can happen which affects the flow of the process.

There are three main types of events:
- **start events**, i.e. the start of a process
- **end events**, i.e. the end of a path in a process
- **intermediate events**, i.e. something that can happen between the start and the end of a process; intermediate events attached to an activity's boundary are called boundary events

and two different flavors of events:
- **catch events**, i.e. events that catch a trigger
- **throw events**, i.e. events that throw a trigger (sometimes referred to as a result)

See [BPMN (v2.0.2): 10.5 Events](https://www.omg.org/spec/BPMN/2.0.2/PDF#10.5%20Events) for more details about events.
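
To make this concrete, here is a minimal sketch of a process containing all three main event types, built with Zeebe's BPMN model builder (`io.camunda.zeebe.model.bpmn.Bpmn`). The process ID, element IDs, message name, and correlation expression are made up for the example:

```java
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;

public class EventTypesExample {

  public static void main(final String[] args) {
    // A process with the three main event types: a start event, an
    // intermediate (message) catch event, and an end event.
    final BpmnModelInstance process =
        Bpmn.createExecutableProcess("order-process")
            .startEvent("order-received") // start event
            .intermediateCatchEvent("payment-confirmed") // intermediate catch event
            .message(m -> m.name("payment").zeebeCorrelationKeyExpression("orderId"))
            .endEvent("order-completed") // end event
            .done();

    System.out.println(Bpmn.convertToString(process));
  }
}
```

The intermediate catch event is a *catch* event: it waits for a message trigger. The end event is a *throw* event: completing it throws the (none) result.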

## Triggers

So events represent where in the process "something" can happen. This "something" is called a **trigger**. When a *trigger occurs*, the engine *forwards the trigger* to the catch event. The spec also refers to this as *triggering the catch event*.

According to the spec, there are multiple ways to **forward a trigger**:
- publication, e.g. when a message is published it can be correlated to an event (see the client sketch after this list)
- direct resolution, e.g. timer triggers are implicitly thrown
- propagation, i.e. forwarded from the location where the event has been thrown to the innermost enclosing scope instance (we'll discuss scopes later) with an attached event able to catch the trigger, e.g. throwing and catching a BPMN error
- cancellation, e.g. a termination of the process instance (not handled as an event trigger in Zeebe)
- compensation, which is not yet available in Zeebe
- (Zeebe also considers job completion as a trigger that is forwarded to the respective job worker task, but this is not part of the BPMN spec)
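
As referenced above, the *publication* case typically starts with a client publishing a message, which the engine then tries to correlate to a waiting message catch event. A minimal sketch using the Zeebe Java client; the gateway address, message name, and correlation key are placeholders:

```java
import io.camunda.zeebe.client.ZeebeClient;
import java.time.Duration;

public class PublishTriggerExample {

  public static void main(final String[] args) {
    // Publishing a message is one way a trigger occurs; the engine can then
    // correlate (forward) it to a matching message catch event whose event
    // scope is active.
    try (final ZeebeClient client =
        ZeebeClient.newClientBuilder().gatewayAddress("localhost:26500").usePlaintext().build()) {
      client
          .newPublishMessageCommand()
          .messageName("payment") // must match the catch event's message name
          .correlationKey("order-4711") // matched against the catch event's correlation key expression
          .timeToLive(Duration.ofMinutes(10)) // how long the message is buffered if nothing is waiting yet
          .send()
          .join();
    }
  }
}
```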

When a trigger is forwarded to a catch event, it has an effect on the process. Depending on the type, the catch event is either activated (e.g. a start event) or completed (if it is already active, e.g. a non-boundary intermediate event), and process execution can continue.

In addition, some catch events can **interrupt** an active flow when a trigger is forwarded to them. An interrupted active flow is *terminated*. A catch event only interrupts the active flow if it is *interrupting*; *non-interrupting* catch events don't. The following catch events can be either *interrupting* or *non-interrupting*; all others are *non-interrupting* (see the builder sketch after this list):
- start events of event sub-processes, i.e. when triggered, they interrupt the (sub-)process encompassing the event sub-process
- boundary events, i.e. when triggered, they interrupt the activity to which they are attached
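
Here is the sketch referenced above: an interrupting and a non-interrupting boundary event attached to the same service task, built with Zeebe's BPMN model builder. The job type, timer duration, message, and element IDs are made up; `cancelActivity(...)` controls whether the boundary event is interrupting:

```java
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.model.bpmn.BpmnModelInstance;

public class BoundaryEventExample {

  public static void main(final String[] args) {
    final BpmnModelInstance process =
        Bpmn.createExecutableProcess("payment-process")
            .startEvent()
            .serviceTask("charge", t -> t.zeebeJobType("charge-card"))
            // Interrupting timer boundary event: when it triggers, the "charge"
            // activity is terminated.
            .boundaryEvent("charge-timeout", b -> b.cancelActivity(true).timerWithDuration("PT1H"))
            .endEvent()
            .moveToActivity("charge")
            // Non-interrupting message boundary event: the "charge" activity keeps running.
            .boundaryEvent(
                "payment-update",
                b ->
                    b.cancelActivity(false)
                        .message(m -> m.name("update").zeebeCorrelationKeyExpression("orderId")))
            .endEvent()
            .moveToActivity("charge")
            .endEvent()
            .done();

    System.out.println(Bpmn.convertToString(process));
  }
}
```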

See [BPMN (v2.0.2): 10.5.1 Concepts](https://www.omg.org/spec/BPMN/2.0.2/PDF#10.5.1%20Concepts) for more details about triggers.

See [BPMN (v2.0.2): 10.5.6 Handling Events](https://www.omg.org/spec/BPMN/2.0.2/PDF#10.5.6%20Handling%20Events) for more details on what should happen when a trigger is forwarded to a catch event.

## Scopes

**Scopes** are used to define the semantics of:
- visibility of data, i.e. an activity's visibility of process variables
- event resolution, e.g. error propagation through the enclosing scopes
- starting/stopping of token execution

See [BPMN (v2.0.2): 10.5.7 Scopes](https://www.omg.org/spec/BPMN/2.0.2/PDF#10.5.7%20Scopes) for more details about scopes.

In Zeebe, we refer to scopes in multiple forms (these are not described by the BPMN spec):
- **flow scopes**, i.e. the element encompassing a specific element, aka the parent-child relation; e.g. a sub-process is the flow scope of a service task when it directly encompasses this service task. Note that sometimes an indirectly encompassing element (a non-parent ancestor) is also referred to as a flow scope.
- event scopes, described below

### Event scopes

The **event scope** refers to the element in the process that must be *active* in order for a *trigger* to be forwarded to the catch event. For example, we can only forward a trigger to a boundary event if the element it is attached to is *active*. In other words, the boundary event is in **scope** while the attached element is *active*. (A small resolution sketch follows the list below.)

- For a boundary event, the event scope refers to the element it is attached to.
- For the start event of an event sub-process, the event scope refers to the flow scope that contains the event sub-process (i.e. the process or the embedded sub-process).
- For an event connected to an event-based gateway, the event scope refers to the event-based gateway.
- For elements that are actively waiting for events (e.g. intermediate catch events), the event scope refers to the element itself.
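
To make the mapping concrete, here is the small resolution sketch referenced above. It is purely illustrative: the `CatchEvent` record and its accessors are hypothetical simplifications, not Zeebe's internal API:

```java
public class EventScopeResolution {

  // Hypothetical, simplified view of a catch event instance;
  // Zeebe's real internal representation differs.
  record CatchEvent(
      long elementInstanceKey,
      long flowScopeKey,
      long attachedToKey, // -1 if not a boundary event
      long eventGatewayKey, // -1 if not connected to an event-based gateway
      boolean eventSubProcessStart) {}

  // Returns the key of the element instance that must be active for a trigger
  // to be forwarded to the given catch event.
  static long resolveEventScopeKey(final CatchEvent event) {
    if (event.attachedToKey() != -1) {
      return event.attachedToKey(); // boundary event -> attached activity
    }
    if (event.eventSubProcessStart()) {
      return event.flowScopeKey(); // event sub-process start event -> containing flow scope
    }
    if (event.eventGatewayKey() != -1) {
      return event.eventGatewayKey(); // event after an event-based gateway -> the gateway
    }
    return event.elementInstanceKey(); // actively waiting element -> itself
  }
}
```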

In Zeebe, we sometimes say that an event scope can be triggered, meaning that a trigger can be forwarded to a catch event.

Event scopes are not entities in Zeebe. Instead, Zeebe only cares about event scope instances.

### Event scope instances

An **event scope instance** is an instance of an event scope. It directly refers to a specific element instance, because it is stored in the state (in the event scope column family) under the key of that element instance. This means we can directly access the event scope instance of an element instance. We'll see later what we use this for.
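
A minimal sketch of such a lookup. The interface and method names are modeled on Zeebe's `EventScopeInstanceState`, but treat the exact signatures as an approximation:

```java
import io.camunda.zeebe.engine.state.immutable.EventScopeInstanceState;
import io.camunda.zeebe.engine.state.instance.EventScopeInstance;

final class EventScopeLookup {

  private final EventScopeInstanceState eventScopeInstanceState;

  EventScopeLookup(final EventScopeInstanceState eventScopeInstanceState) {
    this.eventScopeInstanceState = eventScopeInstanceState;
  }

  // Because the column family is keyed by the element instance key, reading
  // the event scope instance of an element instance is a direct point lookup.
  EventScopeInstance lookup(final long elementInstanceKey) {
    return eventScopeInstanceState.getInstance(elementInstanceKey);
  }
}
```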

:::info
Event scope instances are persisted in the state (as `EventScopeInstance`) but not represented on the log stream. In contrast, event triggers are persisted along with the event scope instance in the state, and are represented on the log stream as `ProcessEvent` records.
:::

The engine uses event scope instances:
- ~~to find the relevant catch event when a trigger occurs~~ - It would be reasonable to expect that the engine uses the event scope instance to find the relevant catch event for a trigger, but that is not the case. We'll discuss how this works later.
- to determine whether the trigger can be forwarded to the catch event, e.g. boundary events attached to an activity can no longer be triggered when the activity is already interrupted by an attached boundary event

An event scope can be triggered as long as no interrupting event has been triggered (i.e. it is not interrupted). Once an interrupting catch event has been triggered, no other events can be triggered, except for boundary events. Once an interrupting boundary event has been triggered, no other events at all, including boundary events, can be triggered, i.e. the event scope is no longer **accepting** any events (a simplified check follows the property list below).

An event scope instance has four properties:
- `accepting`, when `false` the event scope instance is not accepting any triggers.
- `interrupted`, when `true` the event scope instance is interrupted, but may still accept triggers for boundary events.
- `interruptingElementIds`, the element IDs of the catch events that can interrupt the event scope instance. This property doesn't change during the event scope instance's lifetime.
- `boundaryElementIds`, the element IDs of the boundary events that are attached to the event scope instance. This property doesn't change during the event scope instance's lifetime.
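
Putting the triggering rules and these properties together, the simplified check referenced above could look like this. It is a hypothetical sketch, not Zeebe's actual implementation; the record below only mirrors the four properties listed:

```java
import java.util.Set;

public class EventScopeTriggerCheck {

  // Hypothetical, simplified model of the four properties described above;
  // Zeebe's actual event scope instance state class differs in detail.
  record EventScopeInstance(
      boolean accepting,
      boolean interrupted,
      Set<String> interruptingElementIds, // consulted when a trigger is recorded, to decide whether it interrupts the scope
      Set<String> boundaryElementIds) {}

  // Can a trigger for the given catch event be forwarded within this scope?
  static boolean canTriggerEvent(final EventScopeInstance scope, final String catchEventId) {
    if (!scope.accepting()) {
      // An interrupting boundary event was already triggered: nothing else is accepted.
      return false;
    }
    if (scope.interrupted()) {
      // An interrupting (non-boundary) catch event was triggered:
      // only boundary events can still be triggered.
      return scope.boundaryElementIds().contains(catchEventId);
    }
    // Not interrupted and accepting: any catch event of this scope can be triggered.
    return true;
  }

  public static void main(final String[] args) {
    final var scope =
        new EventScopeInstance(true, true, Set.of("error-start"), Set.of("charge-timeout"));
    System.out.println(canTriggerEvent(scope, "charge-timeout")); // true: boundary event still accepted
    System.out.println(canTriggerEvent(scope, "payment-confirmed")); // false: scope is interrupted
  }
}
```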
3 changes: 1 addition & 2 deletions engine/src/main/java/io/camunda/zeebe/engine/Engine.java
@@ -21,7 +21,6 @@
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.ZeebeDbState;
import io.camunda.zeebe.engine.state.appliers.EventAppliers;
import io.camunda.zeebe.engine.state.processing.DbBlackListState;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.protocol.impl.record.value.error.ErrorRecord;
@@ -64,7 +63,7 @@ public void init(final RecordProcessorContext recordProcessorContext) {
recordProcessorContext.getPartitionId(),
recordProcessorContext.getZeebeDb(),
recordProcessorContext.getTransactionContext());
eventApplier = new EventAppliers(zeebeState);
eventApplier = recordProcessorContext.getEventApplierFactory().apply(zeebeState);

writers = new Writers(resultBuilderMutex, eventApplier);

@@ -10,7 +10,10 @@
import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import java.util.List;
import java.util.function.Function;

public interface RecordProcessorContext {

@@ -22,6 +25,8 @@ public interface RecordProcessorContext {

TransactionContext getTransactionContext();

Function<MutableZeebeState, EventApplier> getEventApplierFactory();

List<StreamProcessorLifecycleAware> getLifecycleListeners();

StreamProcessorListener getStreamProcessorListener();
@@ -128,12 +128,7 @@ public static TypedRecordProcessors createEngineProcessors(
jobMetrics,
eventTriggerBehavior);

addIncidentProcessors(
zeebeState,
bpmnStreamProcessor,
typedRecordProcessors,
writers,
zeebeState.getKeyGenerator());
addIncidentProcessors(zeebeState, bpmnStreamProcessor, typedRecordProcessors, writers);

return typedRecordProcessors;
}
@@ -213,10 +208,9 @@ private static void addIncidentProcessors(
final ZeebeState zeebeState,
final TypedRecordProcessor<ProcessInstanceRecord> bpmnStreamProcessor,
final TypedRecordProcessors typedRecordProcessors,
final Writers writers,
final KeyGenerator keyGenerator) {
final Writers writers) {
IncidentEventProcessors.addProcessors(
typedRecordProcessors, zeebeState, bpmnStreamProcessor, writers, keyGenerator);
typedRecordProcessors, zeebeState, bpmnStreamProcessor, writers);
}

private static void addMessageProcessors(
@@ -10,7 +10,6 @@
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.KeyGenerator;
import io.camunda.zeebe.engine.state.immutable.ZeebeState;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.ValueType;
@@ -22,11 +21,10 @@ public static void addProcessors(
final TypedRecordProcessors typedRecordProcessors,
final ZeebeState zeebeState,
final TypedRecordProcessor<ProcessInstanceRecord> bpmnStreamProcessor,
final Writers writers,
final KeyGenerator keyGenerator) {
final Writers writers) {
typedRecordProcessors.onCommand(
ValueType.INCIDENT,
IncidentIntent.RESOLVE,
new ResolveIncidentProcessor(zeebeState, bpmnStreamProcessor, writers, keyGenerator));
new ResolveIncidentProcessor(zeebeState, bpmnStreamProcessor, writers));
}
}
@@ -11,13 +11,10 @@
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.NoopResponseWriterLegacy;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.KeyGenerator;
import io.camunda.zeebe.engine.state.immutable.ElementInstanceState;
import io.camunda.zeebe.engine.state.immutable.IncidentState;
import io.camunda.zeebe.engine.state.immutable.ZeebeState;
@@ -39,29 +36,25 @@ public final class ResolveIncidentProcessor implements TypedRecordProcessor<Inci

private final ProcessInstanceRecord failedRecord = new ProcessInstanceRecord();
private final SideEffectQueue sideEffects = new SideEffectQueue();
private final LegacyTypedResponseWriter noopResponseWriter = new NoopResponseWriterLegacy();

private final TypedRecordProcessor<ProcessInstanceRecord> bpmnStreamProcessor;
private final StateWriter stateWriter;
private final TypedRejectionWriter rejectionWriter;

private final IncidentState incidentState;
private final ElementInstanceState elementInstanceState;
private final KeyGenerator keyGenerator;
private final TypedResponseWriter responseWriter;

public ResolveIncidentProcessor(
final ZeebeState zeebeState,
final TypedRecordProcessor<ProcessInstanceRecord> bpmnStreamProcessor,
final Writers writers,
final KeyGenerator keyGenerator) {
final Writers writers) {
this.bpmnStreamProcessor = bpmnStreamProcessor;
stateWriter = writers.state();
rejectionWriter = writers.rejection();
responseWriter = writers.response();
incidentState = zeebeState.getIncidentState();
elementInstanceState = zeebeState.getElementInstanceState();
this.keyGenerator = keyGenerator;
}

@Override

This file was deleted.

@@ -10,7 +10,6 @@
import io.camunda.zeebe.engine.api.PostCommitTask;
import io.camunda.zeebe.engine.api.ProcessingResult;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import java.util.ArrayList;
@@ -27,7 +26,7 @@ final class DirectProcessingResult implements ProcessingResult {
private final List<PostCommitTask> postCommitTasks;

private final LegacyTypedStreamWriter streamWriter;
private final LegacyTypedResponseWriter responseWriter;
private final DirectTypedResponseWriter responseWriter;
private boolean hasResponse;

DirectProcessingResult(
@@ -10,7 +10,6 @@
import io.camunda.zeebe.engine.api.PostCommitTask;
import io.camunda.zeebe.engine.api.ProcessingResult;
import io.camunda.zeebe.engine.api.ProcessingResultBuilder;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.record.RecordType;
@@ -33,7 +32,7 @@ final class DirectProcessingResultBuilder implements ProcessingResultBuilder {

private final StreamProcessorContext context;
private final LegacyTypedStreamWriter streamWriter;
private final LegacyTypedResponseWriter responseWriter;
private final DirectTypedResponseWriter responseWriter;

private boolean hasResponse =
true; // TODO figure out why this still needs to be true for tests to pass
@@ -5,17 +5,18 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.streamprocessor.writers;
package io.camunda.zeebe.streamprocessor;

import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;

public interface LegacyTypedResponseWriter extends SideEffectProducer, TypedResponseWriter {
public interface DirectTypedResponseWriter extends SideEffectProducer, TypedResponseWriter {

@Override
void writeRejectionOnCommand(TypedRecord<?> command, RejectionType type, String reason);
