Skip to content

Commit

Permalink
merge: #9735
Browse files Browse the repository at this point in the history
9735: 9725 engine abstraction part 1 r=pihme a=pihme

## Description

- adds a new package into which the platform classes will be moved temporarily (with the idea to later move them into a new Maven module)
- adds a test that warns about unwanted dependencies to that new package
- moves some relevant classes into the new package
- adds skeleton interfaces for the engine abstraction
- implements replay functionality in new `Engine` class and calls the replay from the existing classes

## Related issues

related to #9725 



Co-authored-by: pihme <pihme@users.noreply.github.com>
  • Loading branch information
zeebe-bors-camunda[bot] and pihme committed Jul 8, 2022
2 parents 6719c6b + 13c2775 commit b47cd2b
Show file tree
Hide file tree
Showing 52 changed files with 323 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.processing.streamprocessor.EventFilter;
import io.camunda.zeebe.engine.processing.streamprocessor.RecordValues;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedEventImpl;
import io.camunda.zeebe.exporter.api.context.Context;
import io.camunda.zeebe.logstreams.log.LogRecordAwaiter;
import io.camunda.zeebe.logstreams.log.LogStream;
Expand All @@ -31,6 +30,7 @@
import io.camunda.zeebe.scheduler.retry.BackOffRetryStrategy;
import io.camunda.zeebe.scheduler.retry.EndlessRetryStrategy;
import io.camunda.zeebe.scheduler.retry.RetryStrategy;
import io.camunda.zeebe.streamprocessor.TypedRecordImpl;
import io.camunda.zeebe.util.exception.UnrecoverableException;
import io.camunda.zeebe.util.health.FailureListener;
import io.camunda.zeebe.util.health.HealthMonitorable;
Expand Down Expand Up @@ -467,7 +467,7 @@ private static class RecordExporter {
private final RecordValues recordValues = new RecordValues();
private final RecordMetadata rawMetadata = new RecordMetadata();
private final List<ExporterContainer> containers;
private final TypedEventImpl typedEvent;
private final TypedRecordImpl typedEvent;
private final ExporterMetrics exporterMetrics;

private boolean shouldExport;
Expand All @@ -478,7 +478,7 @@ private static class RecordExporter {
final List<ExporterContainer> containers,
final int partitionId) {
this.containers = containers;
typedEvent = new TypedEventImpl(partitionId);
typedEvent = new TypedRecordImpl(partitionId);
this.exporterMetrics = exporterMetrics;
}

Expand Down Expand Up @@ -518,7 +518,7 @@ public boolean export() {
return true;
}

TypedEventImpl getTypedEvent() {
TypedRecordImpl getTypedEvent() {
return typedEvent;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
import io.camunda.zeebe.broker.partitioning.NoOpPartitionAdminAccess;
import io.camunda.zeebe.broker.partitioning.PartitionAdminAccess;
import io.camunda.zeebe.broker.system.partitions.ZeebePartition;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotMetadata;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import io.atomix.raft.RaftServer.Role;
import io.camunda.zeebe.broker.exporter.stream.ExporterPhase;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor.Phase;
import io.camunda.zeebe.streamprocessor.StreamProcessor.Phase;

public final class PartitionStatus {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
package io.camunda.zeebe.broker.system.partitions;

import io.camunda.zeebe.broker.exporter.stream.ExporterDirector;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import java.io.IOException;

public interface PartitionAdminControl {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import io.camunda.zeebe.broker.exporter.stream.ExporterDirector;
import io.camunda.zeebe.broker.system.partitions.impl.AsyncSnapshotDirector;
import io.camunda.zeebe.broker.system.partitions.impl.PartitionProcessingState;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import java.io.IOException;
import java.util.function.Supplier;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
import io.atomix.raft.RaftServer.Role;
import io.atomix.raft.partition.RaftPartition;
import io.camunda.zeebe.broker.exporter.stream.ExporterDirector;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.util.health.HealthMonitor;
import java.util.List;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.camunda.zeebe.broker.system.partitions.impl.AsyncSnapshotDirector;
import io.camunda.zeebe.broker.system.partitions.impl.PartitionProcessingState;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorFactory;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
Expand All @@ -32,6 +31,7 @@
import io.camunda.zeebe.scheduler.ScheduledTimer;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.snapshots.PersistedSnapshotStore;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.util.health.HealthMonitor;
import java.util.Collection;
import java.util.Collections;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
import io.camunda.zeebe.broker.system.monitoring.DiskSpaceUsageMonitor;
import io.camunda.zeebe.broker.system.partitions.impl.AsyncSnapshotDirector;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorFactory;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.state.QueryService;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
import io.camunda.zeebe.broker.system.monitoring.DiskSpaceUsageListener;
import io.camunda.zeebe.broker.system.monitoring.HealthMetrics;
import io.camunda.zeebe.broker.system.partitions.impl.RecoverablePartitionTransitionException;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.scheduler.health.CriticalComponentsHealthMonitor;
import io.camunda.zeebe.scheduler.startup.StartupProcess;
import io.camunda.zeebe.scheduler.startup.StartupStep;
import io.camunda.zeebe.snapshots.PersistedSnapshotStore;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.util.exception.UnrecoverableException;
import io.camunda.zeebe.util.health.FailureListener;
import io.camunda.zeebe.util.health.HealthMonitorable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import io.atomix.raft.storage.log.IndexedRaftLogEntry;
import io.camunda.zeebe.broker.system.partitions.NoEntryAtSnapshotPosition;
import io.camunda.zeebe.broker.system.partitions.StateController;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorMode;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.scheduler.Actor;
Expand All @@ -21,6 +20,7 @@
import io.camunda.zeebe.snapshots.SnapshotException;
import io.camunda.zeebe.snapshots.SnapshotException.SnapshotNotFoundException;
import io.camunda.zeebe.snapshots.TransientSnapshot;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.util.health.FailureListener;
import io.camunda.zeebe.util.health.HealthMonitorable;
import io.camunda.zeebe.util.health.HealthReport;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
import io.atomix.raft.RaftServer.Role;
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionContext;
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionStep;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorMode;
import io.camunda.zeebe.engine.state.appliers.EventAppliers;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import java.util.function.BiFunction;

public final class StreamProcessorTransitionStep implements PartitionTransitionStep {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
import io.atomix.raft.storage.log.entry.ApplicationEntry;
import io.camunda.zeebe.broker.system.partitions.impl.AsyncSnapshotDirector;
import io.camunda.zeebe.broker.system.partitions.impl.StateControllerImpl;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.engine.state.DefaultZeebeDbFactory;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.scheduler.testing.ActorSchedulerRule;
import io.camunda.zeebe.scheduler.testing.TestConcurrencyControl;
import io.camunda.zeebe.snapshots.ConstructableSnapshotStore;
import io.camunda.zeebe.snapshots.PersistedSnapshot;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotStoreFactory;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.test.util.AutoCloseableRule;
import java.io.IOException;
import java.time.Duration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.camunda.zeebe.broker.system.monitoring.DiskSpaceUsageMonitor;
import io.camunda.zeebe.broker.system.partitions.impl.AsyncSnapshotDirector;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorFactory;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
Expand All @@ -28,6 +27,7 @@
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.testing.TestActorFuture;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.util.health.HealthMonitor;
import java.util.Collection;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionContext;
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.StreamProcessorTransitionStep;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.testing.TestConcurrencyControl;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.util.health.HealthMonitor;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
import io.camunda.zeebe.broker.system.partitions.impl.steps.StreamProcessorTransitionStep;
import io.camunda.zeebe.broker.system.partitions.impl.steps.ZeebeDbPartitionTransitionStep;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.snapshots.TransientSnapshot;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.util.health.HealthMonitor;
import java.util.ArrayList;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
import io.camunda.zeebe.broker.system.configuration.BrokerCfg;
import io.camunda.zeebe.broker.system.partitions.TestPartitionTransitionContext;
import io.camunda.zeebe.broker.system.partitions.impl.AsyncSnapshotDirector;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
import io.camunda.zeebe.scheduler.testing.TestActorFuture;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.util.health.HealthMonitor;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

import io.atomix.raft.RaftServer.Role;
import io.camunda.zeebe.broker.system.partitions.TestPartitionTransitionContext;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorBuilder;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.scheduler.testing.TestActorFuture;
import io.camunda.zeebe.scheduler.testing.TestConcurrencyControl;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.streamprocessor.StreamProcessorBuilder;
import io.camunda.zeebe.util.health.HealthMonitor;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
Expand Down
19 changes: 19 additions & 0 deletions engine/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,25 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.tngtech.archunit</groupId>
<artifactId>archunit</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.tngtech.archunit</groupId>
<artifactId>archunit-junit5-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.tngtech.archunit</groupId>
<artifactId>archunit-junit5-engine</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
61 changes: 61 additions & 0 deletions engine/src/main/java/io/camunda/zeebe/engine/EngineImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine;

import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.api.Engine;
import io.camunda.zeebe.engine.api.EngineContext;
import io.camunda.zeebe.engine.api.ErrorHandlingContext;
import io.camunda.zeebe.engine.api.ProcessingContext;
import io.camunda.zeebe.engine.api.ProcessingResult;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.ZeebeDbState;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import java.util.function.Function;

public class EngineImpl implements Engine {

private final EventApplier eventApplier;

public EngineImpl(
final int partitionId,
final ZeebeDb zeebeDb,
final Function<MutableZeebeState, EventApplier> eventApplierFactory) {

final TransactionContext transactionContext = zeebeDb.createContext();
final var zeebeState = new ZeebeDbState(partitionId, zeebeDb, transactionContext);
eventApplier = eventApplierFactory.apply(zeebeState);
}

@Override
public void init(final EngineContext engineContext) {
throw new IllegalStateException("Not yet implemented");
}

@Override
public void replay(final TypedRecord event) {
eventApplier.applyState(event.getKey(), event.getIntent(), event.getValue());
}

@Override
public ProcessingResult process(
final TypedRecord record, final ProcessingContext processingContext) {
throw new IllegalStateException("Not yet implemented");
}

@Override
public ProcessingResult onProcessingError(
final Throwable processingException,
final TypedRecord record,
final long position,
final ErrorHandlingContext errorHandlingContext) {
throw new IllegalStateException("Not yet implemented");
}
}
25 changes: 25 additions & 0 deletions engine/src/main/java/io/camunda/zeebe/engine/api/Engine.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.api;

import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecord;

public interface Engine {

void init(EngineContext engineContext);

void replay(TypedRecord record);

ProcessingResult process(TypedRecord record, ProcessingContext processingContext);

ProcessingResult onProcessingError(
Throwable processingException,
TypedRecord record,
long position,
ErrorHandlingContext errorHandlingContext);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.api;

public interface EngineContext {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.api;

// TODO check after refactoring, whether this is the same as ProcessingContext
public interface ErrorHandlingContext {
ProcessingResultBuilder getProcessingResultBuilder();
}

0 comments on commit b47cd2b

Please sign in to comment.