Skip to content

Commit

Permalink
merge: #9840 #9844
Browse files Browse the repository at this point in the history
9840: `TypedStreamWriter` that writes to buffer r=pihme a=pihme

## Description

- Implements `TypedStreamWriter` that writes to buffer
- Implements processing result builder to use the buffered stream writer
- does not add tests. I started it, but it would take too much time. Hope I can revisit it after my holidays. For now there is just a follow up issue: #9838

## Related issues

closes #9780 



9844: Shape legacy code into new interfaces good bits part 2 r=pihme a=pihme

## Description

- Move logic to select processor into engine
- Move processing logic into engine
- Use `ProcessingResult` to return response and execute side effects
- Move error handling logic into engine
- Fix tests

## Review Hints
- most of the commits have some failing tests
- this was unavoidable, because substeps in the modification were invalid/incomplete
- In the end, all tests are now passing again. This was quite satisfying acutally - adding more changes and seeing the test failure count go down again
- Overall I would say we are through the valley of pain. Was quite volatile recently, but I am confident we can bring some calmness back to development
- This achieves that most of the engine abstraction classes are in play; and the division of labor is largely as intended. 
- Still lots of stuff to do on either side, though

## Related issues

related to #9725



Co-authored-by: pihme <pihme@users.noreply.github.com>
  • Loading branch information
zeebe-bors-camunda[bot] and pihme committed Jul 20, 2022
3 parents ec0ac77 + e8fb77b + 1440ed5 commit b229050
Show file tree
Hide file tree
Showing 19 changed files with 512 additions and 125 deletions.
101 changes: 92 additions & 9 deletions engine/src/main/java/io/camunda/zeebe/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,63 @@
*/
package io.camunda.zeebe.engine;

import io.camunda.zeebe.engine.api.EmptyProcessingResult;
import io.camunda.zeebe.engine.api.ErrorHandlingContext;
import io.camunda.zeebe.engine.api.ProcessingContext;
import io.camunda.zeebe.engine.api.ProcessingResult;
import io.camunda.zeebe.engine.api.RecordProcessor;
import io.camunda.zeebe.engine.api.RecordProcessorContext;
import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.RecordProcessorMap;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorContextImpl;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.engine.state.processing.DbBlackListState;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.protocol.impl.record.value.error.ErrorRecord;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.ErrorIntent;
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRelated;
import org.slf4j.Logger;

public class Engine implements RecordProcessor {

private static final Logger LOG = Loggers.PROCESSOR_LOGGER;
private static final String ERROR_MESSAGE_ON_EVENT_FAILED_SKIP_EVENT =
"Expected to find processor for record '{}', but caught an exception. Skip this record.";
private static final String PROCESSING_ERROR_MESSAGE =
"Expected to process record '%s' without errors, but exception occurred with message '%s'.";
private EventApplier eventApplier;
private RecordProcessorMap recordProcessorMap;
private MutableZeebeState zeebeState;
private TypedStreamWriter streamWriter;
private TypedResponseWriter responseWriter;

private final ErrorRecord errorRecord = new ErrorRecord();

private Writers writers;

public Engine() {}

@Override
public void init(final RecordProcessorContext recordProcessorContext) {
streamWriter = recordProcessorContext.getStreamWriterProxy();
responseWriter = recordProcessorContext.getTypedResponseWriter();

final var typedProcessorContext =
new TypedRecordProcessorContextImpl(
recordProcessorContext.getPartitionId(),
recordProcessorContext.getScheduleService(),
recordProcessorContext.getZeebeDb(),
recordProcessorContext.getTransactionContext(),
recordProcessorContext.getStreamWriterProxy(),
streamWriter,
recordProcessorContext.getEventApplierFactory(),
recordProcessorContext.getTypedResponseWriter());
responseWriter);

final TypedRecordProcessors typedRecordProcessors =
recordProcessorContext
Expand All @@ -45,12 +74,11 @@ public void init(final RecordProcessorContext recordProcessorContext) {
typedProcessorContext.getStreamProcessorListener());

recordProcessorContext.setLifecycleListeners(typedRecordProcessors.getLifecycleListeners());
final RecordProcessorMap recordProcessorMap = typedRecordProcessors.getRecordProcessorMap();

recordProcessorContext.setRecordProcessorMap(recordProcessorMap);
recordProcessorContext.setWriters(typedProcessorContext.getWriters());
recordProcessorMap = typedRecordProcessors.getRecordProcessorMap();

final var zeebeState = typedProcessorContext.getZeebeState();
writers = typedProcessorContext.getWriters();
recordProcessorContext.setWriters(writers);
zeebeState = typedProcessorContext.getZeebeState();
eventApplier = recordProcessorContext.getEventApplierFactory().apply(zeebeState);
}

