Skip to content

Commit

Permalink
merge: #9989
Browse files Browse the repository at this point in the history
9989: Create engine outside r=Zelldon a=Zelldon

## Description

Create the engine outside of the StreamProcessor, which should allow to also inject other RecordProcessors.
Furthermore we can fill the Engine with the needed TypedRecordProcessorFactory, which means we no longer need that in our builders and contexts.

I hope this helps you in make further progress `@deepthidevaki` 


Sidenote:

This also simplifies the StreamProcessor/RecordProcessor tests, since we do not necessarily need to set up so much anymore.
<!-- Please explain the changes you made here. -->

## Related issues

<!-- Which issues are closed by this PR or are related -->

related to #9725



Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and Zelldon committed Aug 5, 2022
2 parents bdc2405 + fae722d commit bfd3734
Show file tree
Hide file tree
Showing 13 changed files with 138 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.camunda.zeebe.engine.api.ProcessingResultBuilder;
import io.camunda.zeebe.engine.api.ProcessingScheduleService;
import io.camunda.zeebe.engine.api.RecordProcessor;
import io.camunda.zeebe.engine.api.RecordProcessorContext;
import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.protocol.impl.record.value.management.CheckpointRecord;
import io.camunda.zeebe.protocol.record.ValueType;
Expand All @@ -26,7 +27,7 @@
import org.slf4j.LoggerFactory;

