Skip to content

Commit

Permalink
merge: #9994
Browse files Browse the repository at this point in the history
9994: Remove writers from TypedRecordProcessor r=Zelldon a=Zelldon

## Description

 * Replaces the usage of the StreamWriter and ResponseWriter from the parameters with the Writers and their "sub-writers"
   * Discussable whether we always want to have Writers in the processor classes or just the specific writers, but I guess we can here improve also later
 * Adjust several test cases that they also are filled with the Writers object
 * Finally, remove the writers from the processRecord interface

<!-- Please explain the changes you made here. -->

## Related issues

<!-- Which issues are closed by this PR or are related -->

related to #9727
closes #8002



Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and Zelldon committed Aug 5, 2022
2 parents 00fb0db + dbc7f0c commit bdc2405
Show file tree
Hide file tree
Showing 31 changed files with 249 additions and 403 deletions.
4 changes: 0 additions & 4 deletions engine/src/main/java/io/camunda/zeebe/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,8 @@ public ProcessingResult process(

final boolean isNotOnBlacklist = !zeebeState.getBlackListState().isOnBlacklist(typedCommand);
if (isNotOnBlacklist) {
final long position = typedCommand.getPosition();
currentProcessor.processRecord(
position,
record,
responseWriter,
streamWriter,
(sep) -> {
processingResultBuilder.resetPostCommitTasks();
processingResultBuilder.appendPostCommitTask(sep::flush);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.camunda.zeebe.engine.processing.processinstance.ProcessInstanceModificationProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.processing.timer.CancelTimerProcessor;
import io.camunda.zeebe.engine.processing.timer.DueDateTimerChecker;
Expand Down Expand Up @@ -67,7 +66,8 @@ public static TypedRecordProcessor<ProcessInstanceRecord> addProcessProcessors(

final var processEngineMetrics = new ProcessEngineMetrics(zeebeState.getPartitionId());

addProcessInstanceCommandProcessor(typedRecordProcessors, zeebeState.getElementInstanceState());
addProcessInstanceCommandProcessor(
writers, typedRecordProcessors, zeebeState.getElementInstanceState());

final var bpmnStreamProcessor =
new BpmnStreamProcessor(
Expand Down Expand Up @@ -101,7 +101,7 @@ public static TypedRecordProcessor<ProcessInstanceRecord> addProcessProcessors(
variableBehavior,
zeebeState.getElementInstanceState(),
keyGenerator,
writers.state());
writers);
addProcessInstanceCreationStreamProcessors(
typedRecordProcessors,
zeebeState,
Expand All @@ -116,11 +116,12 @@ public static TypedRecordProcessor<ProcessInstanceRecord> addProcessProcessors(
}

private static void addProcessInstanceCommandProcessor(
final Writers writers,
final TypedRecordProcessors typedRecordProcessors,
final MutableElementInstanceState elementInstanceState) {

final ProcessInstanceCommandProcessor commandProcessor =
new ProcessInstanceCommandProcessor(elementInstanceState);
new ProcessInstanceCommandProcessor(writers, elementInstanceState);

Arrays.stream(ProcessInstanceIntent.values())
.filter(ProcessInstanceIntent::isProcessInstanceCommand)
Expand Down Expand Up @@ -200,12 +201,12 @@ private static void addVariableDocumentStreamProcessors(
final VariableBehavior variableBehavior,
final ElementInstanceState elementInstanceState,
final KeyGenerator keyGenerator,
final StateWriter stateWriter) {
final Writers writers) {
typedRecordProcessors.onCommand(
ValueType.VARIABLE_DOCUMENT,
VariableDocumentIntent.UPDATE,
new UpdateVariableDocumentProcessor(
elementInstanceState, keyGenerator, variableBehavior, stateWriter));
elementInstanceState, keyGenerator, variableBehavior, writers));
}

private static void addProcessInstanceCreationStreamProcessors(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.processing.variable.VariableBehavior;
Expand Down Expand Up @@ -88,8 +86,6 @@ private BpmnElementContainerProcessor<ExecutableFlowElement> getContainerProcess
@Override
public void processRecord(
final TypedRecord<ProcessInstanceRecord> record,
final LegacyTypedResponseWriter responseWriter,
final LegacyTypedStreamWriter streamWriter,
final Consumer<SideEffectProducer> sideEffect) {

// initialize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public BpmnEventSubscriptionBehavior(
*/
public <T extends ExecutableCatchEventSupplier> Either<Failure, Void> subscribeToEvents(
final T element, final BpmnElementContext context) {
return catchEventBehavior.subscribeToEvents(context, element, sideEffects, commandWriter);
return catchEventBehavior.subscribeToEvents(context, element, sideEffects);
}

public void unsubscribeFromEvents(final BpmnElementContext context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,7 @@ private void unsubscribeFromEvents(
public Either<Failure, Void> subscribeToEvents(
final BpmnElementContext context,
final ExecutableCatchEventSupplier supplier,
final SideEffects sideEffects,
final TypedCommandWriter commandWriter) {
final SideEffects sideEffects) {
final var evaluationResults =
supplier.getEvents().stream()
.filter(event -> event.isTimer() || event.isMessage())
Expand All @@ -156,7 +155,7 @@ public Either<Failure, Void> subscribeToEvents(
evaluationResults.ifRight(
results -> {
subscribeToMessageEvents(context, sideEffects, results);
subscribeToTimerEvents(context, sideEffects, commandWriter, results);
subscribeToTimerEvents(context, sideEffects, results);
});

return evaluationResults.map(r -> null);
Expand Down Expand Up @@ -274,7 +273,6 @@ private void subscribeToMessageEvent(
private void subscribeToTimerEvents(
final BpmnElementContext context,
final SideEffects sideEffects,
final TypedCommandWriter commandWriter,
final List<EvalResult> results) {
results.stream()
.filter(EvalResult::isTimer)
Expand All @@ -288,7 +286,6 @@ private void subscribeToTimerEvents(
context.getProcessDefinitionKey(),
event.getId(),
timer,
commandWriter,
sideEffects);
});
}
Expand All @@ -299,7 +296,6 @@ public void subscribeToTimerEvent(
final long processDefinitionKey,
final DirectBuffer handlerNodeId,
final Timer timer,
final TypedCommandWriter commandWriter,
final SideEffects sideEffects) {
final long dueDate = timer.getDueDate(ActorClock.currentTimeMillis());
timerRecord.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffects;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedCommandWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.KeyGenerator;
import io.camunda.zeebe.engine.state.immutable.ProcessState;
Expand Down Expand Up @@ -58,6 +59,9 @@ public final class DeploymentCreateProcessor implements TypedRecordProcessor<Dep
private final StateWriter stateWriter;
private final MessageStartEventSubscriptionManager messageStartEventSubscriptionManager;
private final DeploymentDistributionBehavior deploymentDistributionBehavior;
private final TypedRejectionWriter rejectionWriter;
private final TypedResponseWriter responseWriter;
private final TypedCommandWriter commandWriter;

public DeploymentCreateProcessor(
final ZeebeState zeebeState,
Expand All @@ -71,6 +75,9 @@ public DeploymentCreateProcessor(
timerInstanceState = zeebeState.getTimerState();
this.keyGenerator = keyGenerator;
stateWriter = writers.state();
rejectionWriter = writers.rejection();
responseWriter = writers.response();
commandWriter = writers.command();
deploymentTransformer =
new DeploymentTransformer(stateWriter, zeebeState, expressionProcessor, keyGenerator);
this.catchEventBehavior = catchEventBehavior;
Expand All @@ -85,10 +92,7 @@ public DeploymentCreateProcessor(

@Override
public void processRecord(
final TypedRecord<DeploymentRecord> command,
final LegacyTypedResponseWriter responseWriter,
final LegacyTypedStreamWriter streamWriter,
final Consumer<SideEffectProducer> sideEffect) {
final TypedRecord<DeploymentRecord> command, final Consumer<SideEffectProducer> sideEffect) {

sideEffect.accept(sideEffects);

Expand All @@ -99,11 +103,11 @@ public void processRecord(
final long key = keyGenerator.nextKey();

try {
createTimerIfTimerStartEvent(command, streamWriter, sideEffects);
createTimerIfTimerStartEvent(command, sideEffects);
} catch (final RuntimeException e) {
final String reason = String.format(COULD_NOT_CREATE_TIMER_MESSAGE, e.getMessage());
responseWriter.writeRejectionOnCommand(command, RejectionType.PROCESSING_ERROR, reason);
streamWriter.appendRejection(command, RejectionType.PROCESSING_ERROR, reason);
rejectionWriter.appendRejection(command, RejectionType.PROCESSING_ERROR, reason);
return;
}

Expand All @@ -120,30 +124,27 @@ public void processRecord(
command,
deploymentTransformer.getRejectionType(),
deploymentTransformer.getRejectionReason());
streamWriter.appendRejection(
rejectionWriter.appendRejection(
command,
deploymentTransformer.getRejectionType(),
deploymentTransformer.getRejectionReason());
}
}

private void createTimerIfTimerStartEvent(
final TypedRecord<DeploymentRecord> record,
final LegacyTypedStreamWriter streamWriter,
final SideEffects sideEffects) {
final TypedRecord<DeploymentRecord> record, final SideEffects sideEffects) {
for (final ProcessMetadata processMetadata : record.getValue().processesMetadata()) {
if (!processMetadata.isDuplicate()) {
final List<ExecutableStartEvent> startEvents =
processState.getProcessByKey(processMetadata.getKey()).getProcess().getStartEvents();

unsubscribeFromPreviousTimers(streamWriter, processMetadata);
subscribeToTimerStartEventIfExists(streamWriter, sideEffects, processMetadata, startEvents);
unsubscribeFromPreviousTimers(processMetadata);
subscribeToTimerStartEventIfExists(sideEffects, processMetadata, startEvents);
}
}
}

private void subscribeToTimerStartEventIfExists(
final LegacyTypedStreamWriter streamWriter,
final SideEffects sideEffects,
final ProcessMetadata processMetadata,
final List<ExecutableStartEvent> startEvents) {
Expand All @@ -165,28 +166,23 @@ private void subscribeToTimerStartEventIfExists(
processMetadata.getKey(),
startEvent.getId(),
timerOrError.get(),
streamWriter,
sideEffects);
}
}
}

private void unsubscribeFromPreviousTimers(
final LegacyTypedStreamWriter streamWriter, final ProcessMetadata processRecord) {
private void unsubscribeFromPreviousTimers(final ProcessMetadata processRecord) {
timerInstanceState.forEachTimerForElementInstance(
NO_ELEMENT_INSTANCE,
timer -> unsubscribeFromPreviousTimer(streamWriter, processRecord, timer));
NO_ELEMENT_INSTANCE, timer -> unsubscribeFromPreviousTimer(processRecord, timer));
}

private void unsubscribeFromPreviousTimer(
final LegacyTypedStreamWriter streamWriter,
final ProcessMetadata processMetadata,
final TimerInstance timer) {
final ProcessMetadata processMetadata, final TimerInstance timer) {
final DirectBuffer timerBpmnId =
processState.getProcessByKey(timer.getProcessDefinitionKey()).getBpmnProcessId();

if (timerBpmnId.equals(processMetadata.getBpmnProcessIdBuffer())) {
catchEventBehavior.unsubscribeFromTimerEvent(timer, streamWriter);
catchEventBehavior.unsubscribeFromTimerEvent(timer, commandWriter);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.NoopResponseWriterLegacy;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.KeyGenerator;
import io.camunda.zeebe.engine.state.immutable.ElementInstanceState;
Expand Down Expand Up @@ -48,6 +48,7 @@ public final class ResolveIncidentProcessor implements TypedRecordProcessor<Inci
private final IncidentState incidentState;
private final ElementInstanceState elementInstanceState;
private final KeyGenerator keyGenerator;
private final TypedResponseWriter responseWriter;

public ResolveIncidentProcessor(
final ZeebeState zeebeState,
Expand All @@ -57,36 +58,33 @@ public ResolveIncidentProcessor(
this.bpmnStreamProcessor = bpmnStreamProcessor;
stateWriter = writers.state();
rejectionWriter = writers.rejection();
responseWriter = writers.response();
incidentState = zeebeState.getIncidentState();
elementInstanceState = zeebeState.getElementInstanceState();
this.keyGenerator = keyGenerator;
}

@Override
public void processRecord(
final TypedRecord<IncidentRecord> command,
final LegacyTypedResponseWriter responseWriter,
final LegacyTypedStreamWriter streamWriter,
final Consumer<SideEffectProducer> sideEffect) {
final TypedRecord<IncidentRecord> command, final Consumer<SideEffectProducer> sideEffect) {
final long key = command.getKey();

final var incident = incidentState.getIncidentRecord(key);
if (incident == null) {
final var errorMessage = String.format(NO_INCIDENT_FOUND_MSG, key);
rejectResolveCommand(command, responseWriter, errorMessage, RejectionType.NOT_FOUND);
rejectResolveCommand(command, errorMessage, RejectionType.NOT_FOUND);
return;
}

stateWriter.appendFollowUpEvent(key, IncidentIntent.RESOLVED, incident);
responseWriter.writeEventOnCommand(key, IncidentIntent.RESOLVED, incident, command);

// if it fails, a new incident is raised
attemptToContinueProcessProcessing(command, streamWriter, sideEffect, incident);
attemptToContinueProcessProcessing(command, sideEffect, incident);
}

private void rejectResolveCommand(
final TypedRecord<IncidentRecord> command,
final LegacyTypedResponseWriter responseWriter,
final String errorMessage,
final RejectionType rejectionType) {

Expand All @@ -96,7 +94,6 @@ private void rejectResolveCommand(

private void attemptToContinueProcessProcessing(
final TypedRecord<IncidentRecord> command,
final LegacyTypedStreamWriter streamWriter,
final Consumer<SideEffectProducer> sideEffect,
final IncidentRecord incident) {
final long jobKey = incident.getJobKey();
Expand All @@ -111,8 +108,7 @@ private void attemptToContinueProcessProcessing(
failedCommand -> {
sideEffects.clear();

bpmnStreamProcessor.processRecord(
failedCommand, noopResponseWriter, streamWriter, sideEffects::add);
bpmnStreamProcessor.processRecord(failedCommand, sideEffects::add);

sideEffect.accept(sideEffects);
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
import io.camunda.zeebe.engine.metrics.JobMetrics;
import io.camunda.zeebe.engine.processing.job.JobBatchCollector.TooLargeJob;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
Expand Down Expand Up @@ -59,10 +57,7 @@ public JobBatchActivateProcessor(
}

@Override
public void processRecord(
final TypedRecord<JobBatchRecord> record,
final LegacyTypedResponseWriter responseWriter,
final LegacyTypedStreamWriter streamWriter) {
public void processRecord(final TypedRecord<JobBatchRecord> record) {
final JobBatchRecord value = record.getValue();
if (isValid(value)) {
activateJobs(record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@

import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageRecord;
import io.camunda.zeebe.protocol.record.intent.MessageIntent;
Expand All @@ -24,10 +22,7 @@ public MessageExpireProcessor(final StateWriter stateWriter) {
}

@Override
public void processRecord(
final TypedRecord<MessageRecord> record,
final LegacyTypedResponseWriter responseWriter,
final LegacyTypedStreamWriter streamWriter) {
public void processRecord(final TypedRecord<MessageRecord> record) {

stateWriter.appendFollowUpEvent(record.getKey(), MessageIntent.EXPIRED, record.getValue());
}
Expand Down

0 comments on commit bdc2405

Please sign in to comment.