Skip to content

Commit

Permalink
merge: #9985
Browse files Browse the repository at this point in the history
9985: Create EventAppliers in engine r=Zelldon a=Zelldon

## Description

The StreamProcessor nor the builder doesn't need the knowledge about the EventAppliers. Currently there is no need to set this from outside.

<!-- Please explain the changes you made here. -->

## Related issues

<!-- Which issues are closed by this PR or are related -->




Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and Zelldon committed Aug 4, 2022
2 parents 00fb0db + 73850d2 commit eef5db2
Show file tree
Hide file tree
Showing 7 changed files with 2 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io.atomix.raft.RaftServer.Role;
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionContext;
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionStep;
import io.camunda.zeebe.engine.state.appliers.EventAppliers;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.streamprocessor.StreamProcessorMode;
Expand Down Expand Up @@ -127,7 +126,6 @@ private static StreamProcessor createStreamProcessor(
.logStream(context.getLogStream())
.actorSchedulingService(context.getActorSchedulingService())
.zeebeDb(context.getZeebeDb())
.eventApplierFactory(EventAppliers::new)
.nodeId(context.getNodeId())
.commandResponseWriter(context.getCommandResponseWriter())
.listener(processedCommand -> context.getOnProcessedListener().accept(processedCommand))
Expand Down
3 changes: 2 additions & 1 deletion engine/src/main/java/io/camunda/zeebe/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.ZeebeDbState;
import io.camunda.zeebe.engine.state.appliers.EventAppliers;
import io.camunda.zeebe.engine.state.processing.DbBlackListState;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.protocol.impl.record.value.error.ErrorRecord;
Expand Down Expand Up @@ -63,7 +64,7 @@ public void init(final EngineContext engineContext) {
engineContext.getPartitionId(),
engineContext.getZeebeDb(),
engineContext.getTransactionContext());
eventApplier = engineContext.getEventApplierFactory().apply(zeebeState);
eventApplier = new EventAppliers(zeebeState);

writers = new Writers(resultBuilderMutex, eventApplier);

Expand Down
10 changes: 0 additions & 10 deletions engine/src/main/java/io/camunda/zeebe/engine/EngineContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorFactory;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

public final class EngineContext {

Expand All @@ -29,7 +26,6 @@ public final class EngineContext {
private final TransactionContext transactionContext;
private final LegacyTypedStreamWriter streamWriter;
private final LegacyTypedResponseWriter responseWriter;
private final Function<MutableZeebeState, EventApplier> eventApplierFactory;
private final TypedRecordProcessorFactory typedRecordProcessorFactory;
private List<StreamProcessorLifecycleAware> lifecycleListeners = Collections.EMPTY_LIST;
private StreamProcessorListener streamProcessorListener;
Expand All @@ -41,15 +37,13 @@ public EngineContext(
final TransactionContext transactionContext,
final LegacyTypedStreamWriter streamWriter,
final LegacyTypedResponseWriter responseWriter,
final Function<MutableZeebeState, EventApplier> eventApplierFactory,
final TypedRecordProcessorFactory typedRecordProcessorFactory) {
this.partitionId = partitionId;
this.scheduleService = scheduleService;
this.zeebeDb = zeebeDb;
this.transactionContext = transactionContext;
this.streamWriter = streamWriter;
this.responseWriter = responseWriter;
this.eventApplierFactory = eventApplierFactory;
this.typedRecordProcessorFactory = typedRecordProcessorFactory;
}

Expand Down Expand Up @@ -77,10 +71,6 @@ public LegacyTypedResponseWriter getTypedResponseWriter() {
return responseWriter;
}

public Function<MutableZeebeState, EventApplier> getEventApplierFactory() {
return eventApplierFactory;
}

public TypedRecordProcessorFactory getTypedRecordProcessorFactory() {
return typedRecordProcessorFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
import io.camunda.zeebe.engine.processing.streamprocessor.RecordValues;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorFactory;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.ZeebeDbState;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.logstreams.log.LogRecordAwaiter;
import io.camunda.zeebe.logstreams.log.LogStream;
Expand Down Expand Up @@ -89,7 +87,6 @@ public class StreamProcessor extends Actor implements HealthMonitorable, LogReco
private final ActorSchedulingService actorSchedulingService;
private final AtomicBoolean isOpened = new AtomicBoolean(false);
private final List<StreamProcessorLifecycleAware> lifecycleAwareListeners;
private final Function<MutableZeebeState, EventApplier> eventApplierFactory;
private final Set<FailureListener> failureListeners = new HashSet<>();
private final StreamProcessorMetrics metrics;

Expand Down Expand Up @@ -126,8 +123,6 @@ protected StreamProcessor(final StreamProcessorBuilder processorBuilder) {
typedStreamWriterFactory = processorBuilder.getTypedStreamWriterFactory();
zeebeDb = processorBuilder.getZeebeDb();

eventApplierFactory = processorBuilder.getEventApplierFactory();

streamProcessorContext =
processorBuilder
.getProcessingContext()
Expand Down Expand Up @@ -335,7 +330,6 @@ private void initEngine() {
streamProcessorContext.getTransactionContext(),
streamProcessorContext.getLogStreamWriter(),
streamProcessorContext.getTypedResponseWriter(),
eventApplierFactory,
typedRecordProcessorFactory);
engine.init(engineContext);

Expand Down Expand Up @@ -379,7 +373,6 @@ private long recoverFromSnapshot() {
private void recoverZeebeDbState(final TransactionContext transactionContext) {
final ZeebeDbState zeebeState = new ZeebeDbState(partitionId, zeebeDb, transactionContext);
streamProcessorContext.zeebeState(zeebeState);
streamProcessorContext.eventApplier(eventApplierFactory.apply(zeebeState));
}

private void onRecovered(final LastProcessingPositions lastProcessingPositions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriterImpl;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
Expand All @@ -31,7 +29,6 @@ public final class StreamProcessorBuilder {
private TypedRecordProcessorFactory typedRecordProcessorFactory;
private ActorSchedulingService actorSchedulingService;
private ZeebeDb zeebeDb;
private Function<MutableZeebeState, EventApplier> eventApplierFactory;
private int nodeId;
private Function<LogStreamBatchWriter, LegacyTypedStreamWriter> typedStreamWriterFactory =
LegacyTypedStreamWriterImpl::new;
Expand Down Expand Up @@ -78,12 +75,6 @@ public StreamProcessorBuilder zeebeDb(final ZeebeDb zeebeDb) {
return this;
}

public StreamProcessorBuilder eventApplierFactory(
final Function<MutableZeebeState, EventApplier> eventApplierFactory) {
this.eventApplierFactory = eventApplierFactory;
return this;
}

public StreamProcessorBuilder streamProcessorMode(final StreamProcessorMode streamProcessorMode) {
streamProcessorContext.processorMode(streamProcessorMode);
return this;
Expand Down Expand Up @@ -113,10 +104,6 @@ public int getNodeId() {
return nodeId;
}

public Function<MutableZeebeState, EventApplier> getEventApplierFactory() {
return eventApplierFactory;
}

public StreamProcessor build() {
validate();

Expand All @@ -128,7 +115,6 @@ private void validate() {
Objects.requireNonNull(actorSchedulingService, "No task scheduler provided.");
Objects.requireNonNull(streamProcessorContext.getLogStream(), "No log stream provided.");
Objects.requireNonNull(zeebeDb, "No database provided.");
Objects.requireNonNull(eventApplierFactory, "No factory for the event supplier provided.");
}

public Function<LogStreamBatchWriter, LegacyTypedStreamWriter> getTypedStreamWriterFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedResponseWriterImpl;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.NoopLegacyTypedStreamWriter;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.KeyGeneratorControls;
import io.camunda.zeebe.engine.state.ZeebeDbState;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
Expand Down Expand Up @@ -45,7 +44,6 @@ public final class StreamProcessorContext implements ReadonlyStreamProcessorCont
private RecordValues recordValues;
private ZeebeDbState zeebeState;
private TransactionContext transactionContext;
private EventApplier eventApplier;

private BooleanSupplier abortCondition;
private StreamProcessorListener streamProcessorListener = NOOP_LISTENER;
Expand Down Expand Up @@ -150,11 +148,6 @@ public LegacyTypedResponseWriterImpl getTypedResponseWriter() {
return typedResponseWriter;
}

public StreamProcessorContext eventApplier(final EventApplier eventApplier) {
this.eventApplier = eventApplier;
return this;
}

public StreamProcessorContext processorMode(final StreamProcessorMode streamProcessorMode) {
this.streamProcessorMode = streamProcessorMode;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,6 @@ public void onRecovered(final ReadonlyStreamProcessorContext context) {
.commandResponseWriter(mockCommandResponseWriter)
.listener(mockStreamProcessorListener)
.streamProcessorFactory(wrappedFactory)
.eventApplierFactory(eventApplierFactory)
.streamProcessorMode(streamProcessorMode);

if (streamWriterFactory != null) {
Expand Down

0 comments on commit eef5db2

Please sign in to comment.