Skip to content

Commit

Permalink
merge: #9846
Browse files Browse the repository at this point in the history
9846: Shape legacy code into new interfaces part 3 (code cleanup) r=npepinpe a=pihme

## Description

Some code cleanup before the next steps. 

## Related issues

related to #9725



Co-authored-by: pihme <pihme@users.noreply.github.com>
  • Loading branch information
zeebe-bors-camunda[bot] and pihme committed Jul 21, 2022
2 parents bbbb0d3 + 103a7d6 commit 358dc73
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 197 deletions.
43 changes: 17 additions & 26 deletions engine/src/main/java/io/camunda/zeebe/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@
package io.camunda.zeebe.engine;

import io.camunda.zeebe.engine.api.EmptyProcessingResult;
import io.camunda.zeebe.engine.api.ErrorHandlingContext;
import io.camunda.zeebe.engine.api.ProcessingContext;
import io.camunda.zeebe.engine.api.ProcessingResult;
import io.camunda.zeebe.engine.api.ProcessingResultBuilder;
import io.camunda.zeebe.engine.api.RecordProcessor;
import io.camunda.zeebe.engine.api.RecordProcessorContext;
import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.RecordProcessorMap;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
Expand All @@ -31,7 +29,7 @@
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRelated;
import org.slf4j.Logger;

