Skip to content

Commit

Permalink
refactor: create EventApplier in engine
Browse files Browse the repository at this point in the history
The StreamProcessor nor the builder doesn't need the knowledge about the EventAppliers. Currently there is no need to set this from outside.
  • Loading branch information
Zelldon committed Aug 8, 2022
1 parent e090e11 commit 79bf2bc
Show file tree
Hide file tree
Showing 9 changed files with 5 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ void setup() {

private RecordProcessorContextImpl createContext(
final ProcessingScheduleService executor, final ZeebeDb zeebeDb) {
return new RecordProcessorContextImpl(1, executor, zeebeDb, zeebeDb.createContext(), null);
return new RecordProcessorContextImpl(1, executor, zeebeDb, zeebeDb.createContext());
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionContext;
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionStep;
import io.camunda.zeebe.engine.Engine;
import io.camunda.zeebe.engine.state.appliers.EventAppliers;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.streamprocessor.StreamProcessorMode;
Expand Down Expand Up @@ -129,7 +128,6 @@ private static StreamProcessor createStreamProcessor(
.actorSchedulingService(context.getActorSchedulingService())
.zeebeDb(context.getZeebeDb())
.recordProcessor(new Engine(context.getTypedRecordProcessorFactory()))
.eventApplierFactory(EventAppliers::new)
.nodeId(context.getNodeId())
.commandResponseWriter(context.getCommandResponseWriter())
.listener(processedCommand -> context.getOnProcessedListener().accept(processedCommand))
Expand Down
3 changes: 2 additions & 1 deletion engine/src/main/java/io/camunda/zeebe/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.ZeebeDbState;
import io.camunda.zeebe.engine.state.appliers.EventAppliers;
import io.camunda.zeebe.engine.state.processing.DbBlackListState;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.protocol.impl.record.value.error.ErrorRecord;
Expand Down Expand Up @@ -63,7 +64,7 @@ public void init(final RecordProcessorContext recordProcessorContext) {
recordProcessorContext.getPartitionId(),
recordProcessorContext.getZeebeDb(),
recordProcessorContext.getTransactionContext());
eventApplier = recordProcessorContext.getEventApplierFactory().apply(zeebeState);
eventApplier = new EventAppliers(zeebeState);

writers = new Writers(resultBuilderMutex, eventApplier);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@
import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import java.util.List;
import java.util.function.Function;

public interface RecordProcessorContext {

Expand All @@ -25,8 +22,6 @@ public interface RecordProcessorContext {

TransactionContext getTransactionContext();

Function<MutableZeebeState, EventApplier> getEventApplierFactory();

List<StreamProcessorLifecycleAware> getLifecycleListeners();

StreamProcessorListener getStreamProcessorListener();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,33 +13,27 @@
import io.camunda.zeebe.engine.api.RecordProcessorContext;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public final class RecordProcessorContextImpl implements RecordProcessorContext {

private final int partitionId;
private final ProcessingScheduleService scheduleService;
private final ZeebeDb zeebeDb;
private final TransactionContext transactionContext;
private final Function<MutableZeebeState, EventApplier> eventApplierFactory;
private final List<StreamProcessorLifecycleAware> lifecycleListeners = new ArrayList<>();
private StreamProcessorListener streamProcessorListener;

public RecordProcessorContextImpl(
final int partitionId,
final ProcessingScheduleService scheduleService,
final ZeebeDb zeebeDb,
final TransactionContext transactionContext,
final Function<MutableZeebeState, EventApplier> eventApplierFactory) {
final TransactionContext transactionContext) {
this.partitionId = partitionId;
this.scheduleService = scheduleService;
this.zeebeDb = zeebeDb;
this.transactionContext = transactionContext;
this.eventApplierFactory = eventApplierFactory;
}

@Override
Expand All @@ -62,11 +56,6 @@ public TransactionContext getTransactionContext() {
return transactionContext;
}

@Override
public Function<MutableZeebeState, EventApplier> getEventApplierFactory() {
return eventApplierFactory;
}

@Override
public List<StreamProcessorLifecycleAware> getLifecycleListeners() {
return lifecycleListeners;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
import io.camunda.zeebe.engine.metrics.StreamProcessorMetrics;
import io.camunda.zeebe.engine.processing.streamprocessor.RecordValues;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.ZeebeDbState;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.logstreams.log.LogRecordAwaiter;
import io.camunda.zeebe.logstreams.log.LogStream;
Expand Down Expand Up @@ -87,7 +85,6 @@ public class StreamProcessor extends Actor implements HealthMonitorable, LogReco
private final ActorSchedulingService actorSchedulingService;
private final AtomicBoolean isOpened = new AtomicBoolean(false);
private final List<StreamProcessorLifecycleAware> lifecycleAwareListeners;
private final Function<MutableZeebeState, EventApplier> eventApplierFactory;
private final Set<FailureListener> failureListeners = new HashSet<>();
private final StreamProcessorMetrics metrics;

Expand Down Expand Up @@ -121,8 +118,6 @@ protected StreamProcessor(final StreamProcessorBuilder processorBuilder) {
typedStreamWriterFactory = processorBuilder.getTypedStreamWriterFactory();
zeebeDb = processorBuilder.getZeebeDb();

eventApplierFactory = processorBuilder.getEventApplierFactory();

streamProcessorContext =
processorBuilder
.getProcessingContext()
Expand Down Expand Up @@ -322,8 +317,7 @@ private void initEngine() {
partitionId,
streamProcessorContext.getScheduleService(),
zeebeDb,
streamProcessorContext.getTransactionContext(),
eventApplierFactory);
streamProcessorContext.getTransactionContext());
recordProcessor.init(engineContext);

lifecycleAwareListeners.addAll(engineContext.getLifecycleListeners());
Expand Down Expand Up @@ -366,7 +360,6 @@ private long recoverFromSnapshot() {
private void recoverZeebeDbState(final TransactionContext transactionContext) {
final ZeebeDbState zeebeState = new ZeebeDbState(partitionId, zeebeDb, transactionContext);
streamProcessorContext.zeebeState(zeebeState);
streamProcessorContext.eventApplier(eventApplierFactory.apply(zeebeState));
}

private void onRecovered(final LastProcessingPositions lastProcessingPositions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriterImpl;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
Expand All @@ -30,7 +28,6 @@ public final class StreamProcessorBuilder {
private final List<StreamProcessorLifecycleAware> lifecycleListeners = new ArrayList<>();
private ActorSchedulingService actorSchedulingService;
private ZeebeDb zeebeDb;
private Function<MutableZeebeState, EventApplier> eventApplierFactory;
private int nodeId;
private Function<LogStreamBatchWriter, LegacyTypedStreamWriter> typedStreamWriterFactory =
LegacyTypedStreamWriterImpl::new;
Expand Down Expand Up @@ -78,12 +75,6 @@ public StreamProcessorBuilder zeebeDb(final ZeebeDb zeebeDb) {
return this;
}

public StreamProcessorBuilder eventApplierFactory(
final Function<MutableZeebeState, EventApplier> eventApplierFactory) {
this.eventApplierFactory = eventApplierFactory;
return this;
}

public StreamProcessorBuilder streamProcessorMode(final StreamProcessorMode streamProcessorMode) {
streamProcessorContext.processorMode(streamProcessorMode);
return this;
Expand Down Expand Up @@ -113,10 +104,6 @@ public RecordProcessor getRecordProcessor() {
return recordProcessor;
}

public Function<MutableZeebeState, EventApplier> getEventApplierFactory() {
return eventApplierFactory;
}

public StreamProcessor build() {
validate();

Expand All @@ -127,7 +114,6 @@ private void validate() {
Objects.requireNonNull(actorSchedulingService, "No task scheduler provided.");
Objects.requireNonNull(streamProcessorContext.getLogStream(), "No log stream provided.");
Objects.requireNonNull(zeebeDb, "No database provided.");
Objects.requireNonNull(eventApplierFactory, "No factory for the event supplier provided.");
}

public Function<LogStreamBatchWriter, LegacyTypedStreamWriter> getTypedStreamWriterFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedResponseWriterImpl;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.KeyGeneratorControls;
import io.camunda.zeebe.engine.state.ZeebeDbState;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
Expand All @@ -39,7 +38,6 @@ public final class StreamProcessorContext implements ReadonlyStreamProcessorCont
private RecordValues recordValues;
private ZeebeDbState zeebeState;
private TransactionContext transactionContext;
private EventApplier eventApplier;

private BooleanSupplier abortCondition;
private StreamProcessorListener streamProcessorListener = NOOP_LISTENER;
Expand Down Expand Up @@ -144,11 +142,6 @@ public LegacyTypedResponseWriterImpl getTypedResponseWriter() {
return typedResponseWriter;
}

public StreamProcessorContext eventApplier(final EventApplier eventApplier) {
this.eventApplier = eventApplier;
return this;
}

public StreamProcessorContext processorMode(final StreamProcessorMode streamProcessorMode) {
this.streamProcessorMode = streamProcessorMode;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,6 @@ public void onRecovered(final ReadonlyStreamProcessorContext context) {
.commandResponseWriter(mockCommandResponseWriter)
.listener(mockStreamProcessorListener)
.recordProcessor(new Engine(wrappedFactory))
.eventApplierFactory(eventApplierFactory)
.streamProcessorMode(streamProcessorMode);

if (streamWriterFactory != null) {
Expand Down

0 comments on commit 79bf2bc

Please sign in to comment.