Skip to content

Commit

Permalink
merge: #9736
Browse files Browse the repository at this point in the history
9736: refactor(engine): Remove StreamProcessorLifecycleAware from TypedRecordProcessor r=pihme a=pihme

## Description

- Removes the `StreamProcessorLifecycleAware` interface from `TypedRecordProcessor`. Turns out, none of the record processors implemented these methods
- This will make the engine abstraction easier, because then the new engine does not need to be listener and relay for those events
- Tests were simplified accordingly. I think this deserves most attention in the review. Maybe some tests are now obsolete altogether 🤔 

## Related issues

related to #9725



Co-authored-by: pihme <pihme@users.noreply.github.com>
  • Loading branch information
zeebe-bors-camunda[bot] and pihme committed Jul 8, 2022
2 parents 0fcf857 + a8071c1 commit c578bc9
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 103 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
// todo (#8002): remove TypedStreamWriter from this interface's method signatures
// After the migration, none of these should be in use anymore and replaced by the CommandWriter and
// StateWriter passed along to the constructors of the concrete processors.
public interface TypedRecordProcessor<T extends UnifiedRecordValue>
extends StreamProcessorLifecycleAware {
public interface TypedRecordProcessor<T extends UnifiedRecordValue> {

/**
* @see #processRecord(TypedRecord, TypedResponseWriter, TypedStreamWriter, Consumer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,6 @@ private void initProcessors() {

lifecycleAwareListeners.addAll(typedRecordProcessors.getLifecycleListeners());
final RecordProcessorMap recordProcessorMap = typedRecordProcessors.getRecordProcessorMap();
recordProcessorMap.values().forEachRemaining(lifecycleAwareListeners::add);

processingContext.recordProcessorMap(recordProcessorMap);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.util.Records;
Expand Down Expand Up @@ -81,7 +81,6 @@ public void shouldReplayUntilEnd() {
// then
final InOrder inOrder = inOrder(typedRecordProcessor, eventApplier);
inOrder.verify(eventApplier, TIMEOUT).applyState(anyLong(), eq(ELEMENT_ACTIVATING), any());
inOrder.verify(typedRecordProcessor, TIMEOUT.times(1)).onRecovered(any());
inOrder
.verify(typedRecordProcessor, TIMEOUT)
.processRecord(anyLong(), any(), any(), any(), any());
Expand All @@ -103,12 +102,9 @@ public void shouldReplayContinuously() {
event().processInstance(ELEMENT_ACTIVATING, RECORD).causedBy(0));

// then
final InOrder inOrder = inOrder(typedRecordProcessor, eventApplier);
inOrder
.verify(eventApplier, TIMEOUT.times(2))
.applyState(anyLong(), eq(ELEMENT_ACTIVATING), any());
inOrder.verify(typedRecordProcessor, never()).onRecovered(any());
inOrder.verifyNoMoreInteractions();
verify(eventApplier, TIMEOUT.times(2)).applyState(anyLong(), eq(ELEMENT_ACTIVATING), any());

verifyNoMoreInteractions(eventApplier);

assertThat(getCurrentPhase(replayContinuously)).isEqualTo(Phase.REPLAY);
}
Expand Down Expand Up @@ -203,12 +199,9 @@ public void shouldReplayAfterResumed() {
replayContinuously.resumeProcessing(1);

// then
final var inOrder = inOrder(typedRecordProcessor, eventApplier);
inOrder
.verify(eventApplier, TIMEOUT.times(1))
.applyState(anyLong(), eq(ELEMENT_ACTIVATING), any());
inOrder.verify(typedRecordProcessor, never()).onRecovered(any());
inOrder.verifyNoMoreInteractions();
verify(eventApplier, TIMEOUT.times(1)).applyState(anyLong(), eq(ELEMENT_ACTIVATING), any());

verifyNoMoreInteractions(eventApplier);

assertThat(getCurrentPhase(replayContinuously)).isEqualTo(Phase.REPLAY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;

import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.util.Records;
Expand All @@ -42,8 +44,6 @@ public final class StreamProcessorReplayTest {
private static final long TIMEOUT_MILLIS = 2_000L;
private static final VerificationWithTimeout TIMEOUT = timeout(TIMEOUT_MILLIS);

private static final int EXPECTED_ON_RECOVERED_INVOCATIONS = 1;

private static final ProcessInstanceRecord RECORD = Records.processInstance(1);

@Rule public final StreamProcessorRule streamProcessorRule = new StreamProcessorRule();
Expand All @@ -64,15 +64,9 @@ public void shouldReplayEvents() {
startStreamProcessor(typedRecordProcessor, eventApplier);

// then
final InOrder inOrder = inOrder(typedRecordProcessor, eventApplier);
inOrder.verify(eventApplier, TIMEOUT).applyState(anyLong(), eq(ELEMENT_ACTIVATING), any());
inOrder
.verify(typedRecordProcessor, never())
.processRecord(anyLong(), any(), any(), any(), any());
inOrder
.verify(typedRecordProcessor, TIMEOUT.times(EXPECTED_ON_RECOVERED_INVOCATIONS))
.onRecovered(any());
inOrder.verifyNoMoreInteractions();
verify(eventApplier, TIMEOUT).applyState(anyLong(), eq(ELEMENT_ACTIVATING), any());
verifyNoMoreInteractions(eventApplier);
verifyNoInteractions(typedRecordProcessor);
}

@Test
Expand All @@ -86,15 +80,8 @@ public void shouldSkipCommands() {
startStreamProcessor(typedRecordProcessor, eventApplier);

// then
final InOrder inOrder = inOrder(typedRecordProcessor, eventApplier);
inOrder
.verify(typedRecordProcessor, never())
.processRecord(anyLong(), any(), any(), any(), any());
inOrder.verify(eventApplier, never()).applyState(anyLong(), eq(ACTIVATE_ELEMENT), any());
inOrder
.verify(typedRecordProcessor, TIMEOUT.times(EXPECTED_ON_RECOVERED_INVOCATIONS))
.onRecovered(any());
inOrder.verifyNoMoreInteractions();
verify(eventApplier, never()).applyState(anyLong(), eq(ACTIVATE_ELEMENT), any());
verifyNoInteractions(typedRecordProcessor);
}

@Test
Expand All @@ -108,15 +95,7 @@ public void shouldSkipRejections() {
startStreamProcessor(typedRecordProcessor, eventApplier);

// then
final InOrder inOrder = inOrder(typedRecordProcessor, eventApplier);
inOrder
.verify(typedRecordProcessor, never())
.processRecord(anyLong(), any(), any(), any(), any());
inOrder.verify(eventApplier, never()).applyState(anyLong(), eq(ACTIVATE_ELEMENT), any());
inOrder
.verify(typedRecordProcessor, TIMEOUT.times(EXPECTED_ON_RECOVERED_INVOCATIONS))
.onRecovered(any());
inOrder.verifyNoMoreInteractions();
verifyNoInteractions(typedRecordProcessor, eventApplier);
}

@Test
Expand Down Expand Up @@ -171,9 +150,6 @@ public void shouldRestoreKeyGenerator() {
startStreamProcessor(typedRecordProcessor, eventApplier);

// then
verify(typedRecordProcessor, TIMEOUT.times(EXPECTED_ON_RECOVERED_INVOCATIONS))
.onRecovered(any());

final var keyGenerator = streamProcessorRule.getZeebeState().getKeyGenerator();
assertThat(keyGenerator.nextKey()).isEqualTo(lastGeneratedKey + 1);
}
Expand All @@ -194,9 +170,6 @@ public void shouldIgnoreKeysFromDifferentPartition() {
startStreamProcessor(typedRecordProcessor, eventApplier);

// then
verify(typedRecordProcessor, TIMEOUT.times(EXPECTED_ON_RECOVERED_INVOCATIONS))
.onRecovered(any());

final var keyGenerator = streamProcessorRule.getZeebeState().getKeyGenerator();
assertThat(keyGenerator.nextKey()).isEqualTo(keyOfThisPartition + 1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
Expand Down Expand Up @@ -117,38 +118,6 @@ public void onFailed() {
assertThat(failedLatch.await(1000, TimeUnit.MILLISECONDS)).isTrue();
}

@Test
public void shouldCallRecordProcessorLifecycle() throws Exception {
// given
final var typedRecordProcessor = mock(TypedRecordProcessor.class);
final var recoveredLatch = new CountDownLatch(1);
streamProcessorRule.startTypedStreamProcessor(
(processors, state) ->
processors
.onCommand(
ValueType.PROCESS_INSTANCE,
ProcessInstanceIntent.ACTIVATE_ELEMENT,
typedRecordProcessor)
.withListener(
new StreamProcessorLifecycleAware() {
@Override
public void onRecovered(final ReadonlyProcessingContext context) {
recoveredLatch.countDown();
}
}));

// when
recoveredLatch.await();
streamProcessorRule.closeStreamProcessor();

// then
final InOrder inOrder = inOrder(typedRecordProcessor);
inOrder.verify(typedRecordProcessor, times(1)).onRecovered(any());
inOrder.verify(typedRecordProcessor, times(1)).onClose();

inOrder.verifyNoMoreInteractions();
}

@Test
public void shouldProcessRecord() {
// given
Expand All @@ -166,13 +135,10 @@ public void shouldProcessRecord() {
ProcessInstanceIntent.ACTIVATE_ELEMENT, Records.processInstance(1));

// then
final InOrder inOrder = inOrder(typedRecordProcessor);
inOrder.verify(typedRecordProcessor, TIMEOUT.times(1)).onRecovered(any());
inOrder
.verify(typedRecordProcessor, TIMEOUT.times(1))
verify(typedRecordProcessor, TIMEOUT.times(1))
.processRecord(eq(position), any(), any(), any(), any());

inOrder.verifyNoMoreInteractions();
verifyNoMoreInteractions(typedRecordProcessor);

Awaitility.await()
.untilAsserted(
Expand Down Expand Up @@ -210,13 +176,10 @@ public void shouldRetryProcessingRecordOnRecoverableException() {
ProcessInstanceIntent.ACTIVATE_ELEMENT, PROCESS_INSTANCE_RECORD);

// then
final InOrder inOrder = inOrder(typedRecordProcessor);
inOrder.verify(typedRecordProcessor, TIMEOUT.times(1)).onRecovered(any());
inOrder
.verify(typedRecordProcessor, TIMEOUT.times(2))
verify(typedRecordProcessor, TIMEOUT.times(1))
.processRecord(eq(position), any(), any(), any(), any());

inOrder.verifyNoMoreInteractions();
verifyNoMoreInteractions(typedRecordProcessor);
}

@Test
Expand All @@ -239,16 +202,10 @@ public void shouldIgnoreRecordWhenNoProcessorExistForThisType() {
ProcessInstanceIntent.TERMINATE_ELEMENT, PROCESS_INSTANCE_RECORD);

// then
final InOrder inOrder = inOrder(typedRecordProcessor);
inOrder.verify(typedRecordProcessor, TIMEOUT.times(1)).onRecovered(any());
inOrder
.verify(typedRecordProcessor, TIMEOUT.times(1))
verify(typedRecordProcessor, TIMEOUT.times(1))
.processRecord(eq(firstPosition), any(), any(), any(), any());
inOrder
.verify(typedRecordProcessor, never())
.processRecord(eq(secondPosition), any(), any(), any(), any());

inOrder.verifyNoMoreInteractions();
verifyNoMoreInteractions(typedRecordProcessor);
}

@Test
Expand Down Expand Up @@ -281,7 +238,6 @@ public void shouldProcessOnlyCommands() {

// then
final InOrder inOrder = inOrder(typedRecordProcessor);
inOrder.verify(typedRecordProcessor, TIMEOUT).onRecovered(any());
inOrder
.verify(typedRecordProcessor, TIMEOUT)
.processRecord(eq(commandPosition), any(), any(), any(), any());
Expand Down

0 comments on commit c578bc9

Please sign in to comment.