Skip to content

Commit

Permalink
merge: #9852
Browse files Browse the repository at this point in the history
9852: Move stream processor classes into the stream processor package r=pihme a=pihme

## Description

- Move classes that are only used by stream processor into the corresponding package
- There were no other changes apart from moving the classes

## Related issues

<!-- Which issues are closed by this PR or are related -->

related to #9725 

**Merge only after** #9853  (this PR can easily be regenerated; the other one cannot)



Co-authored-by: pihme <pihme@users.noreply.github.com>
  • Loading branch information
zeebe-bors-camunda[bot] and pihme committed Jul 22, 2022
2 parents 6efb06b + 385d9de commit 6089f84
Show file tree
Hide file tree
Showing 21 changed files with 16 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import io.camunda.zeebe.broker.exporter.stream.ExporterDirectorContext.ExporterMode;
import io.camunda.zeebe.broker.system.partitions.PartitionMessagingService;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.processing.streamprocessor.EventFilter;
import io.camunda.zeebe.engine.processing.streamprocessor.RecordValues;
import io.camunda.zeebe.exporter.api.context.Context;
import io.camunda.zeebe.logstreams.log.LogRecordAwaiter;
Expand All @@ -30,6 +29,7 @@
import io.camunda.zeebe.scheduler.retry.BackOffRetryStrategy;
import io.camunda.zeebe.scheduler.retry.EndlessRetryStrategy;
import io.camunda.zeebe.scheduler.retry.RetryStrategy;
import io.camunda.zeebe.streamprocessor.EventFilter;
import io.camunda.zeebe.streamprocessor.TypedRecordImpl;
import io.camunda.zeebe.util.exception.UnrecoverableException;
import io.camunda.zeebe.util.health.FailureListener;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import io.atomix.raft.storage.log.IndexedRaftLogEntry;
import io.camunda.zeebe.broker.system.partitions.NoEntryAtSnapshotPosition;
import io.camunda.zeebe.broker.system.partitions.StateController;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorMode;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.future.ActorFuture;
Expand All @@ -21,6 +20,7 @@
import io.camunda.zeebe.snapshots.SnapshotException.SnapshotNotFoundException;
import io.camunda.zeebe.snapshots.TransientSnapshot;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.streamprocessor.StreamProcessorMode;
import io.camunda.zeebe.util.health.FailureListener;
import io.camunda.zeebe.util.health.HealthMonitorable;
import io.camunda.zeebe.util.health.HealthReport;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
import io.atomix.raft.RaftServer.Role;
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionContext;
import io.camunda.zeebe.broker.system.partitions.PartitionTransitionStep;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorMode;
import io.camunda.zeebe.engine.state.appliers.EventAppliers;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.streamprocessor.StreamProcessorMode;
import java.util.function.BiFunction;

