Skip to content

Commit

Permalink
merge: #10018
Browse files Browse the repository at this point in the history
10018: Try to handle expected error in record processor r=korthout a=korthout

## Description

<!-- Please explain the changes you made here. -->

This PR offers a solution for
- #9420 

The idea is to pass the error to the specific `TypedRecordProcessor`, which can decide whether the error is `EXPECTED` or `UNEXPECTED`. 
- If the error is `UNEXPECTED`, we handle it as before by writing an Error event, etc.
- If the error is `EXPECTED`, we let the processor control how we reject the command and what else has to happen.

This PR shows how this could work by also providing a solution for
- #9644 

A test is added to verify the correct behavior: custom rejection + transaction rolled back. The test showed that a hidden bug was introduced (I managed to bisect this to 463366f): the output was reset on error, but no longer when encapsulated by the ProcessingResultBuilder. A fix for this bug is also part of this PR.

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #9420
closes #9644



Co-authored-by: Nico Korthout <nico.korthout@camunda.com>
  • Loading branch information
zeebe-bors-camunda[bot] and korthout committed Aug 9, 2022
2 parents c5a0870 + 4018e94 commit 80dc320
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 30 deletions.
67 changes: 47 additions & 20 deletions engine/src/main/java/io/camunda/zeebe/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.RecordProcessorMap;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor.ProcessingError;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorContextImpl;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorFactory;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
Expand All @@ -34,9 +35,9 @@
public class Engine implements RecordProcessor {

private static final Logger LOG = Loggers.PROCESSOR_LOGGER;
private static final String ERROR_MESSAGE_ON_EVENT_FAILED_SKIP_EVENT =
private static final String ERROR_MESSAGE_PROCESSOR_NOT_FOUND =
"Expected to find processor for record '{}', but caught an exception. Skip this record.";
private static final String PROCESSING_ERROR_MESSAGE =
private static final String ERROR_MESSAGE_PROCESSING_EXCEPTION_OCCURRED =
"Expected to process record '%s' without errors, but exception occurred with message '%s'.";
private EventApplier eventApplier;
private RecordProcessorMap recordProcessorMap;
Expand Down Expand Up @@ -103,7 +104,7 @@ public ProcessingResult process(
typedCommand.getValueType(),
typedCommand.getIntent().value());
} catch (final Exception e) {
LOG.error(ERROR_MESSAGE_ON_EVENT_FAILED_SKIP_EVENT, typedCommand, e);
LOG.error(ERROR_MESSAGE_PROCESSOR_NOT_FOUND, typedCommand, e);
}

