Skip to content

Commit

Permalink
merge: #10036
Browse files Browse the repository at this point in the history
10036: Create new StreamPlatformExtension r=Zelldon a=Zelldon

## Description

This is an attempt to create a new StreamPlatformExtension, which should allow us to replace the TestStreams, StreamProcessorComposite, and StreamProcessorRule. This is something that would help us to continue with cleaning up the engine/stream processing code (and tests). There was a recent attempt to remove the event applies from the stream processing interface, #9985 but this failed since several tests have high dependencies on how the engine and related stuff is built. They are highly coupled.

This is just a first step and I would like to validate with you `@saig0` and `@npepinpe,` since I think you both are quite familiar with JUnit 5 extensions and our old rules. (Note: I'm a total junit 5 noob please bare with me.)

_What the PR does:_

 - Introduce a new class StreamPlatform (combination of [TestStreams](https://github.com/camunda/zeebe/blob/main/engine/src/test/java/io/camunda/zeebe/engine/util/TestStreams.java) and [StreamComposite](https://github.com/camunda/zeebe/blob/main/engine/src/test/java/io/camunda/zeebe/engine/util/StreamProcessingComposite.java))
 - Introduce a new extension to create the StreamPlatform and make this available for tests
 - Created an example test which is a copy of [StreamProcessorReplayModeTest.java](https://github.com/camunda/zeebe/blob/main/engine/src/test/java/io/camunda/zeebe/engine/processing/streamprocessor/StreamProcessorReplayModeTest.java)

In the test class we can see how it would look like and what the stream platform could provide us. Instead of always registering mocked typed processors in streamProcessors tests we just have on mocked RecordProcessor which can be use for verifications. I think this would simplify our tests a lot, since it is currently a lot of boilter plate which is mostly for the tests not interesting at all.

Be aware that it currently only contains one test to show how it could look like and get early feedback. 
Please take a look and let me know what you think.
<!-- Please explain the changes you made here. -->

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #



Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and Zelldon committed Aug 15, 2022
2 parents 6eae1e5 + 9728088 commit 7e2cde2
Show file tree
Hide file tree
Showing 3 changed files with 540 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.processing.streamprocessor;

import static io.camunda.zeebe.engine.util.RecordToWrite.command;
import static io.camunda.zeebe.engine.util.RecordToWrite.event;
import static io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent.ACTIVATE_ELEMENT;
import static io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent.ELEMENT_ACTIVATING;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.timeout;

import io.camunda.zeebe.engine.util.Records;
import io.camunda.zeebe.engine.util.StreamPlatform;
import io.camunda.zeebe.engine.util.StreamPlatformExtension;
import io.camunda.zeebe.protocol.impl.record.value.processinstance.ProcessInstanceRecord;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InOrder;
import org.mockito.verification.VerificationWithTimeout;

@ExtendWith(StreamPlatformExtension.class)
public final class NewStreamProcessorReplayModeTest {

private static final long TIMEOUT_MILLIS = 2_000L;
private static final VerificationWithTimeout TIMEOUT = timeout(TIMEOUT_MILLIS);

private static final ProcessInstanceRecord RECORD = Records.processInstance(1);

@SuppressWarnings("unused") // injected by the extension
private StreamPlatform streamPlatform;

@Test
public void shouldProcessAfterReplay() {
// given
streamPlatform.writeBatch(
command().processInstance(ACTIVATE_ELEMENT, RECORD),
event().processInstance(ELEMENT_ACTIVATING, RECORD).causedBy(0));

// when
streamPlatform.startStreamProcessor();

streamPlatform.writeBatch(
command().processInstance(ACTIVATE_ELEMENT, RECORD),
event().processInstance(ELEMENT_ACTIVATING, RECORD).causedBy(0));

// then
final var recordProcessor = streamPlatform.getRecordProcessor();
final InOrder inOrder = inOrder(recordProcessor);
inOrder.verify(recordProcessor, TIMEOUT).replay(any());
inOrder.verify(recordProcessor, TIMEOUT).process(any(), any());
inOrder.verifyNoMoreInteractions();
}
}

0 comments on commit 7e2cde2

Please sign in to comment.