Skip to content

Commit

Permalink
Revert "Merge pull request #9985 from camunda/zell-clean-up-abstracti…
Browse files Browse the repository at this point in the history
…ons"

This reverts commit 155de10, reversing
changes made to 47c3d3a.
  • Loading branch information
Zelldon committed Aug 8, 2022
1 parent 155de10 commit 7a8db74
Show file tree
Hide file tree
Showing 9 changed files with 51 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ void setup() {

private RecordProcessorContextImpl createContext(
final ProcessingScheduleService executor, final ZeebeDb zeebeDb) {
return new RecordProcessorContextImpl(1, executor, zeebeDb, zeebeDb.createContext());
return new RecordProcessorContextImpl(1, executor, zeebeDb, zeebeDb.createContext(), null);
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionContext;
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionStep;
import io.camunda.zeebe.engine.Engine;
import io.camunda.zeebe.engine.state.appliers.EventAppliers;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.streamprocessor.StreamProcessorMode;
Expand Down Expand Up @@ -128,6 +129,7 @@ private static StreamProcessor createStreamProcessor(
.actorSchedulingService(context.getActorSchedulingService())
.zeebeDb(context.getZeebeDb())
.recordProcessor(new Engine(context.getTypedRecordProcessorFactory()))
.eventApplierFactory(EventAppliers::new)
.nodeId(context.getNodeId())
.commandResponseWriter(context.getCommandResponseWriter())
.listener(processedCommand -> context.getOnProcessedListener().accept(processedCommand))
Expand Down
3 changes: 1 addition & 2 deletions engine/src/main/java/io/camunda/zeebe/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.ZeebeDbState;
import io.camunda.zeebe.engine.state.appliers.EventAppliers;
import io.camunda.zeebe.engine.state.processing.DbBlackListState;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.protocol.impl.record.value.error.ErrorRecord;
Expand Down Expand Up @@ -64,7 +63,7 @@ public void init(final RecordProcessorContext recordProcessorContext) {
recordProcessorContext.getPartitionId(),
recordProcessorContext.getZeebeDb(),
recordProcessorContext.getTransactionContext());
eventApplier = new EventAppliers(zeebeState);
eventApplier = recordProcessorContext.getEventApplierFactory().apply(zeebeState);

writers = new Writers(resultBuilderMutex, eventApplier);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import java.util.List;
import java.util.function.Function;

public interface RecordProcessorContext {

Expand All @@ -22,6 +25,8 @@ public interface RecordProcessorContext {

TransactionContext getTransactionContext();

Function<MutableZeebeState, EventApplier> getEventApplierFactory();

List<StreamProcessorLifecycleAware> getLifecycleListeners();

StreamProcessorListener getStreamProcessorListener();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,33 @@
import io.camunda.zeebe.engine.api.RecordProcessorContext;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public final class RecordProcessorContextImpl implements RecordProcessorContext {

private final int partitionId;
private final ProcessingScheduleService scheduleService;
private final ZeebeDb zeebeDb;
private final TransactionContext transactionContext;
private final Function<MutableZeebeState, EventApplier> eventApplierFactory;
private final List<StreamProcessorLifecycleAware> lifecycleListeners = new ArrayList<>();
private StreamProcessorListener streamProcessorListener;

public RecordProcessorContextImpl(
final int partitionId,
final ProcessingScheduleService scheduleService,
final ZeebeDb zeebeDb,
final TransactionContext transactionContext) {
final TransactionContext transactionContext,
final Function<MutableZeebeState, EventApplier> eventApplierFactory) {
this.partitionId = partitionId;
this.scheduleService = scheduleService;
this.zeebeDb = zeebeDb;
this.transactionContext = transactionContext;
this.eventApplierFactory = eventApplierFactory;
}

@Override
Expand All @@ -56,6 +62,11 @@ public TransactionContext getTransactionContext() {
return transactionContext;
}

@Override
public Function<MutableZeebeState, EventApplier> getEventApplierFactory() {
return eventApplierFactory;
}

@Override
public List<StreamProcessorLifecycleAware> getLifecycleListeners() {
return lifecycleListeners;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
import io.camunda.zeebe.engine.metrics.StreamProcessorMetrics;
import io.camunda.zeebe.engine.processing.streamprocessor.RecordValues;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.ZeebeDbState;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.logstreams.log.LogRecordAwaiter;
import io.camunda.zeebe.logstreams.log.LogStream;
Expand Down Expand Up @@ -85,6 +87,7 @@ public class StreamProcessor extends Actor implements HealthMonitorable, LogReco
private final ActorSchedulingService actorSchedulingService;
private final AtomicBoolean isOpened = new AtomicBoolean(false);
private final List<StreamProcessorLifecycleAware> lifecycleAwareListeners;
private final Function<MutableZeebeState, EventApplier> eventApplierFactory;
private final Set<FailureListener> failureListeners = new HashSet<>();
private final StreamProcessorMetrics metrics;

Expand Down Expand Up @@ -118,6 +121,8 @@ protected StreamProcessor(final StreamProcessorBuilder processorBuilder) {
typedStreamWriterFactory = processorBuilder.getTypedStreamWriterFactory();
zeebeDb = processorBuilder.getZeebeDb();

eventApplierFactory = processorBuilder.getEventApplierFactory();

streamProcessorContext =
processorBuilder
.getProcessingContext()
Expand Down Expand Up @@ -317,7 +322,8 @@ private void initEngine() {
partitionId,
streamProcessorContext.getScheduleService(),
zeebeDb,
streamProcessorContext.getTransactionContext());
streamProcessorContext.getTransactionContext(),
eventApplierFactory);
recordProcessor.init(engineContext);

lifecycleAwareListeners.addAll(engineContext.getLifecycleListeners());
Expand Down Expand Up @@ -360,6 +366,7 @@ private long recoverFromSnapshot() {
private void recoverZeebeDbState(final TransactionContext transactionContext) {
final ZeebeDbState zeebeState = new ZeebeDbState(partitionId, zeebeDb, transactionContext);
streamProcessorContext.zeebeState(zeebeState);
streamProcessorContext.eventApplier(eventApplierFactory.apply(zeebeState));
}

private void onRecovered(final LastProcessingPositions lastProcessingPositions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriterImpl;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
Expand All @@ -28,6 +30,7 @@ public final class StreamProcessorBuilder {
private final List<StreamProcessorLifecycleAware> lifecycleListeners = new ArrayList<>();
private ActorSchedulingService actorSchedulingService;
private ZeebeDb zeebeDb;
private Function<MutableZeebeState, EventApplier> eventApplierFactory;
private int nodeId;
private Function<LogStreamBatchWriter, LegacyTypedStreamWriter> typedStreamWriterFactory =
LegacyTypedStreamWriterImpl::new;
Expand Down Expand Up @@ -75,6 +78,12 @@ public StreamProcessorBuilder zeebeDb(final ZeebeDb zeebeDb) {
return this;
}

public StreamProcessorBuilder eventApplierFactory(
final Function<MutableZeebeState, EventApplier> eventApplierFactory) {
this.eventApplierFactory = eventApplierFactory;
return this;
}

public StreamProcessorBuilder streamProcessorMode(final StreamProcessorMode streamProcessorMode) {
streamProcessorContext.processorMode(streamProcessorMode);
return this;
Expand Down Expand Up @@ -104,6 +113,10 @@ public RecordProcessor getRecordProcessor() {
return recordProcessor;
}

public Function<MutableZeebeState, EventApplier> getEventApplierFactory() {
return eventApplierFactory;
}

public StreamProcessor build() {
validate();

Expand All @@ -114,6 +127,7 @@ private void validate() {
Objects.requireNonNull(actorSchedulingService, "No task scheduler provided.");
Objects.requireNonNull(streamProcessorContext.getLogStream(), "No log stream provided.");
Objects.requireNonNull(zeebeDb, "No database provided.");
Objects.requireNonNull(eventApplierFactory, "No factory for the event supplier provided.");
}

public Function<LogStreamBatchWriter, LegacyTypedStreamWriter> getTypedStreamWriterFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedResponseWriterImpl;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.KeyGeneratorControls;
import io.camunda.zeebe.engine.state.ZeebeDbState;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
Expand All @@ -38,6 +39,7 @@ public final class StreamProcessorContext implements ReadonlyStreamProcessorCont
private RecordValues recordValues;
private ZeebeDbState zeebeState;
private TransactionContext transactionContext;
private EventApplier eventApplier;

private BooleanSupplier abortCondition;
private StreamProcessorListener streamProcessorListener = NOOP_LISTENER;
Expand Down Expand Up @@ -142,6 +144,11 @@ public LegacyTypedResponseWriterImpl getTypedResponseWriter() {
return typedResponseWriter;
}

public StreamProcessorContext eventApplier(final EventApplier eventApplier) {
this.eventApplier = eventApplier;
return this;
}

public StreamProcessorContext processorMode(final StreamProcessorMode streamProcessorMode) {
this.streamProcessorMode = streamProcessorMode;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ public void onRecovered(final ReadonlyStreamProcessorContext context) {
.commandResponseWriter(mockCommandResponseWriter)
.listener(mockStreamProcessorListener)
.recordProcessor(new Engine(wrappedFactory))
.eventApplierFactory(eventApplierFactory)
.streamProcessorMode(streamProcessorMode);

if (streamWriterFactory != null) {
Expand Down

0 comments on commit 7a8db74

Please sign in to comment.