Skip to content

Commit

Permalink
merge: #9985
Browse files Browse the repository at this point in the history
9985: Create EventAppliers in engine r=Zelldon a=Zelldon

## Description

The StreamProcessor nor the builder doesn't need the knowledge about the EventAppliers. Currently there is no need to set this from outside.

<!-- Please explain the changes you made here. -->

## Related issues

<!-- Which issues are closed by this PR or are related -->




Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and Zelldon committed Aug 8, 2022
2 parents e090e11 + 79bf2bc commit b344923
Show file tree
Hide file tree
Showing 9 changed files with 5 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ void setup() {

private RecordProcessorContextImpl createContext(
final ProcessingScheduleService executor, final ZeebeDb zeebeDb) {
return new RecordProcessorContextImpl(1, executor, zeebeDb, zeebeDb.createContext(), null);
return new RecordProcessorContextImpl(1, executor, zeebeDb, zeebeDb.createContext());
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionContext;
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionStep;
import io.camunda.zeebe.engine.Engine;
import io.camunda.zeebe.engine.state.appliers.EventAppliers;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.streamprocessor.StreamProcessorMode;
Expand Down Expand Up @@ -129,7 +128,6 @@ private static StreamProcessor createStreamProcessor(
.actorSchedulingService(context.getActorSchedulingService())
.zeebeDb(context.getZeebeDb())
.recordProcessor(new Engine(context.getTypedRecordProcessorFactory()))
.eventApplierFactory(EventAppliers::new)
.nodeId(context.getNodeId())
.commandResponseWriter(context.getCommandResponseWriter())
.listener(processedCommand -> context.getOnProcessedListener().accept(processedCommand))
Expand Down
3 changes: 2 additions & 1 deletion engine/src/main/java/io/camunda/zeebe/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.ZeebeDbState;
import io.camunda.zeebe.engine.state.appliers.EventAppliers;
import io.camunda.zeebe.engine.state.processing.DbBlackListState;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.protocol.impl.record.value.error.ErrorRecord;
Expand Down Expand Up @@ -63,7 +64,7 @@ public void init(final RecordProcessorContext recordProcessorContext) {
recordProcessorContext.getPartitionId(),
recordProcessorContext.getZeebeDb(),
recordProcessorContext.getTransactionContext());
eventApplier = recordProcessorContext.getEventApplierFactory().apply(zeebeState);
eventApplier = new EventAppliers(zeebeState);

writers = new Writers(resultBuilderMutex, eventApplier);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@
import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import java.util.List;
import java.util.function.Function;

public interface RecordProcessorContext {

Expand All @@ -25,8 +22,6 @@ public interface RecordProcessorContext {

TransactionContext getTransactionContext();

Function<MutableZeebeState, EventApplier> getEventApplierFactory();

List<StreamProcessorLifecycleAware> getLifecycleListeners();

StreamProcessorListener getStreamProcessorListener();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,33 +13,27 @@
import io.camunda.zeebe.engine.api.RecordProcessorContext;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public final class RecordProcessorContextImpl implements RecordProcessorContext {

private final int partitionId;
private final ProcessingScheduleService scheduleService;
private final ZeebeDb zeebeDb;
private final TransactionContext transactionContext;
private final Function<MutableZeebeState, EventApplier> eventApplierFactory;
private final List<StreamProcessorLifecycleAware> lifecycleListeners = new ArrayList<>();
private StreamProcessorListener streamProcessorListener;

public RecordProcessorContextImpl(
final int partitionId,
final ProcessingScheduleService scheduleService,
final ZeebeDb zeebeDb,
final TransactionContext transactionContext,
final Function<MutableZeebeState, EventApplier> eventApplierFactory) {
final TransactionContext transactionContext) {
this.partitionId = partitionId;
this.scheduleService = scheduleService;
this.zeebeDb = zeebeDb;
this.transactionContext = transactionContext;
this.eventApplierFactory = eventApplierFactory;
}

@Override
Expand All @@ -62,11 +56,6 @@ public TransactionContext getTransactionContext() {
return transactionContext;
}

@Override
public Function<MutableZeebeState, EventApplier> getEventApplierFactory() {
return eventApplierFactory;
}

@Override
public List<StreamProcessorLifecycleAware> getLifecycleListeners() {
return lifecycleListeners;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
import io.camunda.zeebe.engine.metrics.StreamProcessorMetrics;
import io.camunda.zeebe.engine.processing.streamprocessor.RecordValues;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.ZeebeDbState;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.logstreams.log.LogRecordAwaiter;
import io.camunda.zeebe.logstreams.log.LogStream;
Expand Down Expand Up @@ -87,7 +85,6 @@ public class StreamProcessor extends Actor implements HealthMonitorable, LogReco
private final ActorSchedulingService actorSchedulingService;
private final AtomicBoolean isOpened = new AtomicBoolean(false);
private final List<StreamProcessorLifecycleAware> lifecycleAwareListeners;
private final Function<MutableZeebeState, EventApplier> eventApplierFactory;
private final Set<FailureListener> failureListeners = new HashSet<>();
private final StreamProcessorMetrics metrics;

Expand Down Expand Up @@ -121,8 +118,6 @@ protected StreamProcessor(final StreamProcessorBuilder processorBuilder) {
typedStreamWriterFactory = processorBuilder.getTypedStreamWriterFactory();
zeebeDb = processorBuilder.getZeebeDb();

eventApplierFactory = processorBuilder.getEventApplierFactory();

streamProcessorContext =
processorBuilder
.getProcessingContext()
Expand Down Expand Up @@ -322,8 +317,7 @@ private void initEngine() {
partitionId,
streamProcessorContext.getScheduleService(),
zeebeDb,
streamProcessorContext.getTransactionContext(),
eventApplierFactory);
streamProcessorContext.getTransactionContext());
recordProcessor.init(engineContext);

lifecycleAwareListeners.addAll(engineContext.getLifecycleListeners());
Expand Down Expand Up @@ -366,7 +360,6 @@ private long recoverFromSnapshot() {
private void recoverZeebeDbState(final TransactionContext transactionContext) {
final ZeebeDbState zeebeState = new ZeebeDbState(partitionId, zeebeDb, transactionContext);
streamProcessorContext.zeebeState(zeebeState);
streamProcessorContext.eventApplier(eventApplierFactory.apply(zeebeState));
}

private void onRecovered(final LastProcessingPositions lastProcessingPositions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriterImpl;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
Expand All @@ -30,7 +28,6 @@ public final class StreamProcessorBuilder {
private final List<StreamProcessorLifecycleAware> lifecycleListeners = new ArrayList<>();
private ActorSchedulingService actorSchedulingService;
private ZeebeDb zeebeDb;
private Function<MutableZeebeState, EventApplier> eventApplierFactory;
private int nodeId;
private Function<LogStreamBatchWriter, LegacyTypedStreamWriter> typedStreamWriterFactory =
LegacyTypedStreamWriterImpl::new;
Expand Down Expand Up @@ -78,12 +75,6 @@ public StreamProcessorBuilder zeebeDb(final ZeebeDb zeebeDb) {
return this;
}

public StreamProcessorBuilder eventApplierFactory(
final Function<MutableZeebeState, EventApplier> eventApplierFactory) {
this.eventApplierFactory = eventApplierFactory;
return this;
}

public StreamProcessorBuilder streamProcessorMode(final StreamProcessorMode streamProcessorMode) {
streamProcessorContext.processorMode(streamProcessorMode);
return this;
Expand Down Expand Up @@ -113,10 +104,6 @@ public RecordProcessor getRecordProcessor() {
return recordProcessor;
}

public Function<MutableZeebeState, EventApplier> getEventApplierFactory() {
return eventApplierFactory;
}

public StreamProcessor build() {
validate();

Expand All @@ -127,7 +114,6 @@ private void validate() {
Objects.requireNonNull(actorSchedulingService, "No task scheduler provided.");
Objects.requireNonNull(streamProcessorContext.getLogStream(), "No log stream provided.");
Objects.requireNonNull(zeebeDb, "No database provided.");
Objects.requireNonNull(eventApplierFactory, "No factory for the event supplier provided.");
}

public Function<LogStreamBatchWriter, LegacyTypedStreamWriter> getTypedStreamWriterFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedResponseWriterImpl;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.KeyGeneratorControls;
import io.camunda.zeebe.engine.state.ZeebeDbState;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
Expand All @@ -39,7 +38,6 @@ public final class StreamProcessorContext implements ReadonlyStreamProcessorCont
private RecordValues recordValues;
private ZeebeDbState zeebeState;
private TransactionContext transactionContext;
private EventApplier eventApplier;

private BooleanSupplier abortCondition;
private StreamProcessorListener streamProcessorListener = NOOP_LISTENER;
Expand Down Expand Up @@ -144,11 +142,6 @@ public LegacyTypedResponseWriterImpl getTypedResponseWriter() {
return typedResponseWriter;
}

public StreamProcessorContext eventApplier(final EventApplier eventApplier) {
this.eventApplier = eventApplier;
return this;
}

public StreamProcessorContext processorMode(final StreamProcessorMode streamProcessorMode) {
this.streamProcessorMode = streamProcessorMode;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,6 @@ public void onRecovered(final ReadonlyStreamProcessorContext context) {
.commandResponseWriter(mockCommandResponseWriter)
.listener(mockStreamProcessorListener)
.recordProcessor(new Engine(wrappedFactory))
.eventApplierFactory(eventApplierFactory)
.streamProcessorMode(streamProcessorMode);

if (streamWriterFactory != null) {
Expand Down

0 comments on commit b344923

Please sign in to comment.