Skip to content

Commit

Permalink
merge: #10000
Browse files Browse the repository at this point in the history
10000: Remove proxy writers r=Zelldon a=Zelldon

## Description

After #9967 is merged we can look at this PR.
<!-- Please explain the changes you made here. -->
Removes the proxy writers, which have been introduced for not writing on reprocessing, which we no longer do.


## Related issues

<!-- Which issues are closed by this PR or are related -->

related to #9727



Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and Zelldon committed Aug 5, 2022
2 parents bdc2405 + c8dd038 commit f9b921e
Show file tree
Hide file tree
Showing 8 changed files with 4 additions and 97 deletions.
7 changes: 0 additions & 7 deletions engine/src/main/java/io/camunda/zeebe/engine/Engine.java
Expand Up @@ -16,8 +16,6 @@
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorContextImpl;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.ZeebeDbState;
Expand All @@ -41,8 +39,6 @@ public class Engine implements RecordProcessor<EngineContext> {
private EventApplier eventApplier;
private RecordProcessorMap recordProcessorMap;
private ZeebeDbState zeebeState;
private LegacyTypedStreamWriter streamWriter;
private LegacyTypedResponseWriter responseWriter;

private final ErrorRecord errorRecord = new ErrorRecord();

Expand All @@ -55,9 +51,6 @@ public Engine() {}

@Override
public void init(final EngineContext engineContext) {
streamWriter = engineContext.getStreamWriterProxy();
responseWriter = engineContext.getTypedResponseWriter();

zeebeState =
new ZeebeDbState(
engineContext.getPartitionId(),
Expand Down
16 changes: 0 additions & 16 deletions engine/src/main/java/io/camunda/zeebe/engine/EngineContext.java
Expand Up @@ -13,8 +13,6 @@
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorFactory;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import java.util.Collections;
Expand All @@ -27,8 +25,6 @@ public final class EngineContext {
private final ProcessingScheduleService scheduleService;
private final ZeebeDb zeebeDb;
private final TransactionContext transactionContext;
private final LegacyTypedStreamWriter streamWriter;
private final LegacyTypedResponseWriter responseWriter;
private final Function<MutableZeebeState, EventApplier> eventApplierFactory;
private final TypedRecordProcessorFactory typedRecordProcessorFactory;
private List<StreamProcessorLifecycleAware> lifecycleListeners = Collections.EMPTY_LIST;
Expand All @@ -39,16 +35,12 @@ public EngineContext(
final ProcessingScheduleService scheduleService,
final ZeebeDb zeebeDb,
final TransactionContext transactionContext,
final LegacyTypedStreamWriter streamWriter,
final LegacyTypedResponseWriter responseWriter,
final Function<MutableZeebeState, EventApplier> eventApplierFactory,
final TypedRecordProcessorFactory typedRecordProcessorFactory) {
this.partitionId = partitionId;
this.scheduleService = scheduleService;
this.zeebeDb = zeebeDb;
this.transactionContext = transactionContext;
this.streamWriter = streamWriter;
this.responseWriter = responseWriter;
this.eventApplierFactory = eventApplierFactory;
this.typedRecordProcessorFactory = typedRecordProcessorFactory;
}
Expand All @@ -69,14 +61,6 @@ public TransactionContext getTransactionContext() {
return transactionContext;
}

public LegacyTypedStreamWriter getStreamWriterProxy() {
return streamWriter;
}

public LegacyTypedResponseWriter getTypedResponseWriter() {
return responseWriter;
}

public Function<MutableZeebeState, EventApplier> getEventApplierFactory() {
return eventApplierFactory;
}
Expand Down
17 changes: 0 additions & 17 deletions engine/src/main/java/io/camunda/zeebe/engine/api/LegacyTask.java

This file was deleted.

Expand Up @@ -17,12 +17,6 @@ public interface ProcessingScheduleService {

<T> void runOnCompletion(ActorFuture<T> precedingTask, BiConsumer<T, Throwable> followUpTask);

@Deprecated
<T> void runOnCompletion(ActorFuture<T> precedingTask, LegacyTask followUpTask);

@Deprecated
void runDelayed(Duration delay, LegacyTask followUpTask);

default void runAtFixedRate(final Duration delay, final Runnable task) {
runDelayed(
delay,
Expand Down
Expand Up @@ -7,9 +7,7 @@
*/
package io.camunda.zeebe.streamprocessor;

import io.camunda.zeebe.engine.api.LegacyTask;
import io.camunda.zeebe.engine.api.ProcessingScheduleService;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedCommandWriter;
import io.camunda.zeebe.scheduler.ActorControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import java.time.Duration;
Expand All @@ -18,12 +16,9 @@
public class ProcessingScheduleServiceImpl implements ProcessingScheduleService {

private final ActorControl actorControl;
private final LegacyTypedCommandWriter commandWriter;

public ProcessingScheduleServiceImpl(
final ActorControl actorControl, final LegacyTypedCommandWriter legacyTypedCommandWriter) {
public ProcessingScheduleServiceImpl(final ActorControl actorControl) {
this.actorControl = actorControl;
commandWriter = legacyTypedCommandWriter;
}

@Override
Expand All @@ -37,26 +32,6 @@ public <T> void runOnCompletion(
scheduleOnActor(() -> actorControl.runOnCompletion(precedingTask, followUpTask));
}

@Override
public <T> void runOnCompletion(
final ActorFuture<T> precedingTask, final LegacyTask followUpTask) {
runOnCompletion(
precedingTask,
(BiConsumer<T, Throwable>)
(ok, err) -> {
followUpTask.run(commandWriter, this);
});
}

@Override
public void runDelayed(final Duration delay, final LegacyTask followUpTask) {
runDelayed(
delay,
() -> {
followUpTask.run(commandWriter, this);
});
}

private void scheduleOnActor(final Runnable task) {
actorControl.submit(task);
}
Expand Down
Expand Up @@ -182,8 +182,6 @@ protected void onActorStarted() {

replayStateMachine =
new ReplayStateMachine(engine, streamProcessorContext, this::shouldProcessNext);
// disable writing to the log stream
streamProcessorContext.disableLogStreamWriter();

openFuture.complete(null);
replayCompletedFuture = replayStateMachine.startRecover(snapshotPosition);
Expand Down Expand Up @@ -287,9 +285,6 @@ private void onRetrievingWriter(

phase = Phase.PROCESSING;

// enable writing records to the stream
streamProcessorContext.enableLogStreamWriter();

processingStateMachine =
new ProcessingStateMachine(streamProcessorContext, this::shouldProcessNext, engine);

Expand Down Expand Up @@ -333,8 +328,6 @@ private void initEngine() {
streamProcessorContext.getScheduleService(),
zeebeDb,
streamProcessorContext.getTransactionContext(),
streamProcessorContext.getLogStreamWriter(),
streamProcessorContext.getTypedResponseWriter(),
eventApplierFactory,
typedRecordProcessorFactory);
engine.init(engineContext);
Expand Down
Expand Up @@ -10,13 +10,11 @@
import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.engine.api.ProcessingScheduleService;
import io.camunda.zeebe.engine.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.engine.processing.bpmn.behavior.LegacyTypedStreamWriterProxy;
import io.camunda.zeebe.engine.processing.streamprocessor.RecordValues;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorListener;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedResponseWriterImpl;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.NoopLegacyTypedStreamWriter;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.KeyGeneratorControls;
import io.camunda.zeebe.engine.state.ZeebeDbState;
Expand All @@ -32,14 +30,10 @@ public final class StreamProcessorContext implements ReadonlyStreamProcessorCont

private static final StreamProcessorListener NOOP_LISTENER = processedCommand -> {};

private final LegacyTypedStreamWriterProxy streamWriterProxy = new LegacyTypedStreamWriterProxy();
private final NoopLegacyTypedStreamWriter noopTypedStreamWriter =
new NoopLegacyTypedStreamWriter();

private ActorControl actor;
private LogStream logStream;
private LogStreamReader logStreamReader;
private LegacyTypedStreamWriter logStreamWriter = noopTypedStreamWriter;
private LegacyTypedStreamWriter logStreamWriter;
private LegacyTypedResponseWriterImpl typedResponseWriter;

private RecordValues recordValues;
Expand All @@ -59,7 +53,7 @@ public final class StreamProcessorContext implements ReadonlyStreamProcessorCont

public StreamProcessorContext actor(final ActorControl actor) {
this.actor = actor;
processingScheduleService = new ProcessingScheduleServiceImpl(actor, streamWriterProxy);
processingScheduleService = new ProcessingScheduleServiceImpl(actor);
return this;
}

Expand All @@ -75,7 +69,7 @@ public LogStream getLogStream() {

@Override
public LegacyTypedStreamWriter getLogStreamWriter() {
return streamWriterProxy;
return logStreamWriter;
}

@Override
Expand Down Expand Up @@ -188,14 +182,6 @@ public StreamProcessorListener getStreamProcessorListener() {
return streamProcessorListener;
}

public void enableLogStreamWriter() {
streamWriterProxy.wrap(logStreamWriter);
}

public void disableLogStreamWriter() {
streamWriterProxy.wrap(noopTypedStreamWriter);
}

public StreamProcessorMode getProcessorMode() {
return streamProcessorMode;
}
Expand Down
Expand Up @@ -45,7 +45,6 @@ public void setUp() {

final StreamProcessorContext streamProcessorContext =
new StreamProcessorContext().actor(someActor).logStreamWriter(typedStreamWriter);
streamProcessorContext.enableLogStreamWriter();
jobTimeoutTrigger.onRecovered(streamProcessorContext);

IntStream.range(0, 3)
Expand Down

0 comments on commit f9b921e

Please sign in to comment.