Skip to content

Commit

Permalink
Fix merge issues, indentation, and warnings
Browse files Browse the repository at this point in the history
Fix merge issues, indentation, and warnings

#2021
  • Loading branch information
smcvb committed Sep 6, 2022
1 parent 455d650 commit 2e309b0
Showing 1 changed file with 23 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -215,7 +216,7 @@ void handlingEventsAreCorrectlyTraced() throws Exception {
mockEventHandlerInvoker();
doAnswer(
answer -> {
EventMessage message = answer.getArgument(0, EventMessage.class);
EventMessage<?> message = answer.getArgument(0, EventMessage.class);
invokedMessages.add(message);
spanFactory.verifySpanActive("PooledStreamingEventProcessor[test].process", message);
countDownLatch.countDown();
Expand All @@ -229,11 +230,12 @@ void handlingEventsAreCorrectlyTraced() throws Exception {
events.forEach(stubMessageSource::publishMessage);
testSubject.start();
assertTrue(countDownLatch.await(5, TimeUnit.SECONDS));
invokedMessages.forEach(e -> {
assertWithin(1, TimeUnit.SECONDS, () -> {
spanFactory.verifySpanCompleted("PooledStreamingEventProcessor[test].process", e);
});
});
invokedMessages.forEach(
e -> assertWithin(
1, TimeUnit.SECONDS,
() -> spanFactory.verifySpanCompleted("PooledStreamingEventProcessor[test].process", e)
)
);
}

@Test
Expand All @@ -242,13 +244,15 @@ void processorOnlyTriesToClaimAvailableSegments() {
tokenStore.storeToken(new GlobalSequenceTrackingToken(2L), "test", 1);
tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), "test", 2);
tokenStore.storeToken(new GlobalSequenceTrackingToken(1L), "test", 3);
when(tokenStore.fetchAvailableSegments(testSubject.getName())).thenReturn(Collections.singletonList(Segment.computeSegment(2, 0, 1, 2, 3)));
when(tokenStore.fetchAvailableSegments(testSubject.getName()))
.thenReturn(Collections.singletonList(Segment.computeSegment(2, 0, 1, 2, 3)));

testSubject.start();

assertWithin(1, TimeUnit.SECONDS, () -> assertEquals(1, testSubject.processingStatus().size()));
assertWithin(1, TimeUnit.SECONDS, () -> assertTrue(testSubject.processingStatus().containsKey(2)));
verify(tokenStore, never()).fetchToken(eq(testSubject.getName()), intThat(i -> Arrays.asList(0, 1, 3).contains(i)));
verify(tokenStore, never())
.fetchToken(eq(testSubject.getName()), intThat(i -> Arrays.asList(0, 1, 3).contains(i)));
}

@Test
Expand Down Expand Up @@ -389,17 +393,15 @@ void tokenStoreReturningSingleNullToken() {

testSubject.start();

assertWithin(1, TimeUnit.SECONDS, () -> {
assertEquals(2, testSubject.processingStatus().size());
});
assertWithin(1, TimeUnit.SECONDS, () -> assertEquals(2, testSubject.processingStatus().size()));
}

@Test
void eventsWhichMustBeIgnoredAreNotHandledOnlyValidated() throws Exception {
setTestSubject(createTestSubject(builder -> builder.initialSegmentCount(1)));

// The custom ArgumentMatcher, for some reason, first runs the assertion with null, failing the current check.
// Hence a null check is added to the matcher.
// Hence, a null check is added to the matcher.
when(stubEventHandler.canHandle(
argThat(argument -> argument != null && Integer.class.equals(argument.getPayloadType())), any()
)).thenReturn(false);
Expand Down Expand Up @@ -918,13 +920,15 @@ private void mockEventHandlerInvoker() {
void buildWithNullSpanFactoryThrowsAxonConfigurationException() {
PooledStreamingEventProcessor.Builder builderTestSubject = PooledStreamingEventProcessor.builder();

//noinspection ConstantConditions
assertThrows(AxonConfigurationException.class, () -> builderTestSubject.spanFactory(null));
}

@Test
void buildWithNullMessageSourceThrowsAxonConfigurationException() {
PooledStreamingEventProcessor.Builder builderTestSubject = PooledStreamingEventProcessor.builder();

//noinspection ConstantConditions
assertThrows(AxonConfigurationException.class, () -> builderTestSubject.messageSource(null));
}

Expand All @@ -942,6 +946,7 @@ void buildWithoutMessageSourceThrowsAxonConfigurationException() {
void buildWithNullTokenStoreThrowsAxonConfigurationException() {
PooledStreamingEventProcessor.Builder builderTestSubject = PooledStreamingEventProcessor.builder();

//noinspection ConstantConditions
assertThrows(AxonConfigurationException.class, () -> builderTestSubject.tokenStore(null));
}

Expand All @@ -961,6 +966,7 @@ void buildWithoutTokenStoreThrowsAxonConfigurationException() {
void buildWithNullTransactionManagerThrowsAxonConfigurationException() {
PooledStreamingEventProcessor.Builder builderTestSubject = PooledStreamingEventProcessor.builder();

//noinspection ConstantConditions
assertThrows(AxonConfigurationException.class, () -> builderTestSubject.transactionManager(null));
}

Expand All @@ -980,6 +986,7 @@ void buildWithoutTransactionManagerThrowsAxonConfigurationException() {
void buildWithNullCoordinatorExecutorThrowsAxonConfigurationException() {
PooledStreamingEventProcessor.Builder builderTestSubject = PooledStreamingEventProcessor.builder();

//noinspection ConstantConditions
assertThrows(
AxonConfigurationException.class,
() -> builderTestSubject.coordinatorExecutor((ScheduledExecutorService) null)
Expand All @@ -990,6 +997,7 @@ void buildWithNullCoordinatorExecutorThrowsAxonConfigurationException() {
void buildWithNullCoordinatorExecutorBuilderThrowsAxonConfigurationException() {
PooledStreamingEventProcessor.Builder builderTestSubject = PooledStreamingEventProcessor.builder();

//noinspection ConstantConditions
assertThrows(
AxonConfigurationException.class,
() -> builderTestSubject.coordinatorExecutor((Function<String, ScheduledExecutorService>) null)
Expand All @@ -1013,6 +1021,7 @@ void buildWithoutCoordinatorExecutorThrowsAxonConfigurationException() {
void buildWithNullWorkerExecutorThrowsAxonConfigurationException() {
PooledStreamingEventProcessor.Builder builderTestSubject = PooledStreamingEventProcessor.builder();

//noinspection ConstantConditions
assertThrows(
AxonConfigurationException.class,
() -> builderTestSubject.workerExecutor((ScheduledExecutorService) null)
Expand All @@ -1023,6 +1032,7 @@ void buildWithNullWorkerExecutorThrowsAxonConfigurationException() {
void buildWithNullWorkerExecutorBuilderThrowsAxonConfigurationException() {
PooledStreamingEventProcessor.Builder builderTestSubject = PooledStreamingEventProcessor.builder();

//noinspection ConstantConditions
assertThrows(
AxonConfigurationException.class,
() -> builderTestSubject.workerExecutor((Function<String, ScheduledExecutorService>) null)
Expand Down Expand Up @@ -1055,6 +1065,7 @@ void buildWithZeroOrNegativeInitialSegmentCountThrowsAxonConfigurationException(
void buildWithNullInitialTokenThrowsAxonConfigurationException() {
PooledStreamingEventProcessor.Builder builderTestSubject = PooledStreamingEventProcessor.builder();

//noinspection ConstantConditions
assertThrows(AxonConfigurationException.class, () -> builderTestSubject.initialToken(null));
}

Expand Down

0 comments on commit 2e309b0

Please sign in to comment.