public class Engine implements RecordProcessor {
public class Engine implements RecordProcessor<EngineContext> {

private static final Logger LOG = Loggers.PROCESSOR_LOGGER;
private static final String ERROR_MESSAGE_ON_EVENT_FAILED_SKIP_EVENT =
Expand All @@ -51,35 +49,32 @@ public class Engine implements RecordProcessor {
public Engine() {}

@Override
public void init(final RecordProcessorContext recordProcessorContext) {
streamWriter = recordProcessorContext.getStreamWriterProxy();
responseWriter = recordProcessorContext.getTypedResponseWriter();
public void init(final EngineContext engineContext) {
streamWriter = engineContext.getStreamWriterProxy();
responseWriter = engineContext.getTypedResponseWriter();

final var typedProcessorContext =
new TypedRecordProcessorContextImpl(
recordProcessorContext.getPartitionId(),
recordProcessorContext.getScheduleService(),
recordProcessorContext.getZeebeDb(),
recordProcessorContext.getTransactionContext(),
engineContext.getPartitionId(),
engineContext.getScheduleService(),
engineContext.getZeebeDb(),
engineContext.getTransactionContext(),
streamWriter,
recordProcessorContext.getEventApplierFactory(),
engineContext.getEventApplierFactory(),
responseWriter);

final TypedRecordProcessors typedRecordProcessors =
recordProcessorContext
.getTypedRecordProcessorFactory()
.createProcessors(typedProcessorContext);
engineContext.getTypedRecordProcessorFactory().createProcessors(typedProcessorContext);

recordProcessorContext.setStreamProcessorListener(
typedProcessorContext.getStreamProcessorListener());
engineContext.setStreamProcessorListener(typedProcessorContext.getStreamProcessorListener());

recordProcessorContext.setLifecycleListeners(typedRecordProcessors.getLifecycleListeners());
engineContext.setLifecycleListeners(typedRecordProcessors.getLifecycleListeners());
recordProcessorMap = typedRecordProcessors.getRecordProcessorMap();

writers = typedProcessorContext.getWriters();
recordProcessorContext.setWriters(writers);
engineContext.setWriters(writers);
zeebeState = typedProcessorContext.getZeebeState();
eventApplier = recordProcessorContext.getEventApplierFactory().apply(zeebeState);
eventApplier = engineContext.getEventApplierFactory().apply(zeebeState);
}

@Override
Expand All @@ -89,7 +84,7 @@ public void replay(final TypedRecord event) {

@Override
public ProcessingResult process(
final TypedRecord record, final ProcessingContext processingContext) {
final TypedRecord record, final ProcessingResultBuilder processingResultBuilder) {
TypedRecordProcessor<?> currentProcessor = null;

final var typedCommand = (TypedRecord<?>) record;
Expand All @@ -107,8 +102,6 @@ public ProcessingResult process(
return EmptyProcessingResult.INSTANCE;
}

final var processingResultBuilder = processingContext.getProcessingResultBuilder();

final boolean isNotOnBlacklist = !zeebeState.getBlackListState().isOnBlacklist(typedCommand);
if (isNotOnBlacklist) {
final long position = typedCommand.getPosition();
Expand All @@ -130,14 +123,12 @@ public ProcessingResult process(
public ProcessingResult onProcessingError(
final Throwable processingException,
final TypedRecord record,
final ErrorHandlingContext errorHandlingContext) {
final ProcessingResultBuilder processingResultBuilder) {

final String errorMessage =
String.format(PROCESSING_ERROR_MESSAGE, record, processingException.getMessage());
LOG.error(errorMessage, processingException);

final var processingResultBuilder = errorHandlingContext.getProcessingResultBuilder();

writers.rejection().appendRejection(record, RejectionType.PROCESSING_ERROR, errorMessage);
writers
.response()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.streamprocessor;
package io.camunda.zeebe.engine;

import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.api.ProcessingScheduleService;
import io.camunda.zeebe.engine.api.RecordProcessorContext;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorFactory;
Expand All @@ -23,7 +22,7 @@
import java.util.List;
import java.util.function.Function;

public final class RecordProcessorContextImpl implements RecordProcessorContext {
public final class EngineContext {

private final int partitionId;
private final ProcessingScheduleService scheduleService;
Expand All @@ -37,7 +36,7 @@ public final class RecordProcessorContextImpl implements RecordProcessorContext
private StreamProcessorListener streamProcessorListener;
private Writers writers;

public RecordProcessorContextImpl(
public EngineContext(
final int partitionId,
final ProcessingScheduleService scheduleService,
final ZeebeDb zeebeDb,
Expand All @@ -56,52 +55,42 @@ public RecordProcessorContextImpl(
this.typedRecordProcessorFactory = typedRecordProcessorFactory;
}

@Override
public int getPartitionId() {
return partitionId;
}

@Override
public ProcessingScheduleService getScheduleService() {
return scheduleService;
}

@Override
public ZeebeDb getZeebeDb() {
return zeebeDb;
}

@Override
public TransactionContext getTransactionContext() {
return transactionContext;
}

@Override
public TypedStreamWriter getStreamWriterProxy() {
return streamWriter;
}

@Override
public TypedResponseWriter getTypedResponseWriter() {
return responseWriter;
}

@Override
public Function<MutableZeebeState, EventApplier> getEventApplierFactory() {
return eventApplierFactory;
}

@Override
public TypedRecordProcessorFactory getTypedRecordProcessorFactory() {
return typedRecordProcessorFactory;
}

@Deprecated // will likely be moved to engine
public List<StreamProcessorLifecycleAware> getLifecycleListeners() {
return lifecycleListeners;
}

@Override
public void setLifecycleListeners(final List<StreamProcessorLifecycleAware> lifecycleListeners) {
this.lifecycleListeners = lifecycleListeners;
}
Expand All @@ -110,7 +99,6 @@ public StreamProcessorListener getStreamProcessorListener() {
return streamProcessorListener;
}

@Override
public void setStreamProcessorListener(final StreamProcessorListener streamProcessorListener) {
this.streamProcessorListener = streamProcessorListener;
}
Expand All @@ -119,7 +107,6 @@ public Writers getWriters() {
return writers;
}

@Override
public void setWriters(final Writers writers) {
this.writers = writers;
}
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
* Interface for record processors. A record processor is responsible for handling a single record.
* (The class {@code StreamProcessor} in turn is responsible for handling a stream of records.
*/
public interface RecordProcessor {
public interface RecordProcessor<CONTEXT> {

/**
* Called by platform to initialize the processor
*
* @param recordProcessorContext context object to initialize the processor
*/
void init(RecordProcessorContext recordProcessorContext);
void init(CONTEXT recordProcessorContext);

/**
* Called by platform in order to replay a single record
Expand Down Expand Up @@ -52,25 +52,24 @@ public interface RecordProcessor {
* <li>Implementors must ensure that if they generate follow up events, these are applied to the
* database while this method is called
* <li>Implementors can produce follow up commands and events, client responses and on commit
* tasks via {@code * processingContext.getProcessingResultBuilder(). ... .build()}
* tasks via {@code processingResultBuilder}
* <li>Implementors can indicate that the record should be skipped by returning {@code
* EmptyProcessingResult.INSTANCE}
* </ul>
*
* @param record
* @param processingContext
* @return the result of the processing; must be generated via {@code
* processingContext.getProcessingResultBuilder().build()}
* processingResultBuilder.build()}
*/
ProcessingResult process(TypedRecord record, ProcessingContext processingContext);
ProcessingResult process(TypedRecord record, ProcessingResultBuilder processingResultBuilder);

/**
* Called by platform when a processing error occurred
*
* @param processingException the exception that was thrown
* @param record the record for which the exception was thrown
* @param errorHandlingContext tbd
* @return the result of the processing; must be generated via {@code *
* processingContext.getProcessingResultBuilder().build()}
* @return the result of the processing; must be generated via {@code ProcessingResultBuilder
* processingResultBuilder }
*/
ProcessingResult onProcessingError(
Throwable processingException, TypedRecord record, ErrorHandlingContext errorHandlingContext);
Throwable processingException,
TypedRecord record,
ProcessingResultBuilder processingResultBuilder);
}

This file was deleted.

This file was deleted.

This file was deleted.

0 comments on commit 358dc73

Please sign in to comment.