/** Process and replays records related to Checkpoint. */
public final class CheckpointRecordsProcessor implements RecordProcessor<Context> {
public final class CheckpointRecordsProcessor implements RecordProcessor {

private static final Logger LOG = LoggerFactory.getLogger(CheckpointRecordsProcessor.class);

Expand All @@ -46,11 +47,11 @@ public CheckpointRecordsProcessor(final BackupManager backupManager) {
}

@Override
public void init(final Context recordProcessorContext) {
executor = recordProcessorContext.executor();
public void init(final RecordProcessorContext recordProcessorContext) {
executor = recordProcessorContext.getScheduleService();
checkpointState =
new DbCheckpointState(
recordProcessorContext.zeebeDb(), recordProcessorContext.transactionContext());
recordProcessorContext.getZeebeDb(), recordProcessorContext.getTransactionContext());

checkpointCreateProcessor =
new CheckpointCreateProcessor(checkpointState, backupManager, checkpointListeners);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.camunda.zeebe.protocol.impl.record.value.management.CheckpointRecord;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.intent.management.CheckpointIntent;
import io.camunda.zeebe.streamprocessor.RecordProcessorContextImpl;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.jupiter.api.AfterEach;
Expand All @@ -54,7 +55,7 @@ void setup() {
new ZeebeRocksDbFactory<>(
new RocksDbConfiguration(), new ConsistencyChecksSettings(true, true))
.createDb(database.toFile());
final var context = new Context(zeebedb, zeebedb.createContext(), executor);
final RecordProcessorContextImpl context = createContext(executor, zeebedb);

resultBuilder = new MockProcessingResultBuilder();
processor = new CheckpointRecordsProcessor(backupManager);
Expand All @@ -63,6 +64,12 @@ void setup() {
state = new DbCheckpointState(zeebedb, zeebedb.createContext());
}

private RecordProcessorContextImpl createContext(
final ProcessingScheduleService executor, final ZeebeDb zeebeDb) {
return new RecordProcessorContextImpl(
1, executor, zeebeDb, zeebeDb.createContext(), null, null, null);
}

@AfterEach
void after() throws Exception {
zeebedb.close();
Expand Down Expand Up @@ -228,7 +235,7 @@ void shouldNotifyListenerWhenReplayed() {
@Test
void shouldNotifyListenerOnInit() {
// given
final var context = new Context(zeebedb, zeebedb.createContext(), null);
final RecordProcessorContextImpl context = createContext(null, zeebedb);
processor = new CheckpointRecordsProcessor(backupManager);
final long checkpointId = 3;
final long checkpointPosition = 30;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ public Consumer<TypedRecord<?>> getOnProcessedListener() {
}

@Override
public TypedRecordProcessorFactory getStreamProcessorFactory() {
public TypedRecordProcessorFactory getTypedRecordProcessorFactory() {
return typedRecordProcessorsFactory::createTypedStreamProcessor;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public interface PartitionTransitionContext extends PartitionContext {

Consumer<TypedRecord<?>> getOnProcessedListener();

TypedRecordProcessorFactory getStreamProcessorFactory();
TypedRecordProcessorFactory getTypedRecordProcessorFactory();

ConcurrencyControl getConcurrencyControl();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.atomix.raft.RaftServer.Role;
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionContext;
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionStep;
import io.camunda.zeebe.engine.Engine;
import io.camunda.zeebe.engine.state.appliers.EventAppliers;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
Expand Down Expand Up @@ -127,11 +128,11 @@ private static StreamProcessor createStreamProcessor(
.logStream(context.getLogStream())
.actorSchedulingService(context.getActorSchedulingService())
.zeebeDb(context.getZeebeDb())
.recordProcessor(new Engine(context.getTypedRecordProcessorFactory()))
.eventApplierFactory(EventAppliers::new)
.nodeId(context.getNodeId())
.commandResponseWriter(context.getCommandResponseWriter())
.listener(processedCommand -> context.getOnProcessedListener().accept(processedCommand))
.streamProcessorFactory(context.getStreamProcessorFactory())
.streamProcessorMode(streamProcessorMode)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class TestPartitionTransitionContext implements PartitionTransitionContex
private Role currentRole;
private long currentTerm;
private HealthMonitor healthMonitor;
private TypedRecordProcessorFactory streamProcessorFactory;
private TypedRecordProcessorFactory typedRecordProcessorFactory;
private ExporterDirector exporterDirector;
private LogStream logStream;
private StreamProcessor streamProcessor;
Expand Down Expand Up @@ -264,12 +264,8 @@ public Consumer<TypedRecord<?>> getOnProcessedListener() {
}

@Override
public TypedRecordProcessorFactory getStreamProcessorFactory() {
return streamProcessorFactory;
}

public void setStreamProcessorFactory(final TypedRecordProcessorFactory streamProcessorFactory) {
this.streamProcessorFactory = streamProcessorFactory;
public TypedRecordProcessorFactory getTypedRecordProcessorFactory() {
return typedRecordProcessorFactory;
}

@Override
Expand All @@ -282,6 +278,11 @@ public void setConcurrencyControl(final ConcurrencyControl concurrencyControl) {
this.concurrencyControl = concurrencyControl;
}

public void setTypedRecordProcessorFactory(
final TypedRecordProcessorFactory typedRecordProcessorFactory) {
this.typedRecordProcessorFactory = typedRecordProcessorFactory;
}

public void setRaftPartition(final RaftPartition raftPartition) {
this.raftPartition = raftPartition;
}
Expand Down
34 changes: 21 additions & 13 deletions engine/src/main/java/io/camunda/zeebe/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
import io.camunda.zeebe.engine.api.ProcessingResult;
import io.camunda.zeebe.engine.api.ProcessingResultBuilder;
import io.camunda.zeebe.engine.api.RecordProcessor;
import io.camunda.zeebe.engine.api.RecordProcessorContext;
import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.RecordProcessorMap;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorContextImpl;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorFactory;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
Expand All @@ -31,7 +33,7 @@
import java.util.function.Supplier;
import org.slf4j.Logger;

public class Engine implements RecordProcessor<EngineContext> {
public class Engine implements RecordProcessor {

private static final Logger LOG = Loggers.PROCESSOR_LOGGER;
private static final String ERROR_MESSAGE_ON_EVENT_FAILED_SKIP_EVENT =
Expand All @@ -50,36 +52,42 @@ public class Engine implements RecordProcessor<EngineContext> {
new ProcessingResultBuilderMutex();

private Writers writers;
private TypedRecordProcessorFactory typedRecordProcessorFactory;

public Engine() {}

public Engine(final TypedRecordProcessorFactory typedRecordProcessorFactory) {
this.typedRecordProcessorFactory = typedRecordProcessorFactory;
}

@Override
public void init(final EngineContext engineContext) {
streamWriter = engineContext.getStreamWriterProxy();
responseWriter = engineContext.getTypedResponseWriter();
public void init(final RecordProcessorContext recordProcessorContext) {
streamWriter = recordProcessorContext.getStreamWriterProxy();
responseWriter = recordProcessorContext.getTypedResponseWriter();

zeebeState =
new ZeebeDbState(
engineContext.getPartitionId(),
engineContext.getZeebeDb(),
engineContext.getTransactionContext());
eventApplier = engineContext.getEventApplierFactory().apply(zeebeState);
recordProcessorContext.getPartitionId(),
recordProcessorContext.getZeebeDb(),
recordProcessorContext.getTransactionContext());
eventApplier = recordProcessorContext.getEventApplierFactory().apply(zeebeState);

writers = new Writers(resultBuilderMutex, eventApplier);

final var typedProcessorContext =
new TypedRecordProcessorContextImpl(
engineContext.getPartitionId(),
engineContext.getScheduleService(),
recordProcessorContext.getPartitionId(),
recordProcessorContext.getScheduleService(),
zeebeState,
writers);

final TypedRecordProcessors typedRecordProcessors =
engineContext.getTypedRecordProcessorFactory().createProcessors(typedProcessorContext);
typedRecordProcessorFactory.createProcessors(typedProcessorContext);

engineContext.setStreamProcessorListener(typedProcessorContext.getStreamProcessorListener());
recordProcessorContext.setStreamProcessorListener(
typedProcessorContext.getStreamProcessorListener());

engineContext.setLifecycleListeners(typedRecordProcessors.getLifecycleListeners());
recordProcessorContext.addLifecycleListeners(typedRecordProcessors.getLifecycleListeners());
recordProcessorMap = typedRecordProcessors.getRecordProcessorMap();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
* Interface for record processors. A record processor is responsible for handling a single record.
* (The class {@code StreamProcessor} in turn is responsible for handling a stream of records.
*/
public interface RecordProcessor<CONTEXT> {
public interface RecordProcessor {

/**
* Called by platform to initialize the processor
*
* @param recordProcessorContext context object to initialize the processor
*/
void init(CONTEXT recordProcessorContext);
void init(RecordProcessorContext recordProcessorContext);

/**
* Called by platform in order to replay a single record
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.api;

import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import java.util.List;
import java.util.function.Function;

public interface RecordProcessorContext {

int getPartitionId();

ProcessingScheduleService getScheduleService();

ZeebeDb getZeebeDb();

TransactionContext getTransactionContext();

LegacyTypedStreamWriter getStreamWriterProxy();

LegacyTypedResponseWriter getTypedResponseWriter();

Function<MutableZeebeState, EventApplier> getEventApplierFactory();

List<StreamProcessorLifecycleAware> getLifecycleListeners();

StreamProcessorListener getStreamProcessorListener();

// only used for tests
@Deprecated
void setStreamProcessorListener(final StreamProcessorListener streamProcessorListener);

void addLifecycleListeners(final List<StreamProcessorLifecycleAware> lifecycleListeners);
}

0 comments on commit bfd3734

Please sign in to comment.