Skip to content

Commit

Permalink
Upodated aggregator RedisMessageStoreAggregatorTests to use RedisTest…
Browse files Browse the repository at this point in the history
…ContainerSupport.
  • Loading branch information
corneil committed Jan 11, 2023
1 parent 3aa662f commit 570a8ec
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 27 deletions.
4 changes: 0 additions & 4 deletions applications/sink/redis-sink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@
<relativePath>../../stream-applications-core/pom.xml</relativePath>
</parent>

<properties>
<embedded-redis.version>2.0.11</embedded-redis.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
*/
@Testcontainers(disabledWithoutDocker = true)
public interface RedisTestContainerSupport {
GenericContainer<?> REDIS_CONTAINER = new GenericContainer<>("redis:7.0.2")
GenericContainer<?> REDIS_CONTAINER = new GenericContainer<>("redis:7")
.withExposedPorts(6379)
.withStartupTimeout(Duration.ofSeconds(120))
.withStartupAttempts(3);
Expand Down
7 changes: 7 additions & 0 deletions functions/function/aggregator-function/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@
<artifactId>spring-boot-starter-data-redis</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud.fn</groupId>
<artifactId>redis-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!--JDBC-->
<dependency>
<groupId>org.springframework.integration</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@
import java.time.Duration;
import java.util.List;

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import org.springframework.cloud.fn.consumer.redis.RedisTestContainerSupport;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.redis.store.RedisMessageStore;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.springframework.test.context.TestPropertySource;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -36,35 +38,38 @@
* @author Artem Bilan
*/
@TestPropertySource(properties = "aggregator.message-store-type=redis")
@Disabled("Needs real Redis Server to be run") // TODO add redis test container
public class RedisMessageStoreAggregatorTests extends AbstractAggregatorFunctionTests {
public class RedisMessageStoreAggregatorTests extends AbstractAggregatorFunctionTests implements RedisTestContainerSupport {
@DynamicPropertySource
static void redisProperties(DynamicPropertyRegistry registry) {
registry.add("spring.data.redis.url", RedisTestContainerSupport::getUri);
}

@Test
public void test() {
Flux<Message<?>> input =
Flux.just(MessageBuilder.withPayload("2")
.setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "my_correlation")
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 2)
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 2)
.build(),
MessageBuilder.withPayload("1")
.setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "my_correlation")
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 1)
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 2)
.build());
Flux.just(MessageBuilder.withPayload("2")
.setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "my_correlation")
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 2)
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 2)
.build(),
MessageBuilder.withPayload("1")
.setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, "my_correlation")
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER, 1)
.setHeader(IntegrationMessageHeaderAccessor.SEQUENCE_SIZE, 2)
.build());

Flux<Message<?>> output = this.aggregatorFunction.apply(input);

output.as(StepVerifier::create)
.assertNext((message) ->
assertThat(message)
.extracting(Message::getPayload)
.isInstanceOf(List.class)
.asList()
.hasSize(2)
.contains("1", "2"))
.thenCancel()
.verify(Duration.ofSeconds(10));
.assertNext((message) ->
assertThat(message)
.extracting(Message::getPayload)
.isInstanceOf(List.class)
.asList()
.hasSize(2)
.contains("1", "2"))
.thenCancel()
.verify(Duration.ofSeconds(10));

assertThat(this.messageGroupStore).isInstanceOf(RedisMessageStore.class);

Expand Down

0 comments on commit 570a8ec

Please sign in to comment.