Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create EventAppliers in engine #9985

Merged
merged 1 commit into from
Aug 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ void setup() {

private RecordProcessorContextImpl createContext(
final ProcessingScheduleService executor, final ZeebeDb zeebeDb) {
return new RecordProcessorContextImpl(1, executor, zeebeDb, zeebeDb.createContext(), null);
return new RecordProcessorContextImpl(1, executor, zeebeDb, zeebeDb.createContext());
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionContext;
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionStep;
import io.camunda.zeebe.engine.Engine;
import io.camunda.zeebe.engine.state.appliers.EventAppliers;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.streamprocessor.StreamProcessorMode;
Expand Down Expand Up @@ -129,7 +128,6 @@ private static StreamProcessor createStreamProcessor(
.actorSchedulingService(context.getActorSchedulingService())
.zeebeDb(context.getZeebeDb())
.recordProcessor(new Engine(context.getTypedRecordProcessorFactory()))
.eventApplierFactory(EventAppliers::new)
.nodeId(context.getNodeId())
.commandResponseWriter(context.getCommandResponseWriter())
.listener(processedCommand -> context.getOnProcessedListener().accept(processedCommand))
Expand Down
3 changes: 2 additions & 1 deletion engine/src/main/java/io/camunda/zeebe/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.ZeebeDbState;
import io.camunda.zeebe.engine.state.appliers.EventAppliers;
import io.camunda.zeebe.engine.state.processing.DbBlackListState;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.protocol.impl.record.value.error.ErrorRecord;
Expand Down Expand Up @@ -63,7 +64,7 @@ public void init(final RecordProcessorContext recordProcessorContext) {
recordProcessorContext.getPartitionId(),
recordProcessorContext.getZeebeDb(),
recordProcessorContext.getTransactionContext());
eventApplier = recordProcessorContext.getEventApplierFactory().apply(zeebeState);
eventApplier = new EventAppliers(zeebeState);

writers = new Writers(resultBuilderMutex, eventApplier);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@
import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import java.util.List;
import java.util.function.Function;

public interface RecordProcessorContext {

Expand All @@ -25,8 +22,6 @@ public interface RecordProcessorContext {

TransactionContext getTransactionContext();

Function<MutableZeebeState, EventApplier> getEventApplierFactory();

List<StreamProcessorLifecycleAware> getLifecycleListeners();

StreamProcessorListener getStreamProcessorListener();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,33 +13,27 @@
import io.camunda.zeebe.engine.api.RecordProcessorContext;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public final class RecordProcessorContextImpl implements RecordProcessorContext {

private final int partitionId;
private final ProcessingScheduleService scheduleService;
private final ZeebeDb zeebeDb;
private final TransactionContext transactionContext;
private final Function<MutableZeebeState, EventApplier> eventApplierFactory;
private final List<StreamProcessorLifecycleAware> lifecycleListeners = new ArrayList<>();
private StreamProcessorListener streamProcessorListener;

public RecordProcessorContextImpl(
final int partitionId,
final ProcessingScheduleService scheduleService,
final ZeebeDb zeebeDb,
final TransactionContext transactionContext,
final Function<MutableZeebeState, EventApplier> eventApplierFactory) {
final TransactionContext transactionContext) {
this.partitionId = partitionId;
this.scheduleService = scheduleService;
this.zeebeDb = zeebeDb;
this.transactionContext = transactionContext;
this.eventApplierFactory = eventApplierFactory;
}

@Override
Expand All @@ -62,11 +56,6 @@ public TransactionContext getTransactionContext() {
return transactionContext;
}

@Override
public Function<MutableZeebeState, EventApplier> getEventApplierFactory() {
return eventApplierFactory;
}

@Override
public List<StreamProcessorLifecycleAware> getLifecycleListeners() {
return lifecycleListeners;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
import io.camunda.zeebe.engine.metrics.StreamProcessorMetrics;
import io.camunda.zeebe.engine.processing.streamprocessor.RecordValues;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.ZeebeDbState;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.logstreams.log.LogRecordAwaiter;
import io.camunda.zeebe.logstreams.log.LogStream;
Expand Down Expand Up @@ -87,7 +85,6 @@ public class StreamProcessor extends Actor implements HealthMonitorable, LogReco
private final ActorSchedulingService actorSchedulingService;
private final AtomicBoolean isOpened = new AtomicBoolean(false);
private final List<StreamProcessorLifecycleAware> lifecycleAwareListeners;
private final Function<MutableZeebeState, EventApplier> eventApplierFactory;
private final Set<FailureListener> failureListeners = new HashSet<>();
private final StreamProcessorMetrics metrics;

Expand Down Expand Up @@ -121,8 +118,6 @@ protected StreamProcessor(final StreamProcessorBuilder processorBuilder) {
typedStreamWriterFactory = processorBuilder.getTypedStreamWriterFactory();
zeebeDb = processorBuilder.getZeebeDb();

eventApplierFactory = processorBuilder.getEventApplierFactory();

streamProcessorContext =
processorBuilder
.getProcessingContext()
Expand Down Expand Up @@ -322,8 +317,7 @@ private void initEngine() {
partitionId,
streamProcessorContext.getScheduleService(),
zeebeDb,
streamProcessorContext.getTransactionContext(),
eventApplierFactory);
streamProcessorContext.getTransactionContext());
recordProcessor.init(engineContext);

lifecycleAwareListeners.addAll(engineContext.getLifecycleListeners());
Expand Down Expand Up @@ -366,7 +360,6 @@ private long recoverFromSnapshot() {
private void recoverZeebeDbState(final TransactionContext transactionContext) {
final ZeebeDbState zeebeState = new ZeebeDbState(partitionId, zeebeDb, transactionContext);
streamProcessorContext.zeebeState(zeebeState);
streamProcessorContext.eventApplier(eventApplierFactory.apply(zeebeState));
}

private void onRecovered(final LastProcessingPositions lastProcessingPositions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriterImpl;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
Expand All @@ -30,7 +28,6 @@ public final class StreamProcessorBuilder {
private final List<StreamProcessorLifecycleAware> lifecycleListeners = new ArrayList<>();
private ActorSchedulingService actorSchedulingService;
private ZeebeDb zeebeDb;
private Function<MutableZeebeState, EventApplier> eventApplierFactory;
private int nodeId;
private Function<LogStreamBatchWriter, LegacyTypedStreamWriter> typedStreamWriterFactory =
LegacyTypedStreamWriterImpl::new;
Expand Down Expand Up @@ -78,12 +75,6 @@ public StreamProcessorBuilder zeebeDb(final ZeebeDb zeebeDb) {
return this;
}

public StreamProcessorBuilder eventApplierFactory(
final Function<MutableZeebeState, EventApplier> eventApplierFactory) {
this.eventApplierFactory = eventApplierFactory;
return this;
}

public StreamProcessorBuilder streamProcessorMode(final StreamProcessorMode streamProcessorMode) {
streamProcessorContext.processorMode(streamProcessorMode);
return this;
Expand Down Expand Up @@ -113,10 +104,6 @@ public RecordProcessor getRecordProcessor() {
return recordProcessor;
}

public Function<MutableZeebeState, EventApplier> getEventApplierFactory() {
return eventApplierFactory;
}

public StreamProcessor build() {
validate();

Expand All @@ -127,7 +114,6 @@ private void validate() {
Objects.requireNonNull(actorSchedulingService, "No task scheduler provided.");
Objects.requireNonNull(streamProcessorContext.getLogStream(), "No log stream provided.");
Objects.requireNonNull(zeebeDb, "No database provided.");
Objects.requireNonNull(eventApplierFactory, "No factory for the event supplier provided.");
}

public Function<LogStreamBatchWriter, LegacyTypedStreamWriter> getTypedStreamWriterFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedResponseWriterImpl;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.KeyGeneratorControls;
import io.camunda.zeebe.engine.state.ZeebeDbState;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
Expand All @@ -39,7 +38,6 @@ public final class StreamProcessorContext implements ReadonlyStreamProcessorCont
private RecordValues recordValues;
private ZeebeDbState zeebeState;
private TransactionContext transactionContext;
private EventApplier eventApplier;

private BooleanSupplier abortCondition;
private StreamProcessorListener streamProcessorListener = NOOP_LISTENER;
Expand Down Expand Up @@ -144,11 +142,6 @@ public LegacyTypedResponseWriterImpl getTypedResponseWriter() {
return typedResponseWriter;
}

public StreamProcessorContext eventApplier(final EventApplier eventApplier) {
this.eventApplier = eventApplier;
return this;
}

public StreamProcessorContext processorMode(final StreamProcessorMode streamProcessorMode) {
this.streamProcessorMode = streamProcessorMode;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,6 @@ public void onRecovered(final ReadonlyStreamProcessorContext context) {
.commandResponseWriter(mockCommandResponseWriter)
.listener(mockStreamProcessorListener)
.recordProcessor(new Engine(wrappedFactory))
.eventApplierFactory(eventApplierFactory)
.streamProcessorMode(streamProcessorMode);

if (streamWriterFactory != null) {
Expand Down