Skip to content

Commit

Permalink
Merge pull request #280 from AxonFramework/feature/streaming-query
Browse files Browse the repository at this point in the history
Adds support for Streaming Query
  • Loading branch information
smcvb committed May 11, 2022
2 parents d3ea5ed + 8426d96 commit 30af783
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 11 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<axon.version>4.5.9</axon.version>
<axon.version>4.6.0-SNAPSHOT</axon.version>

<spring.boot.version>2.6.7</spring.boot.version>

Expand Down
6 changes: 6 additions & 0 deletions tracing-spring-boot-autoconfigure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.axonframework.extensions.tracing.TracingProvider;
import org.axonframework.extensions.tracing.TracingQueryGateway;
import org.axonframework.queryhandling.QueryGateway;
import org.axonframework.springboot.autoconfig.AxonServerAutoConfiguration;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.*;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -32,8 +31,7 @@
JmxAutoConfiguration.class,
WebClientAutoConfiguration.class,
HibernateJpaAutoConfiguration.class,
DataSourceAutoConfiguration.class,
AxonServerAutoConfiguration.class
DataSourceAutoConfiguration.class
})
@ExtendWith(SpringExtension.class)
class AxonAutoConfigurationWithTracingTest {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package org.axonframework.extensions.tracing.autoconfig;

import io.opentracing.Tracer;
import io.opentracing.noop.NoopTracer;
import org.axonframework.commandhandling.gateway.CommandGateway;
import org.axonframework.commandhandling.gateway.DefaultCommandGateway;
import org.axonframework.extensions.tracing.TracingCommandGateway;
import org.axonframework.extensions.tracing.TracingQueryGateway;
import org.axonframework.queryhandling.DefaultQueryGateway;
import org.axonframework.queryhandling.QueryGateway;
import org.axonframework.springboot.autoconfig.AxonServerAutoConfiguration;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.*;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -33,8 +31,7 @@
JmxAutoConfiguration.class,
WebClientAutoConfiguration.class,
HibernateJpaAutoConfiguration.class,
DataSourceAutoConfiguration.class,
AxonServerAutoConfiguration.class
DataSourceAutoConfiguration.class
})
@ExtendWith(SpringExtension.class)
@TestPropertySource(properties = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import io.opentracing.Tracer;
import org.axonframework.extensions.tracing.MessageTag;
import org.axonframework.extensions.tracing.MessageTagBuilderService;
import org.axonframework.springboot.autoconfig.AxonServerAutoConfiguration;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.*;
import org.springframework.boot.autoconfigure.AutoConfigurations;
Expand Down Expand Up @@ -31,6 +30,7 @@
class TracingAutoConfigurationMessageTagTest {

private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
.withPropertyValues("axon.axonserver.enabled=false")
.withConfiguration(AutoConfigurations.of(TracingAutoConfigurationMessageTagTest.Config.class));

@Test
Expand Down Expand Up @@ -75,8 +75,7 @@ void testMessageTagWithoutCustomConfiguration() {

@EnableAutoConfiguration(exclude = {
JpaRepositoriesAutoConfiguration.class,
HibernateJpaAutoConfiguration.class,
AxonServerAutoConfiguration.class
HibernateJpaAutoConfiguration.class
})
public static class Config {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@
import org.axonframework.messaging.responsetypes.ResponseType;
import org.axonframework.queryhandling.DefaultQueryGateway;
import org.axonframework.queryhandling.GenericQueryMessage;
import org.axonframework.queryhandling.GenericStreamingQueryMessage;
import org.axonframework.queryhandling.GenericSubscriptionQueryMessage;
import org.axonframework.queryhandling.QueryBus;
import org.axonframework.queryhandling.QueryGateway;
import org.axonframework.queryhandling.QueryMessage;
import org.axonframework.queryhandling.SubscriptionQueryBackpressure;
import org.axonframework.queryhandling.SubscriptionQueryMessage;
import org.axonframework.queryhandling.SubscriptionQueryResult;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand All @@ -41,6 +44,7 @@
import static org.axonframework.common.BuilderUtils.assertNonNull;
import static org.axonframework.common.ObjectUtils.nullSafeTypeOf;
import static org.axonframework.messaging.GenericMessage.asMessage;
import static org.axonframework.queryhandling.QueryMessage.queryName;

/**
* A tracing {@link QueryGateway} which activates a calling {@link Span}, when the {@link CompletableFuture} completes.
Expand Down Expand Up @@ -120,6 +124,29 @@ public <R, Q> Stream<R> scatterGather(String queryName,
);
}

@Override
public <R, Q> Publisher<R> streamingQuery(Q query, Class<R> responseType) {
return streamingQuery(queryName(query), query, responseType);
}

@Override
public <R, Q> Publisher<R> streamingQuery(String queryName, Q query, Class<R> responseType) {
GenericStreamingQueryMessage<Q, R> queryMessagesMessage = new GenericStreamingQueryMessage<>(query,
queryName,
responseType);
return getWithSpan(
"streamingQuery_" + SpanUtils.messageName(nullSafeTypeOf(query), queryName),
queryMessagesMessage,
(childSpan) -> Flux.from(delegate.streamingQuery(queryName, queryMessagesMessage, responseType))
.doOnSubscribe(unused -> childSpan.log("subscriptionStarted"))
.doOnNext(unused -> childSpan.log("answerReceived"))
.doFinally(unused -> {
childSpan.log("subscriptionTerminated");
childSpan.finish();
})
);
}

@Override
public <Q, I, U> SubscriptionQueryResult<I, U> subscriptionQuery(String queryName,
Q query,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.axonframework.queryhandling.QueryResponseMessage;
import org.axonframework.queryhandling.SubscriptionQueryResult;
import org.junit.jupiter.api.*;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
Expand Down Expand Up @@ -272,6 +273,41 @@ private <I, U> SubscriptionQueryResult createSubscriptionQueryResult(I initial,
() -> true);
}

@Test
void testStreamingQuery() {

Publisher<QueryResponseMessage<Object>> updates = Flux.just(new GenericQueryResponseMessage<>("answer1"),
new GenericQueryResponseMessage<>("answer2"));
when(mockQueryBus.streamingQuery(any()))
.thenReturn(updates);

MockSpan span = mockTracer.buildSpan("testStreamingQueryResults").start();
ScopeManager scopeManager = mockTracer.scopeManager();
try (final Scope ignored = scopeManager.activate(span)) {
Publisher<String> stringPublisher = testSubject.streamingQuery(new MyQuery(),
String.class);
// check the following results are there
StepVerifier.create(Flux.from(stringPublisher))
.expectNext("answer1")
.expectNext("answer2")
.expectComplete()
.verify();

// Verify the parent span is restored, and that a child span was finished.
Span activeSpan = mockTracer.activeSpan();
assertEquals(span, activeSpan);

List<MockSpan> mockSpans = mockTracer.finishedSpans();
assertEquals(1, mockSpans.size());
assertEquals("streamingQuery_MyQuery", mockSpans.get(0).operationName());
assertNotNull(mockSpans.get(0).logEntries());
assertFalse(mockSpans.get(0).logEntries().isEmpty());
assertNotNull(mockSpans.get(0).tags());
assertFalse(mockSpans.get(0).tags().isEmpty());
}
assertNull(scopeManager.activeSpan(), "There should be no activeSpan");
}

private static class MyQuery {

}
Expand Down

0 comments on commit 30af783

Please sign in to comment.