Skip to content

Commit

Permalink
merge: #10057
Browse files Browse the repository at this point in the history
10057: refactor(engine): merge two legacy interfaces together r=pihme a=pihme

## Description

* `LegacyTypedStreamWriter` already extended `LegacyTypedCommandWriter`
* This PR replaces all references to `LegacyTypedCommandWriter` with `LegacyTypedStreamWriter` 
* It then merges the two interfaces and deleted `LegacyTypedCommandWriter`
* The net effect of this, is that we only need to replace one interface going forward

## Related issues

relates to #9724



Co-authored-by: pihme <pihme@users.noreply.github.com>
  • Loading branch information
zeebe-bors-camunda[bot] and pihme committed Aug 15, 2022
2 parents a3d03d8 + c4abc4e commit 1f09e85
Show file tree
Hide file tree
Showing 10 changed files with 36 additions and 162 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import io.camunda.zeebe.engine.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedCommandWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.state.immutable.JobState;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import java.time.Duration;
Expand All @@ -22,7 +22,7 @@ public final class JobTimeoutTrigger implements StreamProcessorLifecycleAware {

private boolean shouldReschedule = false;

private LegacyTypedCommandWriter writer;
private LegacyTypedStreamWriter writer;
private ReadonlyStreamProcessorContext processingContext;

public JobTimeoutTrigger(final JobState state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
*/
package io.camunda.zeebe.engine.processing.message;

import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedCommandWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.state.immutable.MessageState;
import io.camunda.zeebe.engine.state.message.StoredMessage;
import io.camunda.zeebe.protocol.impl.record.value.message.MessageRecord;
Expand All @@ -16,13 +16,13 @@

public final class MessageTimeToLiveChecker implements Runnable {

private final LegacyTypedCommandWriter writer;
private final LegacyTypedStreamWriter writer;
private final MessageState messageState;

private final MessageRecord deleteMessageCommand = new MessageRecord();

public MessageTimeToLiveChecker(
final LegacyTypedCommandWriter writer, final MessageState messageState) {
final LegacyTypedStreamWriter writer, final MessageState messageState) {
this.writer = writer;
this.messageState = messageState;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io.camunda.zeebe.engine.api.ProcessingScheduleService;
import io.camunda.zeebe.engine.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedCommandWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.scheduler.clock.ActorClock;
import java.time.Duration;
Expand All @@ -26,11 +25,11 @@ public final class DueDateChecker implements StreamProcessorLifecycleAware {

private long nextDueDate = -1L;
private final long timerResolution;
private final Function<LegacyTypedCommandWriter, Long> nextDueDateSupplier;
private final Function<LegacyTypedStreamWriter, Long> nextDueDateSupplier;

public DueDateChecker(
final long timerResolution,
final Function<LegacyTypedCommandWriter, Long> nextDueDateFunction) {
final Function<LegacyTypedStreamWriter, Long> nextDueDateFunction) {
this.timerResolution = timerResolution;
nextDueDateSupplier = nextDueDateFunction;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
import io.camunda.zeebe.protocol.record.intent.Intent;

/** Things that only a stream processor should write to the log stream (+ commands) */
public interface LegacyTypedStreamWriter
extends LegacyTypedCommandWriter, TypedEventWriter, TypedRejectionWriter {
public interface LegacyTypedStreamWriter extends TypedEventWriter {

void appendFollowUpCommand(long key, Intent intent, RecordValue value);

void appendRecord(
long key,
Expand All @@ -25,4 +26,8 @@ void appendRecord(
RecordValue value);

void configureSourceContext(long sourceRecordPosition);

void reset();

long flush();
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import static io.camunda.zeebe.engine.processing.streamprocessor.TypedEventRegistry.EVENT_REGISTRY;

import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter.LogEntryBuilder;
import io.camunda.zeebe.msgpack.UnpackedObject;
Expand Down Expand Up @@ -53,6 +52,11 @@ protected void appendRecord(
appendRecord(key, type, intent, RejectionType.NULL_VAL, "", value);
}

@Override
public void appendFollowUpCommand(final long key, final Intent intent, final RecordValue value) {
appendRecord(key, RecordType.COMMAND, intent, value);
}

@Override
public void appendRecord(
final long key,
Expand Down Expand Up @@ -90,16 +94,6 @@ public void configureSourceContext(final long sourceRecordPosition) {
this.sourceRecordPosition = sourceRecordPosition;
}

@Override
public void appendNewCommand(final Intent intent, final RecordValue value) {
appendRecord(-1, RecordType.COMMAND, intent, value);
}

@Override
public void appendFollowUpCommand(final long key, final Intent intent, final RecordValue value) {
appendRecord(key, RecordType.COMMAND, intent, value);
}

@Override
public void reset() {
sourceRecordPosition = -1;
Expand All @@ -112,20 +106,6 @@ public long flush() {
return batchWriter.tryWrite();
}

@Override
public void appendRejection(
final TypedRecord<? extends RecordValue> command,
final RejectionType rejectionType,
final String reason) {
appendRecord(
command.getKey(),
RecordType.COMMAND_REJECTION,
command.getIntent(),
rejectionType,
reason,
command.getValue());
}

@Override
public void appendFollowUpEvent(final long key, final Intent intent, final RecordValue value) {
appendRecord(key, RecordType.EVENT, intent, value);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import io.camunda.zeebe.engine.api.ReadonlyStreamProcessorContext;
import io.camunda.zeebe.engine.api.StreamProcessorLifecycleAware;
import io.camunda.zeebe.engine.processing.scheduled.DueDateChecker;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedCommandWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.state.immutable.TimerInstanceState;
import io.camunda.zeebe.engine.state.immutable.TimerInstanceState.TimerVisitor;
import io.camunda.zeebe.engine.state.instance.TimerInstance;
Expand Down Expand Up @@ -66,7 +66,7 @@ public void onResumed() {
}

protected static final class TriggerTimersSideEffect
implements Function<LegacyTypedCommandWriter, Long> {
implements Function<LegacyTypedStreamWriter, Long> {

private final ActorClock actorClock;

Expand All @@ -83,7 +83,7 @@ public TriggerTimersSideEffect(
}

@Override
public Long apply(final LegacyTypedCommandWriter legacyTypedCommandWriter) {
public Long apply(final LegacyTypedStreamWriter legacyTypedCommandWriter) {
final var now = actorClock.getTimeMillis();

final var yieldAfter = now + Math.round(TIMER_RESOLUTION * GIVE_YIELD_FACTOR);
Expand All @@ -107,10 +107,9 @@ protected static final class WriteTriggerTimerCommandVisitor implements TimerVis

private final TimerRecord timerRecord = new TimerRecord();

private final LegacyTypedCommandWriter legacyTypedCommandWriter;
private final LegacyTypedStreamWriter legacyTypedCommandWriter;

public WriteTriggerTimerCommandVisitor(
final LegacyTypedCommandWriter legacyTypedCommandWriter) {
public WriteTriggerTimerCommandVisitor(final LegacyTypedStreamWriter legacyTypedCommandWriter) {
this.legacyTypedCommandWriter = legacyTypedCommandWriter;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,16 @@ public boolean hasHealthStatus(final HealthStatus healthStatus) {
private final class WrappedStreamWriterLegacy implements LegacyTypedStreamWriter {

@Override
public void appendRejection(
final TypedRecord<? extends RecordValue> command,
final RejectionType type,
final String reason) {}
public void appendFollowUpEvent(final long key, final Intent intent, final RecordValue value) {}

@Override
public int getMaxEventLength() {
return Integer.MAX_VALUE;
}

@Override
public void appendFollowUpCommand(
final long key, final Intent intent, final RecordValue value) {}

@Override
public void appendRecord(
Expand All @@ -216,21 +222,6 @@ public void appendRecord(
@Override
public void configureSourceContext(final long sourceRecordPosition) {}

@Override
public void appendFollowUpEvent(final long key, final Intent intent, final RecordValue value) {}

@Override
public int getMaxEventLength() {
return Integer.MAX_VALUE;
}

@Override
public void appendNewCommand(final Intent intent, final RecordValue value) {}

@Override
public void appendFollowUpCommand(
final long key, final Intent intent, final RecordValue value) {}

@Override
public void reset() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedCommandWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.LegacyTypedStreamWriter;
import io.camunda.zeebe.engine.processing.timer.DueDateTimerChecker.TriggerTimersSideEffect;
import io.camunda.zeebe.engine.processing.timer.DueDateTimerChecker.YieldingDecorator;
import io.camunda.zeebe.engine.state.immutable.TimerInstanceState;
Expand Down Expand Up @@ -45,7 +45,7 @@ void shouldAbortIterationAndGiveYieldAfterSomeTimeHasPassed() {
*/

// given
final var mockTypedCommandWriter = mock(LegacyTypedCommandWriter.class);
final var mockTypedCommandWriter = mock(LegacyTypedStreamWriter.class);
when(mockTypedCommandWriter.flush()).thenReturn(1L);

final var mockTimer = mock(TimerInstance.class, Mockito.RETURNS_DEEP_STUBS);
Expand Down

0 comments on commit 1f09e85

Please sign in to comment.