Skip to content

Commit

Permalink
merge: #10021
Browse files Browse the repository at this point in the history
10021: Engine abstraction code cleanup r=pihme a=pihme

## Description
* Removes unnecessary references
* Moves `LegacyTypedResponseWriter` to stream processor, as it is only used in this package
* Renames `LegacyTypedResponseWriter`

## Related issues

related to #9727 



Co-authored-by: pihme <pihme@users.noreply.github.com>
  • Loading branch information
zeebe-bors-camunda[bot] and pihme committed Aug 8, 2022
2 parents 155de10 + 349bb3b commit bad0d04
Show file tree
Hide file tree
Showing 9 changed files with 19 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,7 @@ public static TypedRecordProcessors createEngineProcessors(
jobMetrics,
eventTriggerBehavior);

addIncidentProcessors(
zeebeState,
bpmnStreamProcessor,
typedRecordProcessors,
writers,
zeebeState.getKeyGenerator());
addIncidentProcessors(zeebeState, bpmnStreamProcessor, typedRecordProcessors, writers);

return typedRecordProcessors;
}
Expand Down Expand Up @@ -213,10 +208,9 @@ private static void addIncidentProcessors(
final ZeebeState zeebeState,
final TypedRecordProcessor<ProcessInstanceRecord> bpmnStreamProcessor,
final TypedRecordProcessors typedRecordProcessors,
final Writers writers,
final KeyGenerator keyGenerator) {
final Writers writers) {
IncidentEventProcessors.addProcessors(
typedRecordProcessors, zeebeState, bpmnStreamProcessor, writers, keyGenerator);
typedRecordProcessors, zeebeState, bpmnStreamProcessor, writers);
}

