Skip to content

Commit

Permalink
merge: #9743
Browse files Browse the repository at this point in the history
9743: Refactor and Move `ProcessorContext` r=pihme a=pihme

## Description

* Rename `(ReadOnly)ProcessorContext` to  `StreamProcessorContext`
* Move interface to `api` package, move implementation to `streamprocessor` package
* Simplifiy code

## Related issues

related to #9725 



Co-authored-by: pihme <pihme@users.noreply.github.com>
  • Loading branch information
zeebe-bors-camunda[bot] and pihme committed Jul 8, 2022
2 parents 180351a + cbcf66e commit 4d3459a
Show file tree
Hide file tree
Showing 34 changed files with 222 additions and 234 deletions.
Expand Up @@ -48,11 +48,9 @@
import io.camunda.zeebe.broker.system.partitions.impl.steps.ZeebeDbPartitionTransitionStep;
import io.camunda.zeebe.broker.transport.commandapi.CommandApiService;
import io.camunda.zeebe.db.impl.rocksdb.ZeebeRocksDbFactory;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.EngineProcessors;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.ProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.protocol.impl.encoding.BrokerInfo;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
import io.camunda.zeebe.scheduler.ConcurrencyControl;
Expand Down Expand Up @@ -209,10 +207,8 @@ private TypedRecordProcessorsFactory createFactory(
final ClusterEventService eventService,
final PushDeploymentRequestHandler deploymentRequestHandler,
final FeatureFlags featureFlags) {
return (ProcessingContext processingContext) -> {
final var actor = processingContext.getActor();

final LogStream stream = processingContext.getLogStream();
return (recordProcessorContext) -> {
final var actor = recordProcessorContext.getActor();

final TopologyPartitionListenerImpl partitionListener =
new TopologyPartitionListenerImpl(actor);
Expand All @@ -225,14 +221,15 @@ private TypedRecordProcessorsFactory createFactory(
final PartitionCommandSenderImpl partitionCommandSender =
new PartitionCommandSenderImpl(communicationService, partitionListener);
final SubscriptionCommandSender subscriptionCommandSender =
new SubscriptionCommandSender(stream.getPartitionId(), partitionCommandSender);
new SubscriptionCommandSender(
recordProcessorContext.getPartitionId(), partitionCommandSender);

final LongPollingJobNotification jobsAvailableNotification =
new LongPollingJobNotification(eventService);

final var processor =
EngineProcessors.createEngineProcessors(
processingContext,
recordProcessorContext,
localBroker.getPartitionsCount(),
subscriptionCommandSender,
deploymentDistributor,
Expand Down
Expand Up @@ -7,11 +7,11 @@
*/
package io.camunda.zeebe.broker.system.partitions;

import io.camunda.zeebe.engine.processing.streamprocessor.ProcessingContext;
import io.camunda.zeebe.engine.api.RecordProcessorContext;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;

@FunctionalInterface
public interface TypedRecordProcessorsFactory {

TypedRecordProcessors createTypedStreamProcessor(ProcessingContext processingContext);
TypedRecordProcessors createTypedStreamProcessor(RecordProcessorContext recordProcessorContext);
}
Expand Up @@ -5,19 +5,15 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.streamprocessor;
package io.camunda.zeebe.engine.api;

import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.scheduler.ActorControl;
import java.util.function.BooleanSupplier;

public interface ReadonlyProcessingContext {
public interface ReadonlyStreamProcessorContext {

/**
* @return the actor on which the processing runs
Expand All @@ -29,17 +25,6 @@ public interface ReadonlyProcessingContext {
*/
LogStream getLogStream();

/**
* @return the reader, which is used by the processor to read next events
*/
LogStreamReader getLogStreamReader();

/**
* @return the maximum fragment size we can write and read this contains the record metadata and
* record value etc.
*/
int getMaxFragmentSize();

/**
* @return the actual log stream writer, used to write any record
*/
Expand All @@ -50,33 +35,15 @@ public interface ReadonlyProcessingContext {
*/
Writers getWriters();

/**
* @return the pool, which contains the mapping from ValueType to UnpackedObject (record)
*/
RecordValues getRecordValues();

/**
* @return the map of processors, which are executed during processing
*/
RecordProcessorMap getRecordProcessorMap();

/**
* @return the state, where the data is stored during processing
*/
MutableZeebeState getZeebeState();

/**
* @return the transaction context for the current actor
*/
TransactionContext getTransactionContext();

/**
* @return condition which indicates, whether the processing should stop or not
*/
BooleanSupplier getAbortCondition();

/**
* @return the consumer of events to apply their state changes
* Returns the partition ID
*
* @return partition ID
*/
EventApplier getEventApplier();
int getPartitionId();
}
@@ -0,0 +1,29 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.api;

import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.immutable.LastProcessedPositionState;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.scheduler.ActorControl;

public interface RecordProcessorContext {

int getPartitionId();

ActorControl getActor();

MutableZeebeState getZeebeState();

Writers getWriters();

LastProcessedPositionState getLastProcessedPositionState();

RecordProcessorContext listener(StreamProcessorListener streamProcessorListener);
}
Expand Up @@ -5,12 +5,12 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.streamprocessor;
package io.camunda.zeebe.engine.api;

public interface StreamProcessorLifecycleAware {

/** Callback after reprocessing was successful and before regular processing begins */
default void onRecovered(final ReadonlyProcessingContext context) {}
default void onRecovered(final ReadonlyStreamProcessorContext context) {}

/** Callback which is called when StreamProcessor is on closing phase. */
default void onClose() {}
Expand Down
Expand Up @@ -10,6 +10,7 @@
import static io.camunda.zeebe.protocol.record.intent.DeploymentIntent.CREATE;

import io.camunda.zeebe.el.ExpressionLanguageFactory;
import io.camunda.zeebe.engine.api.RecordProcessorContext;
import io.camunda.zeebe.engine.metrics.JobMetrics;
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnEventPublicationBehavior;
import io.camunda.zeebe.engine.processing.common.CatchEventBehavior;
Expand All @@ -25,7 +26,6 @@
import io.camunda.zeebe.engine.processing.job.JobEventProcessors;
import io.camunda.zeebe.engine.processing.message.MessageEventProcessors;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.ProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
Expand All @@ -35,7 +35,6 @@
import io.camunda.zeebe.engine.state.immutable.ZeebeState;
import io.camunda.zeebe.engine.state.migration.DbMigrationController;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.DeploymentDistributionIntent;
Expand All @@ -47,27 +46,26 @@
public final class EngineProcessors {

public static TypedRecordProcessors createEngineProcessors(
final ProcessingContext processingContext,
final RecordProcessorContext recordProcessorContext,
final int partitionsCount,
final SubscriptionCommandSender subscriptionCommandSender,
final DeploymentDistributor deploymentDistributor,
final DeploymentResponder deploymentResponder,
final Consumer<String> onJobsAvailableCallback,
final FeatureFlags featureFlags) {

final var actor = processingContext.getActor();
final MutableZeebeState zeebeState = processingContext.getZeebeState();
final var writers = processingContext.getWriters();
final var actor = recordProcessorContext.getActor();
final MutableZeebeState zeebeState = recordProcessorContext.getZeebeState();
final var writers = recordProcessorContext.getWriters();
final TypedRecordProcessors typedRecordProcessors =
TypedRecordProcessors.processors(zeebeState.getKeyGenerator(), writers);

// register listener that handles migrations immediately, so it is the first to be called
typedRecordProcessors.withListener(new DbMigrationController());

typedRecordProcessors.withListener(processingContext.getZeebeState());
typedRecordProcessors.withListener(zeebeState);

final LogStream stream = processingContext.getLogStream();
final int partitionId = stream.getPartitionId();
final int partitionId = recordProcessorContext.getPartitionId();

final var variablesState = zeebeState.getVariableState();
final var expressionProcessor =
Expand Down
Expand Up @@ -7,8 +7,8 @@
*/
package io.camunda.zeebe.engine.processing.deployment.distribute;

import io.camunda.zeebe.engine.processing.streamprocessor.ReadonlyProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.state.immutable.DeploymentState;

public class DeploymentRedistributor implements StreamProcessorLifecycleAware {
Expand All @@ -27,7 +27,7 @@ public DeploymentRedistributor(
}

@Override
public void onRecovered(final ReadonlyProcessingContext context) {
public void onRecovered(final ReadonlyStreamProcessorContext context) {
final var actor = context.getActor();
final var writers = context.getWriters();

Expand Down
Expand Up @@ -7,9 +7,9 @@
*/
package io.camunda.zeebe.engine.processing.job;

import io.camunda.zeebe.engine.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.scheduled.DueDateChecker;
import io.camunda.zeebe.engine.processing.streamprocessor.ReadonlyProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.state.immutable.JobState;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.scheduler.clock.ActorClock;
Expand Down Expand Up @@ -42,7 +42,7 @@ public void scheduleBackOff(final long dueDate) {
}

@Override
public void onRecovered(final ReadonlyProcessingContext context) {
public void onRecovered(final ReadonlyStreamProcessorContext context) {
backOffDueDateChecker.onRecovered(context);
}

Expand Down
Expand Up @@ -7,12 +7,12 @@
*/
package io.camunda.zeebe.engine.processing.job;

import io.camunda.zeebe.engine.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.metrics.JobMetrics;
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnEventPublicationBehavior;
import io.camunda.zeebe.engine.processing.common.EventHandle;
import io.camunda.zeebe.engine.processing.common.EventTriggerBehavior;
import io.camunda.zeebe.engine.processing.streamprocessor.ReadonlyProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
Expand Down Expand Up @@ -75,7 +75,7 @@ ValueType.JOB, JobIntent.UPDATE_RETRIES, new JobUpdateRetriesProcessor(zeebeStat
.withListener(
new StreamProcessorLifecycleAware() {
@Override
public void onRecovered(final ReadonlyProcessingContext context) {
public void onRecovered(final ReadonlyStreamProcessorContext context) {
jobState.setJobsAvailableCallback(onJobsAvailableCallback);
}
});
Expand Down
Expand Up @@ -9,8 +9,8 @@

import static io.camunda.zeebe.scheduler.clock.ActorClock.currentTimeMillis;

import io.camunda.zeebe.engine.processing.streamprocessor.ReadonlyProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedCommandWriter;
import io.camunda.zeebe.engine.state.immutable.JobState;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
Expand All @@ -23,14 +23,14 @@ public final class JobTimeoutTrigger implements StreamProcessorLifecycleAware {

private ScheduledTimer timer;
private TypedCommandWriter writer;
private ReadonlyProcessingContext processingContext;
private ReadonlyStreamProcessorContext processingContext;

public JobTimeoutTrigger(final JobState state) {
this.state = state;
}

@Override
public void onRecovered(final ReadonlyProcessingContext processingContext) {
public void onRecovered(final ReadonlyStreamProcessorContext processingContext) {
this.processingContext = processingContext;
timer =
this.processingContext
Expand Down
Expand Up @@ -7,9 +7,9 @@
*/
package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.ReadonlyProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.state.immutable.MessageState;
import io.camunda.zeebe.engine.state.mutable.MutablePendingMessageSubscriptionState;
import io.camunda.zeebe.scheduler.ActorControl;
Expand All @@ -36,7 +36,7 @@ public MessageObserver(
}

@Override
public void onRecovered(final ReadonlyProcessingContext context) {
public void onRecovered(final ReadonlyStreamProcessorContext context) {
final ActorControl actor = context.getActor();
// it is safe to reuse the write because we running in the same actor/thread
final MessageTimeToLiveChecker timeToLiveChecker =
Expand Down
Expand Up @@ -7,9 +7,9 @@
*/
package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.message.command.SubscriptionCommandSender;
import io.camunda.zeebe.engine.processing.streamprocessor.ReadonlyProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.state.message.ProcessMessageSubscription;
import io.camunda.zeebe.engine.state.mutable.MutablePendingProcessMessageSubscriptionState;
import io.camunda.zeebe.scheduler.ActorControl;
Expand Down Expand Up @@ -39,7 +39,7 @@ public PendingProcessMessageSubscriptionChecker(
}

@Override
public void onRecovered(final ReadonlyProcessingContext context) {
public void onRecovered(final ReadonlyStreamProcessorContext context) {
actor = context.getActor();
scheduleTimer();
}
Expand Down
Expand Up @@ -7,8 +7,8 @@
*/
package io.camunda.zeebe.engine.processing.scheduled;

import io.camunda.zeebe.engine.processing.streamprocessor.ReadonlyProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedCommandWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.camunda.zeebe.scheduler.ActorControl;
Expand Down Expand Up @@ -85,7 +85,7 @@ private Duration calculateDelayForNextRun(final long dueDate) {
}

@Override
public void onRecovered(final ReadonlyProcessingContext processingContext) {
public void onRecovered(final ReadonlyStreamProcessorContext processingContext) {
actor = processingContext.getActor();
streamWriter = processingContext.getLogStreamWriter();
// check if timers are due after restart
Expand Down

0 comments on commit 4d3459a

Please sign in to comment.