Skip to content

Commit

Permalink
merge: #9853
Browse files Browse the repository at this point in the history
9853: Engine abstraction - rewrite Writers class to use ResultProcessorBuilder provided by platform r=pihme a=pihme

## Description

This is the last PR in the "shape legacy code into new interfaces" series.

Previously, the new interfaces `ProcessingResultBuilder` and `ProcessingResult` were introduced. This is how the engine is supposed to return the processing result to the stream processor. Also previously, these classes were passed between stream processor and engine on interface level, and the division of labor between stream processor and engine was established.

What was not achieved yet, is that the engine uses these new classes. Instead, the engine used existing legacy code to circumvent the new interfaces. 

This PR transforms the engine to use the new classes provided by the stream processor:
- Central point of transformation is the `Writers` class which at the outset depended on stream writers, state writers, response writers and so on
- In the end, the `Writers` class only depends on `ProcessingResultBuilder` which will be swapped out for each record processed, or each error to be handled
- To make this transition possible we a) need to refine the `Typed(Response/Rejection/Command)Writer` classes and remove methods like `reset, flush` which will no longer be offered by the new interfaces and b) need to implement the refined interfaces with an implementation that forwards calls to `ProcessingResultBuilder`

## Review hints
- `@npepinpe` 95% of changes is in the engine, `@remcowesterhoud` will look into those
- These changes are done under "better done than perfect" regime. Some naming might be not exactly right. Some clean up might be done after the fact. The focus is to establish much of the engine abstraction concept, so that both teams afterwards can work without interfering with each other.

## Related issues

related to #9725



Co-authored-by: pihme <pihme@users.noreply.github.com>
  • Loading branch information
zeebe-bors-camunda[bot] and pihme committed Jul 22, 2022
2 parents 52ee629 + 5d26a75 commit 6efb06b
Show file tree
Hide file tree
Showing 70 changed files with 903 additions and 409 deletions.
154 changes: 98 additions & 56 deletions engine/src/main/java/io/camunda/zeebe/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorContextImpl;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.engine.state.ZeebeDbState;
import io.camunda.zeebe.engine.state.processing.DbBlackListState;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.protocol.impl.record.value.error.ErrorRecord;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.ErrorIntent;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRelated;
import java.util.function.Supplier;
import org.slf4j.Logger;

