Skip to content

Commit

Permalink
merge: #9779
Browse files Browse the repository at this point in the history
9779: 9725 rename and improve interfaces r=pihme a=pihme

## Description

- Renames interfaces to make it clearer that there could be multiple `RecordProcessor` implementations
- Define interfaces for `ProcessingResultBuilder` and `ProcessingResult` 

## Related issues

<!-- Which issues are closed by this PR or are related -->

related to #9725



Co-authored-by: pihme <pihme@users.noreply.github.com>
  • Loading branch information
zeebe-bors-camunda[bot] and pihme committed Jul 13, 2022
2 parents 8b5c14a + 50b6189 commit b005273
Show file tree
Hide file tree
Showing 15 changed files with 202 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
*/
package io.camunda.zeebe.broker.system.partitions;

import io.camunda.zeebe.engine.api.RecordProcessorContext;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorContext;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;

@FunctionalInterface
public interface TypedRecordProcessorsFactory {

TypedRecordProcessors createTypedStreamProcessor(RecordProcessorContext recordProcessorContext);
TypedRecordProcessors createTypedStreamProcessor(
TypedRecordProcessorContext typedRecordProcessorContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,22 @@

import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.api.Engine;
import io.camunda.zeebe.engine.api.EngineContext;
import io.camunda.zeebe.engine.api.ErrorHandlingContext;
import io.camunda.zeebe.engine.api.ProcessingContext;
import io.camunda.zeebe.engine.api.ProcessingResult;
import io.camunda.zeebe.engine.api.RecordProcessor;
import io.camunda.zeebe.engine.api.RecordProcessorContext;
import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.ZeebeDbState;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import java.util.function.Function;

public class EngineImpl implements Engine {
public class Engine implements RecordProcessor {

private final EventApplier eventApplier;

public EngineImpl(
public Engine(
final int partitionId,
final ZeebeDb zeebeDb,
final Function<MutableZeebeState, EventApplier> eventApplierFactory) {
Expand All @@ -35,7 +35,7 @@ public EngineImpl(
}

@Override
public void init(final EngineContext engineContext) {
public void init(final RecordProcessorContext recordProcessorContext) {
throw new IllegalStateException("Not yet implemented");
}

Expand All @@ -54,7 +54,6 @@ public ProcessingResult process(
public ProcessingResult onProcessingError(
final Throwable processingException,
final TypedRecord record,
final long position,
final ErrorHandlingContext errorHandlingContext) {
throw new IllegalStateException("Not yet implemented");
}
Expand Down
23 changes: 0 additions & 23 deletions engine/src/main/java/io/camunda/zeebe/engine/api/Engine.java

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,17 @@
*/
package io.camunda.zeebe.engine.api;

public interface ProcessingResult {}
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.logstreams.log.LogStreamRecordWriter;

/**
* Here the interface is just a suggestion. Can be whatever PDT teams thinks is best to work with
*/
public interface ProcessingResult {

long writeRecordsToStream(LogStreamRecordWriter logStreamRecordWriter);

boolean writeResponse(CommandResponseWriter commandResponseWriter);

void executePostCommitTasks();
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,57 @@
*/
package io.camunda.zeebe.engine.api;

import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;

/** Builder to compose the processing result */
public interface ProcessingResultBuilder {

/**
* Appends a record to the result
*
* @return returns itself for method chaining
*/
ProcessingResultBuilder appendRecord(
final long key,
final RecordType type,
final Intent intent,
final RejectionType rejectionType,
final String rejectionReason,
final RecordValue value);

/**
* Sets the response for the result; will be overwritten if called more than once
*
* @return returns itself for method chaining
*/
ProcessingResultBuilder withResponse(
final long eventKey,
final Intent eventState,
final UnpackedObject eventValue,
final ValueType valueType,
final long requestId,
final int requestStreamId);

/**
* Appends a task to be executed after a successful commit ProcessingResultBuilder (replacement
* for side effects)
*
* @return returns itself for method chaining
*/
ProcessingResultBuilder appendPostCommitTask(Runnable r);

/**
* Resets the processing result build to its initial states (removes all follow-up records, the
* response and post-commit tasks.
*
* @return returns itself for method chaining
*/
ProcessingResultBuilder reset();

ProcessingResult build();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.api;

/**
* Interface for record processors. A record processor is responsible for handling a single record.
* (The class {@code StreamProcessor} in turn is responsible for handling a stream of records.
*/
public interface RecordProcessor {

/**
* Called by platform to initialize the processor
*
* @param recordProcessorContext context object to initialize the processor
*/
void init(RecordProcessorContext recordProcessorContext);

/**
* Called by platform in order to replay a single record
*
* <p><em>Contract</em>
*
* <ul>
* <li>Record will be an event
* <li>Will be called before processing is called
* <li>Implementors can write to the database. Transaction is provided by platform, which also
* takes care of lifecycle of the transaction
* <li>Implementors must not write to the log stream
* <li>Implementors must not schedule post commit tasks
* </ul>
*
* @param record the record to replay
*/
void replay(TypedRecord record);

/**
* Called by platform to process a single record
*
* <p><em>Contract</em> * *
*
* <ul>
* *
* <li>Record will be a command
* <li>Will be called after replay is called
* <li>Implementors can write to the database. Transaction is provided by platform, which also *
* takes care of lifecycle of the transaction
* <li>Implementors must ensure that if they generate follow up events, these are applied to the
* database while this method is called
* <li>Implementors can produce follow up commands and events, client responses and on commit
* tasks via {@code * processingContext.getProcessingResultBuilder(). ... .build()}
* </ul>
*
* @param record
* @param processingContext
* @return the result of the processing; must be generated via {@code
* processingContext.getProcessingResultBuilder().build()}
*/
ProcessingResult process(TypedRecord record, ProcessingContext processingContext);

/**
* Called by platform when a processing error occurred
*
* @param processingException the exception that was thrown
* @param record the record for which the exception was thrown
* @param errorHandlingContext tbd
* @return the result of the processing; must be generated via {@code *
* processingContext.getProcessingResultBuilder().build()}
*/
ProcessingResult onProcessingError(
Throwable processingException, TypedRecord record, ErrorHandlingContext errorHandlingContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,4 @@
*/
package io.camunda.zeebe.engine.api;

import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.immutable.LastProcessedPositionState;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;

public interface RecordProcessorContext {

int getPartitionId();

ProcessingScheduleService getScheduleService();

MutableZeebeState getZeebeState();

Writers getWriters();

LastProcessedPositionState getLastProcessedPositionState();

RecordProcessorContext listener(StreamProcessorListener streamProcessorListener);
}
public interface RecordProcessorContext {}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import io.camunda.zeebe.el.ExpressionLanguageFactory;
import io.camunda.zeebe.engine.api.ProcessingScheduleService;
import io.camunda.zeebe.engine.api.RecordProcessorContext;
import io.camunda.zeebe.engine.metrics.JobMetrics;
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnEventPublicationBehavior;
import io.camunda.zeebe.engine.processing.common.CatchEventBehavior;
Expand All @@ -28,6 +27,7 @@
import io.camunda.zeebe.engine.processing.message.MessageEventProcessors;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorContext;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.processing.timer.DueDateTimerChecker;
Expand All @@ -46,17 +46,17 @@
public final class EngineProcessors {

public static TypedRecordProcessors createEngineProcessors(
final RecordProcessorContext recordProcessorContext,
final TypedRecordProcessorContext typedRecordProcessorContext,
final int partitionsCount,
final SubscriptionCommandSender subscriptionCommandSender,
final DeploymentDistributor deploymentDistributor,
final DeploymentResponder deploymentResponder,
final Consumer<String> onJobsAvailableCallback,
final FeatureFlags featureFlags) {

final var scheduleService = recordProcessorContext.getScheduleService();
final MutableZeebeState zeebeState = recordProcessorContext.getZeebeState();
final var writers = recordProcessorContext.getWriters();
final var scheduleService = typedRecordProcessorContext.getScheduleService();
final MutableZeebeState zeebeState = typedRecordProcessorContext.getZeebeState();
final var writers = typedRecordProcessorContext.getWriters();
final TypedRecordProcessors typedRecordProcessors =
TypedRecordProcessors.processors(zeebeState.getKeyGenerator(), writers);

Expand All @@ -65,7 +65,7 @@ public static TypedRecordProcessors createEngineProcessors(

typedRecordProcessors.withListener(zeebeState);

final int partitionId = recordProcessorContext.getPartitionId();
final int partitionId = typedRecordProcessorContext.getPartitionId();

final var variablesState = zeebeState.getVariableState();
final var expressionProcessor =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.streamprocessor;

import io.camunda.zeebe.engine.api.ProcessingScheduleService;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.immutable.LastProcessedPositionState;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;

public interface TypedRecordProcessorContext {

int getPartitionId();

ProcessingScheduleService getScheduleService();

MutableZeebeState getZeebeState();

Writers getWriters();

LastProcessedPositionState getLastProcessedPositionState();

TypedRecordProcessorContext listener(StreamProcessorListener streamProcessorListener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,15 @@
*/
package io.camunda.zeebe.engine.processing.streamprocessor;

import io.camunda.zeebe.engine.api.RecordProcessorContext;

@FunctionalInterface
public interface TypedRecordProcessorFactory {

/**
* Creates typed record processors with the given context.
*
* @param recordProcessorContext the processing context which contains value information to create
* record processors
* @param typedRecordProcessorContext the processing context which contains value information to
* create record processors
* @return the created typed record processors
*/
TypedRecordProcessors createProcessors(RecordProcessorContext recordProcessorContext);
TypedRecordProcessors createProcessors(TypedRecordProcessorContext typedRecordProcessorContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.db.ZeebeDbTransaction;
import io.camunda.zeebe.engine.api.Engine;
import io.camunda.zeebe.engine.api.RecordProcessor;
import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.metrics.ReplayMetrics;
import io.camunda.zeebe.engine.processing.streamprocessor.EventFilter;
Expand Down Expand Up @@ -92,10 +92,10 @@ public final class ReplayStateMachine implements LogRecordAwaiter {
private State currentState = State.AWAIT_RECORD;
private final BooleanSupplier shouldPause;
private final ReplayMetrics replayMetrics;
private final Engine engine;
private final RecordProcessor engine;

public ReplayStateMachine(
final Engine engine,
final RecordProcessor engine,
final StreamProcessorContext context,
final BooleanSupplier shouldReplayNext) {
this.engine = engine;
Expand Down

0 comments on commit b005273

Please sign in to comment.