private static void addMessageProcessors(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.KeyGenerator;
import io.camunda.zeebe.engine.state.immutable.ZeebeState;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import io.camunda.zeebe.protocol.record.ValueType;
Expand All @@ -22,11 +21,10 @@ public static void addProcessors(
final TypedRecordProcessors typedRecordProcessors,
final ZeebeState zeebeState,
final TypedRecordProcessor<ProcessInstanceRecord> bpmnStreamProcessor,
final Writers writers,
final KeyGenerator keyGenerator) {
final Writers writers) {
typedRecordProcessors.onCommand(
ValueType.INCIDENT,
IncidentIntent.RESOLVE,
new ResolveIncidentProcessor(zeebeState, bpmnStreamProcessor, writers, keyGenerator));
new ResolveIncidentProcessor(zeebeState, bpmnStreamProcessor, writers));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,10 @@
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.NoopResponseWriterLegacy;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.KeyGenerator;
import io.camunda.zeebe.engine.state.immutable.ElementInstanceState;
import io.camunda.zeebe.engine.state.immutable.IncidentState;
import io.camunda.zeebe.engine.state.immutable.ZeebeState;
Expand All @@ -39,29 +36,25 @@ public final class ResolveIncidentProcessor implements TypedRecordProcessor<Inci

private final ProcessInstanceRecord failedRecord = new ProcessInstanceRecord();
private final SideEffectQueue sideEffects = new SideEffectQueue();
private final LegacyTypedResponseWriter noopResponseWriter = new NoopResponseWriterLegacy();

private final TypedRecordProcessor<ProcessInstanceRecord> bpmnStreamProcessor;
private final StateWriter stateWriter;
private final TypedRejectionWriter rejectionWriter;

private final IncidentState incidentState;
private final ElementInstanceState elementInstanceState;
private final KeyGenerator keyGenerator;
private final TypedResponseWriter responseWriter;

public ResolveIncidentProcessor(
final ZeebeState zeebeState,
final TypedRecordProcessor<ProcessInstanceRecord> bpmnStreamProcessor,
final Writers writers,
final KeyGenerator keyGenerator) {
final Writers writers) {
this.bpmnStreamProcessor = bpmnStreamProcessor;
stateWriter = writers.state();
rejectionWriter = writers.rejection();
responseWriter = writers.response();
incidentState = zeebeState.getIncidentState();
elementInstanceState = zeebeState.getElementInstanceState();
this.keyGenerator = keyGenerator;
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io.camunda.zeebe.engine.api.PostCommitTask;
import io.camunda.zeebe.engine.api.ProcessingResult;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import java.util.ArrayList;
Expand All @@ -27,7 +26,7 @@ final class DirectProcessingResult implements ProcessingResult {
private final List<PostCommitTask> postCommitTasks;

private final LegacyTypedStreamWriter streamWriter;
private final LegacyTypedResponseWriter responseWriter;
private final DirectTypedResponseWriter responseWriter;
private boolean hasResponse;

DirectProcessingResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io.camunda.zeebe.engine.api.PostCommitTask;
import io.camunda.zeebe.engine.api.ProcessingResult;
import io.camunda.zeebe.engine.api.ProcessingResultBuilder;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.record.RecordType;
Expand All @@ -33,7 +32,7 @@ final class DirectProcessingResultBuilder implements ProcessingResultBuilder {

private final StreamProcessorContext context;
private final LegacyTypedStreamWriter streamWriter;
private final LegacyTypedResponseWriter responseWriter;
private final DirectTypedResponseWriter responseWriter;

private boolean hasResponse =
true; // TODO figure out why this still needs to be true for tests to pass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.streamprocessor.writers;
package io.camunda.zeebe.streamprocessor;

import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedResponseWriter;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;

public interface LegacyTypedResponseWriter extends SideEffectProducer, TypedResponseWriter {
public interface DirectTypedResponseWriter extends SideEffectProducer, TypedResponseWriter {

@Override
void writeRejectionOnCommand(TypedRecord<?> command, RejectionType type, String reason);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.streamprocessor.writers;
package io.camunda.zeebe.streamprocessor;

import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RejectionType;
Expand All @@ -18,8 +19,8 @@
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;

public final class LegacyTypedResponseWriterImpl
implements LegacyTypedResponseWriter, SideEffectProducer {
public final class DirectTypedResponseWriterImpl
implements DirectTypedResponseWriter, SideEffectProducer {

private final CommandResponseWriter writer;
private final int partitionId;
Expand All @@ -28,7 +29,7 @@ public final class LegacyTypedResponseWriterImpl
private int requestStreamId;
private boolean isResponseStaged;

public LegacyTypedResponseWriterImpl(final CommandResponseWriter writer, final int partitionId) {
public DirectTypedResponseWriterImpl(final CommandResponseWriter writer, final int partitionId) {
this.writer = writer;
this.partitionId = partitionId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import io.camunda.zeebe.engine.processing.streamprocessor.RecordValues;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedResponseWriterImpl;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.state.KeyGeneratorControls;
import io.camunda.zeebe.engine.state.ZeebeDbState;
Expand All @@ -33,7 +32,7 @@ public final class StreamProcessorContext implements ReadonlyStreamProcessorCont
private LogStream logStream;
private LogStreamReader logStreamReader;
private LegacyTypedStreamWriter logStreamWriter;
private LegacyTypedResponseWriterImpl typedResponseWriter;
private DirectTypedResponseWriterImpl typedResponseWriter;

private RecordValues recordValues;
private ZeebeDbState zeebeState;
Expand Down Expand Up @@ -134,11 +133,11 @@ public StreamProcessorContext commandResponseWriter(
final CommandResponseWriter commandResponseWriter) {
this.commandResponseWriter = commandResponseWriter;
typedResponseWriter =
new LegacyTypedResponseWriterImpl(commandResponseWriter, getLogStream().getPartitionId());
new DirectTypedResponseWriterImpl(commandResponseWriter, getLogStream().getPartitionId());
return this;
}

public LegacyTypedResponseWriterImpl getTypedResponseWriter() {
public DirectTypedResponseWriterImpl getTypedResponseWriter() {
return typedResponseWriter;
}

Expand Down

0 comments on commit bad0d04

Please sign in to comment.