public class Engine implements RecordProcessor<EngineContext> {
Expand All @@ -38,12 +39,15 @@ public class Engine implements RecordProcessor<EngineContext> {
"Expected to process record '%s' without errors, but exception occurred with message '%s'.";
private EventApplier eventApplier;
private RecordProcessorMap recordProcessorMap;
private MutableZeebeState zeebeState;
private TypedStreamWriter streamWriter;
private TypedResponseWriter responseWriter;
private ZeebeDbState zeebeState;
private LegacyTypedStreamWriter streamWriter;
private LegacyTypedResponseWriter responseWriter;

private final ErrorRecord errorRecord = new ErrorRecord();

private final ProcessingResultBuilderMutex resultBuilderMutex =
new ProcessingResultBuilderMutex();

private Writers writers;

public Engine() {}
Expand All @@ -53,15 +57,21 @@ public void init(final EngineContext engineContext) {
streamWriter = engineContext.getStreamWriterProxy();
responseWriter = engineContext.getTypedResponseWriter();

zeebeState =
new ZeebeDbState(
engineContext.getPartitionId(),
engineContext.getZeebeDb(),
engineContext.getTransactionContext());
eventApplier = engineContext.getEventApplierFactory().apply(zeebeState);

writers = new Writers(resultBuilderMutex, eventApplier);

final var typedProcessorContext =
new TypedRecordProcessorContextImpl(
engineContext.getPartitionId(),
engineContext.getScheduleService(),
engineContext.getZeebeDb(),
engineContext.getTransactionContext(),
streamWriter,
engineContext.getEventApplierFactory(),
responseWriter);
zeebeState,
writers);

final TypedRecordProcessors typedRecordProcessors =
engineContext.getTypedRecordProcessorFactory().createProcessors(typedProcessorContext);
Expand All @@ -71,10 +81,7 @@ public void init(final EngineContext engineContext) {
engineContext.setLifecycleListeners(typedRecordProcessors.getLifecycleListeners());
recordProcessorMap = typedRecordProcessors.getRecordProcessorMap();

writers = typedProcessorContext.getWriters();
engineContext.setWriters(writers);
zeebeState = typedProcessorContext.getZeebeState();
eventApplier = engineContext.getEventApplierFactory().apply(zeebeState);
}

@Override
Expand All @@ -85,37 +92,38 @@ public void replay(final TypedRecord event) {
@Override
public ProcessingResult process(
final TypedRecord record, final ProcessingResultBuilder processingResultBuilder) {
TypedRecordProcessor<?> currentProcessor = null;

final var typedCommand = (TypedRecord<?>) record;
try {
currentProcessor =
recordProcessorMap.get(
typedCommand.getRecordType(),
typedCommand.getValueType(),
typedCommand.getIntent().value());
} catch (final Exception e) {
LOG.error(ERROR_MESSAGE_ON_EVENT_FAILED_SKIP_EVENT, typedCommand, e);
}
try (final var scope = new ProcessingResultBuilderScope(processingResultBuilder)) {
TypedRecordProcessor<?> currentProcessor = null;

final var typedCommand = (TypedRecord<?>) record;
try {
currentProcessor =
recordProcessorMap.get(
typedCommand.getRecordType(),
typedCommand.getValueType(),
typedCommand.getIntent().value());
} catch (final Exception e) {
LOG.error(ERROR_MESSAGE_ON_EVENT_FAILED_SKIP_EVENT, typedCommand, e);
}

if (currentProcessor == null) {
return EmptyProcessingResult.INSTANCE;
}
if (currentProcessor == null) {
return EmptyProcessingResult.INSTANCE;
}

final boolean isNotOnBlacklist = !zeebeState.getBlackListState().isOnBlacklist(typedCommand);
if (isNotOnBlacklist) {
final long position = typedCommand.getPosition();
currentProcessor.processRecord(
position,
record,
responseWriter,
streamWriter,
(sep) -> {
processingResultBuilder.resetPostCommitTasks();
processingResultBuilder.appendPostCommitTask(sep::flush);
});
final boolean isNotOnBlacklist = !zeebeState.getBlackListState().isOnBlacklist(typedCommand);
if (isNotOnBlacklist) {
final long position = typedCommand.getPosition();
currentProcessor.processRecord(
position,
record,
responseWriter,
streamWriter,
(sep) -> {
processingResultBuilder.resetPostCommitTasks();
processingResultBuilder.appendPostCommitTask(sep::flush);
});
}
}

return processingResultBuilder.build();
}

Expand All @@ -124,26 +132,60 @@ public ProcessingResult onProcessingError(
final Throwable processingException,
final TypedRecord record,
final ProcessingResultBuilder processingResultBuilder) {

final String errorMessage =
String.format(PROCESSING_ERROR_MESSAGE, record, processingException.getMessage());
LOG.error(errorMessage, processingException);

writers.rejection().appendRejection(record, RejectionType.PROCESSING_ERROR, errorMessage);
writers
.response()
.writeRejectionOnCommand(record, RejectionType.PROCESSING_ERROR, errorMessage);
errorRecord.initErrorRecord(processingException, record.getPosition());

if (DbBlackListState.shouldBeBlacklisted(record.getIntent())) {
if (record.getValue() instanceof ProcessInstanceRelated) {
final long processInstanceKey =
((ProcessInstanceRelated) record.getValue()).getProcessInstanceKey();
errorRecord.setProcessInstanceKey(processInstanceKey);
try (final var scope = new ProcessingResultBuilderScope(processingResultBuilder)) {
writers.rejection().appendRejection(record, RejectionType.PROCESSING_ERROR, errorMessage);
writers
.response()
.writeRejectionOnCommand(record, RejectionType.PROCESSING_ERROR, errorMessage);
errorRecord.initErrorRecord(processingException, record.getPosition());

if (DbBlackListState.shouldBeBlacklisted(record.getIntent())) {
if (record.getValue() instanceof ProcessInstanceRelated) {
final long processInstanceKey =
((ProcessInstanceRelated) record.getValue()).getProcessInstanceKey();
errorRecord.setProcessInstanceKey(processInstanceKey);
}

writers.state().appendFollowUpEvent(record.getKey(), ErrorIntent.CREATED, errorRecord);
}

writers.state().appendFollowUpEvent(record.getKey(), ErrorIntent.CREATED, errorRecord);
}
return processingResultBuilder.build();
}

private static final class ProcessingResultBuilderMutex
implements Supplier<ProcessingResultBuilder> {

private ProcessingResultBuilder resultBuilder;

private void setResultBuilder(final ProcessingResultBuilder resultBuilder) {
this.resultBuilder = resultBuilder;
}

private void unsetResultBuilder() {
/* TODO think about what we want to do here. Right now it is rest to null, which means NPEs
if accessed outside scope. We could also set a NOOP implementation, or one that logs warnings, etc.*/
resultBuilder = null;
}

@Override
public ProcessingResultBuilder get() {
return resultBuilder;
}
}

private final class ProcessingResultBuilderScope implements AutoCloseable {

private ProcessingResultBuilderScope(final ProcessingResultBuilder processingResultBuilder) {
resultBuilderMutex.setResultBuilder(processingResultBuilder);
}

@Override
public void close() {
resultBuilderMutex.unsetResultBuilder();
}
}
}
16 changes: 8 additions & 8 deletions engine/src/main/java/io/camunda/zeebe/engine/EngineContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorFactory;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
Expand All @@ -28,8 +28,8 @@ public final class EngineContext {
private final ProcessingScheduleService scheduleService;
private final ZeebeDb zeebeDb;
private final TransactionContext transactionContext;
private final TypedStreamWriter streamWriter;
private final TypedResponseWriter responseWriter;
private final LegacyTypedStreamWriter streamWriter;
private final LegacyTypedResponseWriter responseWriter;
private final Function<MutableZeebeState, EventApplier> eventApplierFactory;
private final TypedRecordProcessorFactory typedRecordProcessorFactory;
private List<StreamProcessorLifecycleAware> lifecycleListeners = Collections.EMPTY_LIST;
Expand All @@ -41,8 +41,8 @@ public EngineContext(
final ProcessingScheduleService scheduleService,
final ZeebeDb zeebeDb,
final TransactionContext transactionContext,
final TypedStreamWriter streamWriter,
final TypedResponseWriter responseWriter,
final LegacyTypedStreamWriter streamWriter,
final LegacyTypedResponseWriter responseWriter,
final Function<MutableZeebeState, EventApplier> eventApplierFactory,
final TypedRecordProcessorFactory typedRecordProcessorFactory) {
this.partitionId = partitionId;
Expand Down Expand Up @@ -71,11 +71,11 @@ public TransactionContext getTransactionContext() {
return transactionContext;
}

public TypedStreamWriter getStreamWriterProxy() {
public LegacyTypedStreamWriter getStreamWriterProxy() {
return streamWriter;
}

public TypedResponseWriter getTypedResponseWriter() {
public LegacyTypedResponseWriter getTypedResponseWriter() {
return responseWriter;
}

Expand Down
17 changes: 17 additions & 0 deletions engine/src/main/java/io/camunda/zeebe/engine/api/LegacyTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.api;

import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedCommandWriter;

/** This interface is here to migrate legacy tasks */
@Deprecated
public interface LegacyTask {

void run(LegacyTypedCommandWriter commandWriter, ProcessingScheduleService schedulingService);
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,13 @@ ProcessingResultBuilder appendRecord(
* @return returns itself for method chaining
*/
ProcessingResultBuilder withResponse(
final long eventKey,
final Intent eventState,
final UnpackedObject eventValue,
final RecordType type,
final long key,
final Intent intent,
final UnpackedObject value,
final ValueType valueType,
final RejectionType rejectionType,
final String rejectionReason,
final long requestId,
final int requestStreamId);

Expand Down Expand Up @@ -67,4 +70,8 @@ ProcessingResultBuilder withResponse(
ProcessingResultBuilder resetPostCommitTasks();

ProcessingResult build();

boolean canWriteEventOfLength(int eventLength);

int getMaxEventLength();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ public interface ProcessingScheduleService {

<T> void runOnCompletion(ActorFuture<T> precedingTask, BiConsumer<T, Throwable> followUpTask);

@Deprecated
<T> void runOnCompletion(ActorFuture<T> precedingTask, LegacyTask followUpTask);

@Deprecated
void runDelayed(Duration delay, LegacyTask followUpTask);

default void runAtFixedRate(final Duration delay, final Runnable task) {
runDelayed(
delay,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
*/
package io.camunda.zeebe.engine.api;

import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.log.LogStream;
Expand All @@ -24,7 +24,7 @@ public interface ReadonlyStreamProcessorContext {
/**
* @return the actual log stream writer, used to write any record
*/
TypedStreamWriter getLogStreamWriter();
LegacyTypedStreamWriter getLogStreamWriter();

/**
* @return the specific writers, like command, response, etc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.processing.variable.VariableBehavior;
import io.camunda.zeebe.engine.state.immutable.ProcessState;
Expand Down Expand Up @@ -88,8 +88,8 @@ private BpmnElementContainerProcessor<ExecutableFlowElement> getContainerProcess
@Override
public void processRecord(
final TypedRecord<ProcessInstanceRecord> record,
final TypedResponseWriter responseWriter,
final TypedStreamWriter streamWriter,
final LegacyTypedResponseWriter responseWriter,
final LegacyTypedStreamWriter streamWriter,
final Consumer<SideEffectProducer> sideEffect) {

// initialize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ public BpmnBehaviorsImpl(
stateBehavior,
processEngineMetrics,
processorLookup,
writers,
zeebeState.getElementInstanceState());
writers);
eventSubscriptionBehavior =
new BpmnEventSubscriptionBehavior(
catchEventBehavior,
Expand Down

0 comments on commit 6efb06b

Please sign in to comment.