Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming query #2001

Merged
merged 41 commits into from
Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
5380e38
Streaming Query initial sketch:
m1l4n54v1c Oct 21, 2021
8381c32
Tests for streamingQuery
schananas Oct 21, 2021
cfad312
Tests for streamingQuery & refactoring
schananas Oct 21, 2021
e3d0e03
Implementation of streamingQuery
schananas Oct 22, 2021
3ce3f69
Making Streaming Query part of regular query. It is distinguished by …
m1l4n54v1c Oct 25, 2021
01d61bc
Dispose a subscription once a receiver completes the query.
m1l4n54v1c Oct 28, 2021
60b06e4
Add custom subscriber that reacts on flow control instructions
schananas Nov 2, 2021
0889b1b
Adjust flow control from AxonServerQueryBus
schananas Nov 5, 2021
3bbee02
Add streaming query API to query gateway
schananas Nov 8, 2021
a1f2f32
Query gateway test for streaming query
schananas Nov 8, 2021
f36a147
Query gateway test for streaming query
schananas Nov 9, 2021
c212e2a
Clean up
schananas Nov 9, 2021
198dbfe
Clean up on stream complete
schananas Nov 10, 2021
8b6af75
Clean up on stream complete
schananas Nov 10, 2021
d976723
onError should send a query response with error.
m1l4n54v1c Nov 11, 2021
578f181
Return Publisher instead of Flux for backwards compatability
schananas Nov 17, 2021
029c693
Merge branch 'streaming-query' of https://github.com/AxonFramework/Ax…
schananas Nov 17, 2021
0df7a33
streaming query
m1l4n54v1c Feb 4, 2022
3385504
WIP: added additional method for streaming query.
m1l4n54v1c Feb 11, 2022
56c92da
Streaming query - QueryGateway
schananas Feb 11, 2022
0b505b5
Switch handler on error
schananas Feb 11, 2022
697b95d
Added backwards compatibility tests.
m1l4n54v1c Feb 14, 2022
bb610af
Added JavaDoc.
m1l4n54v1c Feb 14, 2022
bba17c4
Added JavaDoc to BlockingQueryResponseProcessingTask.
m1l4n54v1c Feb 14, 2022
579748b
Added JavaDoc to PrioritizedRunnable.
m1l4n54v1c Feb 14, 2022
f7d38b6
Switched to package private visibility of PrioritizedRunnable.
m1l4n54v1c Feb 14, 2022
eaf8bfa
More JavaDoc and cleanup.
m1l4n54v1c Feb 14, 2022
8813d91
Merge branch 'master' into streaming-query
m1l4n54v1c Feb 14, 2022
09f97f6
Fixed versions and merged main branch.
m1l4n54v1c Feb 14, 2022
dd4a52b
Disabled possibility to stream query results via regular query.
m1l4n54v1c Feb 23, 2022
d758d0e
code review: api changes.
m1l4n54v1c Mar 1, 2022
97c4129
- handled the case of cancelling streamable result before result is r…
m1l4n54v1c Mar 9, 2022
b823c06
Update axon-server-connector/src/main/java/org/axonframework/axonserv…
m1l4n54v1c Mar 21, 2022
b9b25dc
Update axon-server-connector/src/main/java/org/axonframework/axonserv…
m1l4n54v1c Mar 21, 2022
1f3b320
Update axon-server-connector/src/main/java/org/axonframework/axonserv…
m1l4n54v1c Mar 21, 2022
c9c17c9
Update axon-server-connector/src/main/java/org/axonframework/axonserv…
m1l4n54v1c Mar 21, 2022
fd1be4c
Update axon-server-connector/src/main/java/org/axonframework/axonserv…
m1l4n54v1c Mar 21, 2022
b003ac6
Update axon-server-connector/src/main/java/org/axonframework/axonserv…
m1l4n54v1c Mar 21, 2022
6e6faba
- review comments.
m1l4n54v1c Mar 21, 2022
10685d4
Merge remote-tracking branch 'origin/streaming-query' into streaming-…
m1l4n54v1c Mar 21, 2022
279a585
Adapted to the latest API changes.
m1l4n54v1c Mar 24, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 12 additions & 0 deletions axon-server-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,18 @@
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.axonserver.connector;

/**
* A {@link Runable} with a priority.
*
* @author Stefan Dragisic
* @author Milan Savic
* @since 4.6.0
*/
public interface PrioritizedRunnable extends Runnable {

/**
* The priority of this runnable.
*
* @return the priority of this runnable.
*/
long priority();
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.axonserver.connector.query;

import io.axoniq.axonserver.grpc.query.QueryResponse;
import org.axonframework.axonserver.connector.ErrorCode;
import org.axonframework.axonserver.connector.PrioritizedRunnable;
import org.axonframework.common.AxonException;
import org.axonframework.messaging.responsetypes.ResponseType;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.concurrent.CompletableFuture;

/**
* Task that processes query responses. It will block until all responses are received.
*
* @param <R> the type of expected final response
* @author Allard Buijze
* @author Steven van Beelen
* @author Milan Savic
* @author Stefan Dragisic
* @since 4.6.0
*/
class BlockingQueryResponseProcessingTask<R> implements PrioritizedRunnable {

private final Publisher<QueryResponse> result;
private final QuerySerializer serializer;
private final CompletableFuture<QueryResponseMessage<R>> queryTransaction;
private final long priority;
private final ResponseType<R> expectedResponseType;

public BlockingQueryResponseProcessingTask(Publisher<QueryResponse> result,
QuerySerializer serializer,
CompletableFuture<QueryResponseMessage<R>> queryTransaction,
long priority,
ResponseType<R> expectedResponseType) {
this.result = result;
this.serializer = serializer;
this.queryTransaction = queryTransaction;
this.priority = priority;
this.expectedResponseType = expectedResponseType;
}

@Override
public long priority() {
return priority;
}

@Override
public void run() {
result.subscribe(new Subscriber<QueryResponse>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}

@Override
public void onNext(QueryResponse queryResponse) {
queryTransaction.complete(serializer.deserializeResponse(queryResponse, expectedResponseType));
}

@Override
public void onError(Throwable t) {
AxonException exception = ErrorCode.QUERY_DISPATCH_ERROR.convert(t);
queryTransaction.completeExceptionally(exception);
}

@Override
public void onComplete() {
// noop
}
});
}
}