Skip to content

Commit

Permalink
style: apply codestyle
Browse files Browse the repository at this point in the history
(cherry picked from commit 3fbde5f)
  • Loading branch information
Zelldon committed Sep 30, 2022
1 parent 6181e7d commit 352e767
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ public StreamPlatform(
actorScheduler.submitActor(writeActor);

defaultMockedRecordProcessor = mock(RecordProcessor.class);
when(defaultMockedRecordProcessor.process(any(), any())).thenReturn(EmptyProcessingResult.INSTANCE);
when(defaultMockedRecordProcessor.process(any(), any()))
.thenReturn(EmptyProcessingResult.INSTANCE);
when(defaultMockedRecordProcessor.onProcessingError(any(), any(), any()))
.thenReturn(EmptyProcessingResult.INSTANCE);
when(defaultMockedRecordProcessor.accepts(any())).thenReturn(true);
Expand Down Expand Up @@ -205,8 +206,6 @@ public StreamProcessor buildStreamProcessor(
final var storage = createRuntimeFolder(stream);
final var snapshot = storage.getParent().resolve(SNAPSHOT_FOLDER);



final ZeebeDb<?> zeebeDb;
if (snapshotWasTaken) {
zeebeDb = zeebeDbFactory.createDb(snapshot.toFile());
Expand All @@ -232,7 +231,7 @@ public StreamProcessor buildStreamProcessor(
final var openFuture = streamProcessor.openAsync(false);

if (awaitOpening) { // and recovery
verify(mockProcessorLifecycleAware, timeout(15 * 1000)).onRecovered(any());
verify(mockProcessorLifecycleAware, timeout(15 * 1000)).onRecovered(any());
}
openFuture.join(15, TimeUnit.SECONDS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@ public void shouldCallStreamProcessorLifecycle() throws Exception {
public void shouldCallStreamProcessorLifecycleOnFail() {
// given
final var mockProcessorLifecycleAware = streamPlatform.getMockProcessorLifecycleAware();
doThrow(new RuntimeException("force fail")).when(mockProcessorLifecycleAware).onRecovered(any());
doThrow(new RuntimeException("force fail"))
.when(mockProcessorLifecycleAware)
.onRecovered(any());

// when
streamPlatform.startStreamProcessor();
Expand Down Expand Up @@ -140,7 +142,9 @@ public void shouldCallOnErrorWhenProcessingFails() {
inOrder.verify(defaultRecordProcessor, TIMEOUT).init(any());
inOrder.verify(defaultRecordProcessor, TIMEOUT).accepts(ValueType.PROCESS_INSTANCE);
inOrder.verify(defaultRecordProcessor, TIMEOUT).process(any(), any());
inOrder.verify(defaultRecordProcessor, TIMEOUT).onProcessingError(eq(processingError), any(), any());
inOrder
.verify(defaultRecordProcessor, TIMEOUT)
.onProcessingError(eq(processingError), any(), any());
inOrder.verifyNoMoreInteractions();
}

Expand All @@ -163,7 +167,9 @@ public void shouldLoopWhenOnErrorFails() {
inOrder.verify(defaultRecordProcessor, TIMEOUT).init(any());
inOrder.verify(defaultRecordProcessor, TIMEOUT).accepts(ValueType.PROCESS_INSTANCE);
inOrder.verify(defaultRecordProcessor, TIMEOUT).process(any(), any());
inOrder.verify(defaultRecordProcessor, TIMEOUT.atLeast(5)).onProcessingError(eq(processingError), any(), any());
inOrder
.verify(defaultRecordProcessor, TIMEOUT.atLeast(5))
.onProcessingError(eq(processingError), any(), any());
}

@Test
Expand Down Expand Up @@ -221,14 +227,14 @@ public void shouldWriteFollowUpEventsAndCommands() {
// given
final var defaultRecordProcessor = streamPlatform.getDefaultMockedRecordProcessor();
final var firstResultBuilder = new BufferedProcessingResultBuilder((c, v) -> true);
firstResultBuilder.appendRecordReturnEither(1, RecordType.EVENT,
ACTIVATE_ELEMENT, RejectionType.NULL_VAL, "", RECORD);
firstResultBuilder.appendRecordReturnEither(2, RecordType.COMMAND,
ACTIVATE_ELEMENT, RejectionType.NULL_VAL, "", RECORD);
firstResultBuilder.appendRecordReturnEither(
1, RecordType.EVENT, ACTIVATE_ELEMENT, RejectionType.NULL_VAL, "", RECORD);
firstResultBuilder.appendRecordReturnEither(
2, RecordType.COMMAND, ACTIVATE_ELEMENT, RejectionType.NULL_VAL, "", RECORD);

final var secondResultBuilder = new BufferedProcessingResultBuilder((c, v) -> true);
secondResultBuilder.appendRecordReturnEither(3, RecordType.EVENT,
ACTIVATE_ELEMENT, RejectionType.NULL_VAL, "", RECORD);
secondResultBuilder.appendRecordReturnEither(
3, RecordType.EVENT, ACTIVATE_ELEMENT, RejectionType.NULL_VAL, "", RECORD);

when(defaultRecordProcessor.process(any(), any()))
.thenReturn(firstResultBuilder.build())
Expand All @@ -238,17 +244,21 @@ public void shouldWriteFollowUpEventsAndCommands() {
streamPlatform.startStreamProcessor();

// when
streamPlatform.writeBatch(
command().processInstance(ACTIVATE_ELEMENT, RECORD));
streamPlatform.writeBatch(command().processInstance(ACTIVATE_ELEMENT, RECORD));

// then
verify(defaultRecordProcessor, TIMEOUT.times(2)).process(any(), any());
await("Last written position should be updated").untilAsserted(() ->
assertThat(streamPlatform.getLogStream().getLastWrittenPosition()).isEqualTo(4)
);
await("Last processed position should be updated").untilAsserted(() ->
assertThat(streamPlatform.getStreamProcessor().getLastProcessedPositionAsync().join()).isEqualTo(3));
await("Last written position should be updated")
.untilAsserted(
() -> assertThat(streamPlatform.getLogStream().getLastWrittenPosition()).isEqualTo(4));
await("Last processed position should be updated")
.untilAsserted(
() ->
assertThat(
streamPlatform.getStreamProcessor().getLastProcessedPositionAsync().join())
.isEqualTo(3));
}

@Test
public void shouldExecutePostCommitTask() {
// given
Expand All @@ -261,8 +271,7 @@ public void shouldExecutePostCommitTask() {
streamPlatform.startStreamProcessor();

// when
streamPlatform.writeBatch(
command().processInstance(ACTIVATE_ELEMENT, RECORD));
streamPlatform.writeBatch(command().processInstance(ACTIVATE_ELEMENT, RECORD));

// then
verify(mockPostCommitTask, TIMEOUT).flush();
Expand All @@ -282,8 +291,7 @@ public void shouldRepeatExecutePostCommitTask() {
streamPlatform.startStreamProcessor();

// when
streamPlatform.writeBatch(
command().processInstance(ACTIVATE_ELEMENT, RECORD));
streamPlatform.writeBatch(command().processInstance(ACTIVATE_ELEMENT, RECORD));

// then
verify(mockPostCommitTask, TIMEOUT.atLeast(5)).flush();
Expand Down Expand Up @@ -318,18 +326,20 @@ public void shouldNotRepeatPostCommitOnException() throws Exception {
public void shouldUpdateStateOnSuccessfulProcessing() {
// given
final var testProcessor = spy(new TestProcessor());
testProcessor.processingAction = (ctx) -> {
final var zeebeDb = ctx.getZeebeDb();
final var keyGenerator = new DbKeyGenerator(1, zeebeDb, ctx.getTransactionContext());
keyGenerator.nextKey();
keyGenerator.nextKey();
keyGenerator.nextKey();
};
testProcessor.processingAction =
(ctx) -> {
final var zeebeDb = ctx.getZeebeDb();
final var keyGenerator = new DbKeyGenerator(1, zeebeDb, ctx.getTransactionContext());
keyGenerator.nextKey();
keyGenerator.nextKey();
keyGenerator.nextKey();
};
// in order to not mark the processing as skipped we need to return a result
testProcessor.processingResult = new BufferedProcessingResultBuilder((c, s) -> true).build();
doCallRealMethod()
.doReturn(EmptyProcessingResult.INSTANCE)
.when(testProcessor).process(any(), any());
.when(testProcessor)
.process(any(), any());
streamPlatform.withRecordProcessors(List.of(testProcessor)).startStreamProcessor();

final var zeebeDb = testProcessor.recordProcessorContext.getZeebeDb();
Expand All @@ -352,20 +362,22 @@ public void shouldUpdateStateOnSuccessfulProcessing() {
public void shouldNotUpdateStateOnExceptionInProcessing() {
// given
final var testProcessor = spy(new TestProcessor());
testProcessor.processingAction = (ctx) -> {
final var zeebeDb = ctx.getZeebeDb();
final var keyGenerator = new DbKeyGenerator(1, zeebeDb, ctx.getTransactionContext());
keyGenerator.nextKey();
keyGenerator.nextKey();
keyGenerator.nextKey();

throw new RuntimeException("expected");
};
testProcessor.processingAction =
(ctx) -> {
final var zeebeDb = ctx.getZeebeDb();
final var keyGenerator = new DbKeyGenerator(1, zeebeDb, ctx.getTransactionContext());
keyGenerator.nextKey();
keyGenerator.nextKey();
keyGenerator.nextKey();

throw new RuntimeException("expected");
};
// in order to not mark the processing as skipped we need to return a result
testProcessor.processingResult = new BufferedProcessingResultBuilder((c, s) -> true).build();
doCallRealMethod()
.doReturn(EmptyProcessingResult.INSTANCE)
.when(testProcessor).process(any(), any());
.when(testProcessor)
.process(any(), any());
streamPlatform.withRecordProcessors(List.of(testProcessor)).startStreamProcessor();

final var zeebeDb = testProcessor.recordProcessorContext.getZeebeDb();
Expand All @@ -388,19 +400,22 @@ public void shouldNotUpdateStateOnExceptionInProcessing() {
public void shouldUpdateStateOnProcessingErrorCall() {
// given
final var testProcessor = spy(new TestProcessor());
testProcessor.processingAction = (ctx) -> {
throw new RuntimeException("expected");
};
testProcessor.onProcessingErrorAction = (ctx) -> {
final var zeebeDb = ctx.getZeebeDb();
final var keyGenerator = new DbKeyGenerator(1, zeebeDb, ctx.getTransactionContext());
keyGenerator.nextKey();
keyGenerator.nextKey();
keyGenerator.nextKey();
};
testProcessor.processingAction =
(ctx) -> {
throw new RuntimeException("expected");
};
testProcessor.onProcessingErrorAction =
(ctx) -> {
final var zeebeDb = ctx.getZeebeDb();
final var keyGenerator = new DbKeyGenerator(1, zeebeDb, ctx.getTransactionContext());
keyGenerator.nextKey();
keyGenerator.nextKey();
keyGenerator.nextKey();
};
doCallRealMethod()
.doReturn(EmptyProcessingResult.INSTANCE)
.when(testProcessor).process(any(), any());
.when(testProcessor)
.process(any(), any());
streamPlatform.withRecordProcessors(List.of(testProcessor)).startStreamProcessor();

final var zeebeDb = testProcessor.recordProcessorContext.getZeebeDb();
Expand All @@ -423,24 +438,28 @@ public void shouldUpdateStateOnProcessingErrorCall() {
public void shouldNotUpdateStateOnExceptionOnProcessingErrorCall() {
// given
final var testProcessor = spy(new TestProcessor());
testProcessor.processingAction = (ctx) -> {
throw new RuntimeException("expected");
};
testProcessor.onProcessingErrorAction = (ctx) -> {
final var zeebeDb = ctx.getZeebeDb();
final var keyGenerator = new DbKeyGenerator(1, zeebeDb, ctx.getTransactionContext());
keyGenerator.nextKey();
keyGenerator.nextKey();
keyGenerator.nextKey();

throw new RuntimeException("expected");
};
testProcessor.processingAction =
(ctx) -> {
throw new RuntimeException("expected");
};
testProcessor.onProcessingErrorAction =
(ctx) -> {
final var zeebeDb = ctx.getZeebeDb();
final var keyGenerator = new DbKeyGenerator(1, zeebeDb, ctx.getTransactionContext());
keyGenerator.nextKey();
keyGenerator.nextKey();
keyGenerator.nextKey();

throw new RuntimeException("expected");
};
doCallRealMethod()
.doReturn(EmptyProcessingResult.INSTANCE)
.when(testProcessor).process(any(), any());
.when(testProcessor)
.process(any(), any());
doCallRealMethod()
.doReturn(EmptyProcessingResult.INSTANCE)
.when(testProcessor).onProcessingError(any(), any(), any());
.when(testProcessor)
.onProcessingError(any(), any(), any());
streamPlatform.withRecordProcessors(List.of(testProcessor)).startStreamProcessor();

final var zeebeDb = testProcessor.recordProcessorContext.getZeebeDb();
Expand All @@ -465,7 +484,16 @@ public void shouldWriteResponse() {
final var defaultMockedRecordProcessor = streamPlatform.getDefaultMockedRecordProcessor();

final var resultBuilder = new BufferedProcessingResultBuilder((c, s) -> true);
resultBuilder.withResponse(RecordType.EVENT, 3, ELEMENT_ACTIVATING, RECORD, ValueType.PROCESS_INSTANCE, RejectionType.NULL_VAL, "", 1, 12);
resultBuilder.withResponse(
RecordType.EVENT,
3,
ELEMENT_ACTIVATING,
RECORD,
ValueType.PROCESS_INSTANCE,
RejectionType.NULL_VAL,
"",
1,
12);
when(defaultMockedRecordProcessor.process(any(), any()))
.thenReturn(resultBuilder.build())
.thenReturn(EmptyProcessingResult.INSTANCE);
Expand All @@ -480,8 +508,7 @@ public void shouldWriteResponse() {
// then
verify(defaultMockedRecordProcessor, TIMEOUT.times(2)).process(any(), any());

final var commandResponseWriter =
streamPlatform.getMockCommandResponseWriter();
final var commandResponseWriter = streamPlatform.getMockCommandResponseWriter();

verify(commandResponseWriter, TIMEOUT.times(1)).key(3);
verify(commandResponseWriter, TIMEOUT.times(1))
Expand All @@ -500,7 +527,16 @@ public void shouldWriteResponseOnFailedEventProcessing() {
.thenReturn(EmptyProcessingResult.INSTANCE);

final var resultBuilder = new BufferedProcessingResultBuilder((c, s) -> true);
resultBuilder.withResponse(RecordType.EVENT, 3, ELEMENT_ACTIVATING, RECORD, ValueType.PROCESS_INSTANCE, RejectionType.NULL_VAL, "", 1, 12);
resultBuilder.withResponse(
RecordType.EVENT,
3,
ELEMENT_ACTIVATING,
RECORD,
ValueType.PROCESS_INSTANCE,
RejectionType.NULL_VAL,
"",
1,
12);
when(defaultMockedRecordProcessor.onProcessingError(any(), any(), any()))
.thenReturn(resultBuilder.build())
.thenReturn(EmptyProcessingResult.INSTANCE);
Expand All @@ -516,8 +552,7 @@ public void shouldWriteResponseOnFailedEventProcessing() {
verify(defaultMockedRecordProcessor, TIMEOUT.times(2)).process(any(), any());
verify(defaultMockedRecordProcessor, TIMEOUT.times(1)).onProcessingError(any(), any(), any());

final var commandResponseWriter =
streamPlatform.getMockCommandResponseWriter();
final var commandResponseWriter = streamPlatform.getMockCommandResponseWriter();

verify(commandResponseWriter, TIMEOUT.times(1)).key(3);
verify(commandResponseWriter, TIMEOUT.times(1))
Expand All @@ -534,8 +569,7 @@ public void shouldInvokeOnProcessedListenerWhenReturnResult() {
final var defaultMockedRecordProcessor = streamPlatform.getDefaultMockedRecordProcessor();

final var resultBuilder = new BufferedProcessingResultBuilder((c, s) -> true);
when(defaultMockedRecordProcessor.process(any(), any()))
.thenReturn(resultBuilder.build());
when(defaultMockedRecordProcessor.process(any(), any())).thenReturn(resultBuilder.build());
streamPlatform.startStreamProcessor();

// when
Expand Down Expand Up @@ -623,12 +657,14 @@ public void shouldNotUpdateLastWrittenPositionWhenSkipped() {

// then
verify(defaultRecordProcessor, TIMEOUT.times(2)).process(any(), any());
await("Last written position should be updated").untilAsserted(() ->
assertThat(streamPlatform.getStreamProcessor().getLastWrittenPositionAsync().join()).isEqualTo(-1)
);
await("Last written position should be updated")
.untilAsserted(
() ->
assertThat(streamPlatform.getStreamProcessor().getLastWrittenPositionAsync().join())
.isEqualTo(-1));
}

private final static class TestProcessor implements RecordProcessor {
private static final class TestProcessor implements RecordProcessor {

ProcessingResult processingResult = EmptyProcessingResult.INSTANCE;
ProcessingResult processingResultOnError = EmptyProcessingResult.INSTANCE;
Expand All @@ -647,18 +683,18 @@ public boolean accepts(final ValueType valueType) {
}

@Override
public void replay(final TypedRecord record) {
}
public void replay(final TypedRecord record) {}

@Override
public ProcessingResult process(final TypedRecord record,
final ProcessingResultBuilder processingResultBuilder) {
public ProcessingResult process(
final TypedRecord record, final ProcessingResultBuilder processingResultBuilder) {
processingAction.accept(recordProcessorContext);
return processingResult;
}

@Override
public ProcessingResult onProcessingError(final Throwable processingException,
public ProcessingResult onProcessingError(
final Throwable processingException,
final TypedRecord record,
final ProcessingResultBuilder processingResultBuilder) {
onProcessingErrorAction.accept(recordProcessorContext);
Expand Down

0 comments on commit 352e767

Please sign in to comment.