if (currentProcessor == null) {
Expand All @@ -128,28 +129,54 @@ public ProcessingResult onProcessingError(
final Throwable processingException,
final TypedRecord record,
final ProcessingResultBuilder processingResultBuilder) {
try (final var scope = new ProcessingResultBuilderScope(processingResultBuilder)) {

final var typedCommand = (TypedRecord<?>) record;
TypedRecordProcessor<?> processor = null;
try {
processor =
recordProcessorMap.get(
typedCommand.getRecordType(),
typedCommand.getValueType(),
typedCommand.getIntent().value());
} catch (final Exception e) {
LOG.error(ERROR_MESSAGE_PROCESSOR_NOT_FOUND, typedCommand, e);
}

final var error =
processor == null
? ProcessingError.UNEXPECTED_ERROR
: processor.tryHandleError(record, processingException);

if (error == ProcessingError.UNEXPECTED_ERROR) {
handleUnexpectedError(processingException, record);
}
}
return processingResultBuilder.build();
}

private void handleUnexpectedError(
final Throwable processingException, final TypedRecord record) {
final String errorMessage =
String.format(PROCESSING_ERROR_MESSAGE, record, processingException.getMessage());
String.format(
ERROR_MESSAGE_PROCESSING_EXCEPTION_OCCURRED, record, processingException.getMessage());
LOG.error(errorMessage, processingException);

try (final var scope = new ProcessingResultBuilderScope(processingResultBuilder)) {
writers.rejection().appendRejection(record, RejectionType.PROCESSING_ERROR, errorMessage);
writers
.response()
.writeRejectionOnCommand(record, RejectionType.PROCESSING_ERROR, errorMessage);
errorRecord.initErrorRecord(processingException, record.getPosition());

if (DbBlackListState.shouldBeBlacklisted(record.getIntent())) {
if (record.getValue() instanceof ProcessInstanceRelated) {
final long processInstanceKey =
((ProcessInstanceRelated) record.getValue()).getProcessInstanceKey();
errorRecord.setProcessInstanceKey(processInstanceKey);
}

writers.state().appendFollowUpEvent(record.getKey(), ErrorIntent.CREATED, errorRecord);
writers.rejection().appendRejection(record, RejectionType.PROCESSING_ERROR, errorMessage);
writers
.response()
.writeRejectionOnCommand(record, RejectionType.PROCESSING_ERROR, errorMessage);
errorRecord.initErrorRecord(processingException, record.getPosition());

if (DbBlackListState.shouldBeBlacklisted(record.getIntent())) {
if (record.getValue() instanceof ProcessInstanceRelated) {
final long processInstanceKey =
((ProcessInstanceRelated) record.getValue()).getProcessInstanceKey();
errorRecord.setProcessInstanceKey(processInstanceKey);
}

writers.state().appendFollowUpEvent(record.getKey(), ErrorIntent.CREATED, errorRecord);
}
return processingResultBuilder.build();
}

private static final class ProcessingResultBuilderMutex
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,14 @@ public interface RecordProcessor {
void replay(TypedRecord record);

/**
* Called by platform to process a single record
* Called by platform to process a single record.
*
* <p><em>Contract</em> * *
* <p><em>Contract</em>
*
* <ul>
* *
* <li>Record will be a command
* <li>Will be called after replay is called
* <li>Implementors can write to the database. Transaction is provided by platform, which also *
* <li>Implementors can write to the database. Transaction is provided by platform, which also
* takes care of lifecycle of the transaction
* <li>Implementors must ensure that if they generate follow up events, these are applied to the
* database while this method is called
Expand All @@ -63,7 +62,23 @@ public interface RecordProcessor {
ProcessingResult process(TypedRecord record, ProcessingResultBuilder processingResultBuilder);

/**
* Called by platform when a processing error occurred
* Called by platform when a processing error occurred.
*
* <p><em>Contract</em>
*
* <ul>
* <li>Record will be a command
* <li>Will be called if an uncaught exception is thrown in {@link #process(TypedRecord,
* ProcessingResultBuilder)}, or when an uncaught exception is thrown by the
* ProcessingStateMachine while committing the transaction
* <li>Implementors can write to the database. Transaction is provided by platform, which also
* takes care of lifecycle of the transaction
* <li>Implementors must ensure that if they generate follow up events, these are applied to the
* database while this method is called
* <li>Implementors can produce follow up commands, events and rejections, client responses and
* on commit tasks via {@code processingResultBuilder}
* <li>Implementors are responsible for error logging when needed
* </ul>
*
* @return the result of the processing; must be generated via {@code ProcessingResultBuilder
* processingResultBuilder }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableProcess;
import io.camunda.zeebe.engine.processing.deployment.model.element.ExecutableSequenceFlow;
import io.camunda.zeebe.engine.processing.streamprocessor.CommandProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor.ProcessingError;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedCommandWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.processing.variable.VariableBehavior;
import io.camunda.zeebe.engine.state.KeyGenerator;
Expand All @@ -39,7 +41,6 @@
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.buffer.BufferUtil;
import io.camunda.zeebe.util.exception.UncheckedExecutionException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -81,6 +82,7 @@ public final class CreateProcessInstanceProcessor

private final KeyGenerator keyGenerator;
private final TypedCommandWriter commandWriter;
private final TypedRejectionWriter rejectionWriter;
private final StateWriter stateWriter;
private final ProcessEngineMetrics metrics;

Expand All @@ -96,6 +98,7 @@ public CreateProcessInstanceProcessor(
this.catchEventBehavior = catchEventBehavior;
this.keyGenerator = keyGenerator;
commandWriter = writers.command();
rejectionWriter = writers.rejection();
stateWriter = writers.state();
this.metrics = metrics;
}
Expand All @@ -118,6 +121,18 @@ public boolean onCommand(
return true;
}

@Override
public ProcessingError tryHandleError(
final TypedRecord<ProcessInstanceCreationRecord> typedCommand, final Throwable error) {
if (error instanceof EventSubscriptionException exception) {
// This exception is only thrown for ProcessInstanceCreationRecord with start instructions
rejectionWriter.appendRejection(
typedCommand, RejectionType.INVALID_ARGUMENT, exception.getMessage());
return ProcessingError.EXPECTED_ERROR;
}
return ProcessingError.UNEXPECTED_ERROR;
}

private void createProcessInstance(
final CommandControl<ProcessInstanceCreationRecord> controller,
final ProcessInstanceCreationRecord record,
Expand Down Expand Up @@ -484,8 +499,7 @@ private void createEventSubscriptions(
.formatted(
BufferUtil.bufferAsString(element.getId()),
subscribedOrFailure.getLeft().getMessage());
// todo(#9644): reject command using logical transaction instead of exception throwing
throw new UncheckedExecutionException(message);
throw new EventSubscriptionException(message);
}
}
}
Expand All @@ -509,4 +523,16 @@ private ProcessInstanceRecord createProcessInstanceRecord(
record Rejection(RejectionType type, String reason) {}

record ElementIdAndType(String elementId, BpmnElementType elementType) {}

/**
* Exception that can be thrown during processing of the create process instance command, in case
* the engine could not subscribe to an event. This exception can be handled by {@link
* #tryHandleError(TypedRecord, Throwable)}.
*/
private static class EventSubscriptionException extends RuntimeException {

EventSubscriptionException(final String message) {
super(message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package io.camunda.zeebe.engine.processing.streamprocessor;

import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor.ProcessingError;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedCommandWriter;
Expand Down Expand Up @@ -42,6 +43,17 @@ default void afterAccept(
final Intent intent,
final T value) {}

/**
* Try to handle an error that occurred during processing.
*
* @param command The command that was being processed when the error occurred
* @param error The error that occurred, and the processor should attempt to handle
* @return The type of the processing error. Default: {@link ProcessingError#UNEXPECTED_ERROR}.
*/
default ProcessingError tryHandleError(final TypedRecord<T> command, final Throwable error) {
return ProcessingError.UNEXPECTED_ERROR;
}

interface CommandControl<T> {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ public void processRecord(
}
}

@Override
public ProcessingError tryHandleError(final TypedRecord<T> command, final Throwable error) {
return wrappedProcessor.tryHandleError(command, error);
}

@Override
public long accept(final Intent newState, final T updatedValue) {
if (entityKey < 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,20 @@ default void processRecord(
final TypedRecord<T> record, final Consumer<SideEffectProducer> sideEffect) {
processRecord(record);
}

/**
* Try to handle an error that occurred during processing.
*
* @param command The command that was being processed when the error occurred
* @param error The error that occurred, and the processor should attempt to handle
* @return The type of the processing error. Default: {@link ProcessingError#UNEXPECTED_ERROR}.
*/
default ProcessingError tryHandleError(final TypedRecord<T> command, final Throwable error) {
return ProcessingError.UNEXPECTED_ERROR;
}

enum ProcessingError {
EXPECTED_ERROR,
UNEXPECTED_ERROR
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ private void processCommand(final LoggedEvent command) {
} catch (final UnrecoverableException unrecoverableException) {
throw unrecoverableException;
} catch (final Exception e) {
LOG.error(ERROR_MESSAGE_PROCESSING_FAILED_SKIP_EVENT, command, metadata, e);
onError(e, this::writeRecords);
}
}
Expand Down Expand Up @@ -326,6 +325,8 @@ private void errorHandlingInTransaction(final Throwable processingException) thr
final long position = typedCommand.getPosition();
final ProcessingResultBuilder processingResultBuilder =
new DirectProcessingResultBuilder(context, position);
// todo(#10047): replace this reset method by using Buffered Writers
processingResultBuilder.reset();

logStreamWriter.configureSourceContext(position);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,19 @@
package io.camunda.zeebe.engine.processing.processinstance;

import static io.camunda.zeebe.protocol.record.Assertions.assertThat;
import static org.assertj.core.api.Assertions.tuple;

import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceCreationIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import org.assertj.core.api.Assertions;
import org.junit.Rule;
import org.junit.Test;

Expand Down Expand Up @@ -182,6 +188,21 @@ public void shouldRejectCommandIfUnableToSubscribeToEvents() {
.onlyCommandRejections()
.getFirst())
.hasIntent(ProcessInstanceCreationIntent.CREATE)
.hasRejectionType(RejectionType.PROCESSING_ERROR);
.hasRejectionType(RejectionType.INVALID_ARGUMENT)
.hasRejectionReason(
"expected to subscribe to catch event(s) of 'subprocess' but failed to evaluate "
+ "expression 'unknown_var': no variable found for name 'unknown_var'");

Assertions.assertThat(
RecordingExporter.records()
.limit(
r ->
r.getRecordType() == RecordType.COMMAND_REJECTION
&& r.getIntent() == ProcessInstanceCreationIntent.CREATE))
.extracting(Record::getValueType, Record::getIntent)
.describedAs("Expect that no process instance is activated")
.doesNotContain(
tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_ACTIVATING),
tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_ACTIVATED));
}
}

0 comments on commit 80dc320

Please sign in to comment.