Skip to content

Commit

Permalink
merge: #9743
Browse files Browse the repository at this point in the history
9743: Refactor and Move `ProcessorContext` r=pihme a=pihme

## Description

* Rename `(ReadOnly)ProcessorContext` to  `StreamProcessorContext`
* Move interface to `api` package, move implementation to `streamprocessor` package
* Simplifiy code

## Related issues

related to #9725 



Co-authored-by: pihme <pihme@users.noreply.github.com>
  • Loading branch information
zeebe-bors-camunda[bot] and pihme committed Jul 8, 2022
2 parents 180351a + cbcf66e commit 2fec923
Show file tree
Hide file tree
Showing 34 changed files with 222 additions and 234 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,9 @@
import io.camunda.zeebe.broker.system.partitions.impl.steps.ZeebeDbPartitionTransitionStep;
import io.camunda.zeebe.broker.transport.commandapi.CommandApiService;
import io.camunda.zeebe.db.impl.rocksdb.ZeebeRocksDbFactory;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.EngineProcessors;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.ProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.protocol.impl.encoding.BrokerInfo;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
Expand Down Expand Up @@ -209,10 +207,8 @@ private TypedRecordProcessorsFactory createFactory(
final ClusterEventService eventService,
final PushDeploymentRequestHandler deploymentRequestHandler,
final FeatureFlags featureFlags) {
return (ProcessingContext processingContext) -> {
final var actor = processingContext.getActor();

final LogStream stream = processingContext.getLogStream();
return (recordProcessorContext) -> {
final var actor = recordProcessorContext.getActor();

final TopologyPartitionListenerImpl partitionListener =
new TopologyPartitionListenerImpl(actor);
Expand All @@ -225,14 +221,15 @@ private TypedRecordProcessorsFactory createFactory(
final PartitionCommandSenderImpl partitionCommandSender =
new PartitionCommandSenderImpl(communicationService, partitionListener);
final SubscriptionCommandSender subscriptionCommandSender =
new SubscriptionCommandSender(stream.getPartitionId(), partitionCommandSender);
new SubscriptionCommandSender(
recordProcessorContext.getPartitionId(), partitionCommandSender);

final LongPollingJobNotification jobsAvailableNotification =
new LongPollingJobNotification(eventService);

final var processor =
EngineProcessors.createEngineProcessors(
processingContext,
recordProcessorContext,
localBroker.getPartitionsCount(),
subscriptionCommandSender,
deploymentDistributor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
*/
package io.camunda.zeebe.broker.system.partitions;

import io.camunda.zeebe.engine.processing.streamprocessor.ProcessingContext;
import io.camunda.zeebe.engine.api.RecordProcessorContext;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;

@FunctionalInterface
public interface TypedRecordProcessorsFactory {

TypedRecordProcessors createTypedStreamProcessor(ProcessingContext processingContext);
TypedRecordProcessors createTypedStreamProcessor(RecordProcessorContext recordProcessorContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,15 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.streamprocessor;
package io.camunda.zeebe.engine.api;

import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.scheduler.ActorControl;
import java.util.function.BooleanSupplier;

public interface ReadonlyProcessingContext {
public interface ReadonlyStreamProcessorContext {

/**
* @return the actor on which the processing runs
Expand All @@ -29,17 +25,6 @@ public interface ReadonlyProcessingContext {
*/
LogStream getLogStream();

/**
* @return the reader, which is used by the processor to read next events
*/
LogStreamReader getLogStreamReader();

/**
* @return the maximum fragment size we can write and read this contains the record metadata and
* record value etc.
*/
int getMaxFragmentSize();

/**
* @return the actual log stream writer, used to write any record
*/
Expand All @@ -50,33 +35,15 @@ public interface ReadonlyProcessingContext {
*/
Writers getWriters();

/**
* @return the pool, which contains the mapping from ValueType to UnpackedObject (record)
*/
RecordValues getRecordValues();

/**
* @return the map of processors, which are executed during processing
*/
RecordProcessorMap getRecordProcessorMap();

/**
* @return the state, where the data is stored during processing
*/
MutableZeebeState getZeebeState();

/**
* @return the transaction context for the current actor
*/
TransactionContext getTransactionContext();

/**
* @return condition which indicates, whether the processing should stop or not
*/
BooleanSupplier getAbortCondition();

/**
* @return the consumer of events to apply their state changes
* Returns the partition ID
*
* @return partition ID
*/
EventApplier getEventApplier();
int getPartitionId();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.api;

import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.immutable.LastProcessedPositionState;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.scheduler.ActorControl;

public interface RecordProcessorContext {

int getPartitionId();

ActorControl getActor();

MutableZeebeState getZeebeState();

Writers getWriters();

LastProcessedPositionState getLastProcessedPositionState();

RecordProcessorContext listener(StreamProcessorListener streamProcessorListener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.streamprocessor;
package io.camunda.zeebe.engine.api;

public interface StreamProcessorLifecycleAware {

/** Callback after reprocessing was successful and before regular processing begins */
default void onRecovered(final ReadonlyProcessingContext context) {}
default void onRecovered(final ReadonlyStreamProcessorContext context) {}

/** Callback which is called when StreamProcessor is on closing phase. */
default void onClose() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static io.camunda.zeebe.protocol.record.intent.DeploymentIntent.CREATE;

import io.camunda.zeebe.el.ExpressionLanguageFactory;
import io.camunda.zeebe.engine.api.RecordProcessorContext;
import io.camunda.zeebe.engine.metrics.JobMetrics;
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnEventPublicationBehavior;
import io.camunda.zeebe.engine.processing.common.CatchEventBehavior;
Expand All @@ -25,7 +26,6 @@
import io.camunda.zeebe.engine.processing.job.JobEventProcessors;
import io.camunda.zeebe.engine.processing.message.MessageEventProcessors;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.ProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
Expand All @@ -35,7 +35,6 @@
import io.camunda.zeebe.engine.state.immutable.ZeebeState;
import io.camunda.zeebe.engine.state.migration.DbMigrationController;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.DeploymentDistributionIntent;
Expand All @@ -47,27 +46,26 @@
public final class EngineProcessors {

public static TypedRecordProcessors createEngineProcessors(
final ProcessingContext processingContext,
final RecordProcessorContext recordProcessorContext,
final int partitionsCount,
final SubscriptionCommandSender subscriptionCommandSender,
final DeploymentDistributor deploymentDistributor,
final DeploymentResponder deploymentResponder,
final Consumer<String> onJobsAvailableCallback,
final FeatureFlags featureFlags) {

final var actor = processingContext.getActor();
final MutableZeebeState zeebeState = processingContext.getZeebeState();
final var writers = processingContext.getWriters();
final var actor = recordProcessorContext.getActor();
final MutableZeebeState zeebeState = recordProcessorContext.getZeebeState();
final var writers = recordProcessorContext.getWriters();
final TypedRecordProcessors typedRecordProcessors =
TypedRecordProcessors.processors(zeebeState.getKeyGenerator(), writers);

// register listener that handles migrations immediately, so it is the first to be called
typedRecordProcessors.withListener(new DbMigrationController());

typedRecordProcessors.withListener(processingContext.getZeebeState());
typedRecordProcessors.withListener(zeebeState);

final LogStream stream = processingContext.getLogStream();
final int partitionId = stream.getPartitionId();
final int partitionId = recordProcessorContext.getPartitionId();

final var variablesState = zeebeState.getVariableState();
final var expressionProcessor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
*/
package io.camunda.zeebe.engine.processing.deployment.distribute;

import io.camunda.zeebe.engine.processing.streamprocessor.ReadonlyProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.state.immutable.DeploymentState;

public class DeploymentRedistributor implements StreamProcessorLifecycleAware {
Expand All @@ -27,7 +27,7 @@ public DeploymentRedistributor(
}

@Override
public void onRecovered(final ReadonlyProcessingContext context) {
public void onRecovered(final ReadonlyStreamProcessorContext context) {
final var actor = context.getActor();
final var writers = context.getWriters();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
*/
package io.camunda.zeebe.engine.processing.job;

import io.camunda.zeebe.engine.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.scheduled.DueDateChecker;
import io.camunda.zeebe.engine.processing.streamprocessor.ReadonlyProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.state.immutable.JobState;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.scheduler.clock.ActorClock;
Expand Down Expand Up @@ -42,7 +42,7 @@ public void scheduleBackOff(final long dueDate) {
}

@Override
public void onRecovered(final ReadonlyProcessingContext context) {
public void onRecovered(final ReadonlyStreamProcessorContext context) {
backOffDueDateChecker.onRecovered(context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
*/
package io.camunda.zeebe.engine.processing.job;

import io.camunda.zeebe.engine.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.metrics.JobMetrics;
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnEventPublicationBehavior;
import io.camunda.zeebe.engine.processing.common.EventHandle;
import io.camunda.zeebe.engine.processing.common.EventTriggerBehavior;
import io.camunda.zeebe.engine.processing.streamprocessor.ReadonlyProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
Expand Down Expand Up @@ -75,7 +75,7 @@ ValueType.JOB, JobIntent.UPDATE_RETRIES, new JobUpdateRetriesProcessor(zeebeStat
.withListener(
new StreamProcessorLifecycleAware() {
@Override
public void onRecovered(final ReadonlyProcessingContext context) {
public void onRecovered(final ReadonlyStreamProcessorContext context) {
jobState.setJobsAvailableCallback(onJobsAvailableCallback);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

import static io.camunda.zeebe.scheduler.clock.ActorClock.currentTimeMillis;

import io.camunda.zeebe.engine.processing.streamprocessor.ReadonlyProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedCommandWriter;
import io.camunda.zeebe.engine.state.immutable.JobState;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
Expand All @@ -23,14 +23,14 @@ public final class JobTimeoutTrigger implements StreamProcessorLifecycleAware {

private ScheduledTimer timer;
private TypedCommandWriter writer;
private ReadonlyProcessingContext processingContext;
private ReadonlyStreamProcessorContext processingContext;

public JobTimeoutTrigger(final JobState state) {
this.state = state;
}

@Override
public void onRecovered(final ReadonlyProcessingContext processingContext) {
public void onRecovered(final ReadonlyStreamProcessorContext processingContext) {
this.processingContext = processingContext;
timer =
this.processingContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
*/
package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.ReadonlyProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.state.immutable.MessageState;
import io.camunda.zeebe.engine.state.mutable.MutablePendingMessageSubscriptionState;
import io.camunda.zeebe.scheduler.ActorControl;
Expand All @@ -36,7 +36,7 @@ public MessageObserver(
}

@Override
public void onRecovered(final ReadonlyProcessingContext context) {
public void onRecovered(final ReadonlyStreamProcessorContext context) {
final ActorControl actor = context.getActor();
// it is safe to reuse the write because we running in the same actor/thread
final MessageTimeToLiveChecker timeToLiveChecker =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
*/
package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.ReadonlyProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.state.message.ProcessMessageSubscription;
import io.camunda.zeebe.engine.state.mutable.MutablePendingProcessMessageSubscriptionState;
import io.camunda.zeebe.scheduler.ActorControl;
Expand Down Expand Up @@ -39,7 +39,7 @@ public PendingProcessMessageSubscriptionChecker(
}

@Override
public void onRecovered(final ReadonlyProcessingContext context) {
public void onRecovered(final ReadonlyStreamProcessorContext context) {
actor = context.getActor();
scheduleTimer();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
*/
package io.camunda.zeebe.engine.processing.scheduled;

import io.camunda.zeebe.engine.processing.streamprocessor.ReadonlyProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedCommandWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.camunda.zeebe.scheduler.ActorControl;
Expand Down Expand Up @@ -85,7 +85,7 @@ private Duration calculateDelayForNextRun(final long dueDate) {
}

@Override
public void onRecovered(final ReadonlyProcessingContext processingContext) {
public void onRecovered(final ReadonlyStreamProcessorContext processingContext) {
actor = processingContext.getActor();
streamWriter = processingContext.getLogStreamWriter();
// check if timers are due after restart
Expand Down

0 comments on commit 2fec923

Please sign in to comment.