public final class StreamProcessorTransitionStep implements PartitionTransitionStep {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.streamprocessor;
package io.camunda.zeebe.streamprocessor;

import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import java.util.Objects;

/** Implement to control which events should be handled by a {@link StreamProcessor}. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.streamprocessor;
package io.camunda.zeebe.streamprocessor;

/**
* Contains positions which are related to the last processing, and are used to restore the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.streamprocessor;
package io.camunda.zeebe.streamprocessor;

import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.streamprocessor;
package io.camunda.zeebe.streamprocessor;

import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import java.util.Objects;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.streamprocessor;
package io.camunda.zeebe.streamprocessor;

import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@
import io.camunda.zeebe.engine.api.RecordProcessor;
import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.metrics.StreamProcessorMetrics;
import io.camunda.zeebe.engine.processing.streamprocessor.EventFilter;
import io.camunda.zeebe.engine.processing.streamprocessor.LastProcessingPositions;
import io.camunda.zeebe.engine.processing.streamprocessor.MetadataEventFilter;
import io.camunda.zeebe.engine.processing.streamprocessor.MetadataFilter;
import io.camunda.zeebe.engine.processing.streamprocessor.RecordProtocolVersionFilter;
import io.camunda.zeebe.engine.processing.streamprocessor.RecordValues;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.streamprocessor;
package io.camunda.zeebe.streamprocessor;

import io.camunda.zeebe.protocol.Protocol;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,8 @@
import io.camunda.zeebe.engine.api.RecordProcessor;
import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.metrics.ReplayMetrics;
import io.camunda.zeebe.engine.processing.streamprocessor.EventFilter;
import io.camunda.zeebe.engine.processing.streamprocessor.LastProcessingPositions;
import io.camunda.zeebe.engine.processing.streamprocessor.MetadataEventFilter;
import io.camunda.zeebe.engine.processing.streamprocessor.MetadataFilter;
import io.camunda.zeebe.engine.processing.streamprocessor.ProcessingException;
import io.camunda.zeebe.engine.processing.streamprocessor.RecordProtocolVersionFilter;
import io.camunda.zeebe.engine.processing.streamprocessor.RecordValues;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorMode;
import io.camunda.zeebe.engine.state.KeyGeneratorControls;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.impl.Loggers;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
import io.camunda.zeebe.engine.EngineContext;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.metrics.StreamProcessorMetrics;
import io.camunda.zeebe.engine.processing.streamprocessor.LastProcessingPositions;
import io.camunda.zeebe.engine.processing.streamprocessor.RecordValues;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorMode;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorFactory;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.state.EventApplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorMode;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorFactory;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import io.camunda.zeebe.engine.processing.bpmn.behavior.LegacyTypedStreamWriterProxy;
import io.camunda.zeebe.engine.processing.streamprocessor.RecordValues;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorMode;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedResponseWriterImpl;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.streamprocessor;
package io.camunda.zeebe.streamprocessor;

public enum StreamProcessorMode {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.streamprocessor.StreamProcessorMode;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import org.assertj.core.api.SoftAssertions;
import org.awaitility.Awaitility;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import static org.mockito.Mockito.mock;

import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.streamprocessor.EventFilter;
import org.junit.Test;

public final class EventFilterTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.streamprocessor.StreamProcessor.Phase;
import io.camunda.zeebe.streamprocessor.StreamProcessorMode;
import org.awaitility.Awaitility;
import org.junit.Rule;
import org.junit.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.RecordValues;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorMode;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.state.DefaultZeebeDbFactory;
import io.camunda.zeebe.engine.state.ZbColumnFamilies;
Expand Down Expand Up @@ -55,6 +54,7 @@
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.streamprocessor.StreamProcessorMode;
import io.camunda.zeebe.streamprocessor.TypedRecordImpl;
import io.camunda.zeebe.test.util.TestUtil;
import io.camunda.zeebe.test.util.record.RecordingExporter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

import io.camunda.zeebe.db.ZeebeDbFactory;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorMode;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorFactory;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
Expand All @@ -31,6 +30,7 @@
import io.camunda.zeebe.scheduler.clock.ControlledActorClock;
import io.camunda.zeebe.scheduler.testing.ActorSchedulerRule;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.streamprocessor.StreamProcessorMode;
import io.camunda.zeebe.streamprocessor.state.MutableLastProcessedPositionState;
import io.camunda.zeebe.test.util.AutoCloseableRule;
import io.camunda.zeebe.util.FileUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.camunda.zeebe.engine.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorMode;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedEventRegistry;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorFactory;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
Expand All @@ -45,6 +44,7 @@
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.scheduler.ActorScheduler;
import io.camunda.zeebe.streamprocessor.StreamProcessor;
import io.camunda.zeebe.streamprocessor.StreamProcessorMode;
import io.camunda.zeebe.test.util.AutoCloseableRule;
import io.camunda.zeebe.util.FileUtil;
import java.io.IOException;
Expand Down

0 comments on commit 6089f84

Please sign in to comment.