Expand All @@ -62,14 +90,69 @@ public void replay(final TypedRecord event) {
@Override
public ProcessingResult process(
final TypedRecord record, final ProcessingContext processingContext) {
throw new IllegalStateException("Not yet implemented");
TypedRecordProcessor<?> currentProcessor = null;

final var typedCommand = (TypedRecord<?>) record;
try {
currentProcessor =
recordProcessorMap.get(
typedCommand.getRecordType(),
typedCommand.getValueType(),
typedCommand.getIntent().value());
} catch (final Exception e) {
LOG.error(ERROR_MESSAGE_ON_EVENT_FAILED_SKIP_EVENT, typedCommand, e);
}

if (currentProcessor == null) {
return EmptyProcessingResult.INSTANCE;
}

final var processingResultBuilder = processingContext.getProcessingResultBuilder();

final boolean isNotOnBlacklist = !zeebeState.getBlackListState().isOnBlacklist(typedCommand);
if (isNotOnBlacklist) {
final long position = typedCommand.getPosition();
currentProcessor.processRecord(
position,
record,
responseWriter,
streamWriter,
(sep) -> {
processingResultBuilder.resetPostCommitTasks();
processingResultBuilder.appendPostCommitTask(sep::flush);
});
}

return processingResultBuilder.build();
}

@Override
public ProcessingResult onProcessingError(
final Throwable processingException,
final TypedRecord record,
final ErrorHandlingContext errorHandlingContext) {
throw new IllegalStateException("Not yet implemented");

final String errorMessage =
String.format(PROCESSING_ERROR_MESSAGE, record, processingException.getMessage());
LOG.error(errorMessage, processingException);

final var processingResultBuilder = errorHandlingContext.getProcessingResultBuilder();

writers.rejection().appendRejection(record, RejectionType.PROCESSING_ERROR, errorMessage);
writers
.response()
.writeRejectionOnCommand(record, RejectionType.PROCESSING_ERROR, errorMessage);
errorRecord.initErrorRecord(processingException, record.getPosition());

if (DbBlackListState.shouldBeBlacklisted(record.getIntent())) {
if (record.getValue() instanceof ProcessInstanceRelated) {
final long processInstanceKey =
((ProcessInstanceRelated) record.getValue()).getProcessInstanceKey();
errorRecord.setProcessInstanceKey(processInstanceKey);
}

writers.state().appendFollowUpEvent(record.getKey(), ErrorIntent.CREATED, errorRecord);
}
return processingResultBuilder.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.api;

import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;

public final class EmptyProcessingResult implements ProcessingResult {

public static final ProcessingResult INSTANCE = new EmptyProcessingResult();

private EmptyProcessingResult() {}

@Override
public long writeRecordsToStream(final LogStreamBatchWriter logStreamBatchWriter) {
return 0;
}

@Override
public boolean writeResponse(final CommandResponseWriter commandResponseWriter) {
return true;
}

@Override
public boolean executePostCommitTasks() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,12 @@ ProcessingResultBuilder withResponse(
*/
ProcessingResultBuilder reset();

/**
* Resets itself with the post commit tasks reset
*
* @return itself for method chaining
*/
ProcessingResultBuilder resetPostCommitTasks();

ProcessingResult build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.processing.streamprocessor.RecordProcessorMap;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorFactory;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
Expand Down Expand Up @@ -43,9 +42,6 @@ public interface RecordProcessorContext {
@Deprecated // will most likely be moved into engine
void setLifecycleListeners(List<StreamProcessorLifecycleAware> lifecycleListeners);

@Deprecated // will be moved into engine
void setRecordProcessorMap(RecordProcessorMap recordProcessorMap);

void setStreamProcessorListener(StreamProcessorListener streamProcessorListener);

@Deprecated // will most likely be moved into engine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@ private void sendResult(
ValueType.PROCESS_INSTANCE_RESULT,
requestMetadata.getRequestId(),
requestMetadata.getRequestStreamId());

responseWriter.flush();
}

private DirectBuffer collectVariables(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ public void processRecord(
final TypedStreamWriter streamWriter,
final Consumer<SideEffectProducer> sideEffect) {

// need to add multiple side-effects for sending a response and scheduling timers
sideEffects.add(responseWriter);
sideEffect.accept(sideEffects);

final DeploymentRecord deploymentEvent = command.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void processRecord(
responseWriter.writeEventOnCommand(key, IncidentIntent.RESOLVED, incident, command);

// if it fails, a new incident is raised
attemptToContinueProcessProcessing(command, responseWriter, streamWriter, sideEffect, incident);
attemptToContinueProcessProcessing(command, streamWriter, sideEffect, incident);
}

private void rejectResolveCommand(
Expand All @@ -96,7 +96,6 @@ private void rejectResolveCommand(

private void attemptToContinueProcessProcessing(
final TypedRecord<IncidentRecord> command,
final TypedResponseWriter responseWriter,
final TypedStreamWriter streamWriter,
final Consumer<SideEffectProducer> sideEffect,
final IncidentRecord incident) {
Expand All @@ -111,7 +110,6 @@ private void attemptToContinueProcessProcessing(
.ifRightOrLeft(
failedCommand -> {
sideEffects.clear();
sideEffects.add(responseWriter::flush);

bpmnStreamProcessor.processRecord(
failedCommand, noopResponseWriter, streamWriter, sideEffects::add);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ public void processRecord(

sideEffect.accept(sideEffectQueue);
sideEffectQueue.clear();
sideEffectQueue.add(responseWriter::flush);

final boolean shouldRespond = wrappedProcessor.onCommand(command, this, sideEffectQueue::add);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void blacklistProcessInstance(final long processInstanceKey) {
blacklist(processInstanceKey);
}

private boolean shouldBeBlacklisted(final Intent intent) {
public static boolean shouldBeBlacklisted(final Intent intent) {

if (intent instanceof ProcessInstanceRelatedIntent) {
final ProcessInstanceRelatedIntent processInstanceRelatedIntent =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.streamprocessor;

import static io.camunda.zeebe.engine.processing.streamprocessor.TypedEventRegistry.EVENT_REGISTRY;

import io.camunda.zeebe.engine.api.PostCommitTask;
import io.camunda.zeebe.engine.api.ProcessingResult;
import io.camunda.zeebe.engine.api.ProcessingResultBuilder;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.util.buffer.BufferWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

/** Implementation of {@code ProcessingResultBuilder} that writes all data into a buffer */
final class BufferedProcessingResultBuilder implements ProcessingResultBuilder {

private final Map<Class<? extends UnpackedObject>, ValueType> typeRegistry;

private final BufferedStreamWriter bufferedStreamWriter;
private final List<PostCommitTask> postCommitTasks = new ArrayList<>();
private final int sourceIndex;

BufferedProcessingResultBuilder(
final BinaryOperator<Integer> capacityCalculator, final int sourceIndex) {
bufferedStreamWriter = new BufferedStreamWriter(capacityCalculator);
this.sourceIndex = sourceIndex;

typeRegistry = new HashMap<>();
EVENT_REGISTRY.forEach((e, c) -> typeRegistry.put(c, e));
}

@Override
public ProcessingResultBuilder appendRecord(
final long key,
final RecordType type,
final Intent intent,
final RejectionType rejectionType,
final String rejectionReason,
final RecordValue value) {

final ValueType valueType = initValueType(value);
final var valueWriter = initValueWriter(value);

bufferedStreamWriter.appendRecord(
key, sourceIndex, type, intent, rejectionType, rejectionReason, valueType, valueWriter);
return this;
}

@Override
public ProcessingResultBuilder withResponse(
final long eventKey,
final Intent eventState,
final UnpackedObject eventValue,
final ValueType valueType,
final long requestId,
final int requestStreamId) {
throw new RuntimeException("Not yet implemented");
}

@Override
public ProcessingResultBuilder appendPostCommitTask(final PostCommitTask r) {
postCommitTasks.add(r);
return this;
}

@Override
public ProcessingResultBuilder reset() {
bufferedStreamWriter.reset();
postCommitTasks.clear();
return this;
}

@Override
public ProcessingResult build() {
throw new RuntimeException("Not yet implemented");
}

private ValueType initValueType(final RecordValue value) {
final ValueType valueType = typeRegistry.get(value.getClass());
if (valueType == null) {
// usually happens when the record is not registered at the TypedStreamEnvironment
throw new IllegalArgumentException(
"Missing value type mapping for record: " + value.getClass());
}
return valueType;
}

private BufferWriter initValueWriter(final RecordValue value) {
// TODO evaluate whether the interface should be changed to UnifiedRecordValue or <T extends
// RecordValue & BufferWriter> BufferWriter initValueWriter(final T value) {}
// validation
if (!(value instanceof BufferWriter)) {
throw new IllegalArgumentException(
String.format("The record value %s is not a BufferWriter", value));
}

return (BufferWriter) value;
}
}

0 comments on commit b229050

Please sign in to comment.