Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming query support #112

Merged
merged 21 commits into from
Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
0b6c917
Added a way to register a listener when query completes.
m1l4n54v1c Oct 28, 2021
0372f2c
Add flow control messages and listeners for query channel
schananas Nov 2, 2021
14d1c8e
Fix peek() in FlowControlledBuffer to work as described in doc
schananas Nov 4, 2021
1fbe25d
add logger for unknown requests
schananas Nov 9, 2021
a1a6049
fix javadoc and todo remove
schananas Nov 9, 2021
2231e19
fix javadoc and todo remove
schananas Nov 9, 2021
9e3cacd
add registration for query complete and query flow control
schananas Nov 10, 2021
1c082cd
Fixed Query Flow Control typo.
m1l4n54v1c Nov 24, 2021
4509600
QueryHandler now returns a handle for FlowControl.
m1l4n54v1c Dec 9, 2021
c412f06
Code review remarks.
m1l4n54v1c Dec 13, 2021
21a54a8
Added test for MultiFlowControl with a single handler.
m1l4n54v1c Dec 14, 2021
487c650
Added copyright to new files.
m1l4n54v1c Dec 15, 2021
4151402
Added buffering flow control capabilities to query functionality.
m1l4n54v1c Jan 26, 2022
bbd632b
Merge branch 'feature/adminApi/getEventProcessors' into streaming-query
m1l4n54v1c Feb 2, 2022
31d060e
Merge branch 'feature/adminApi/getEventProcessors' into streaming-query
m1l4n54v1c Feb 2, 2022
55c7f40
Merge branch 'master' into streaming-query
m1l4n54v1c Feb 3, 2022
5ce2db7
code review: API changes
m1l4n54v1c Mar 1, 2022
d412ba5
Added @since.
m1l4n54v1c Mar 4, 2022
2a23d5b
Review comments.
m1l4n54v1c Mar 21, 2022
1f91110
Sonar comments.
m1l4n54v1c Mar 21, 2022
afdfd2e
Adepted to the latest API changes.
m1l4n54v1c Mar 24, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
<netty.tcnative.version>2.0.34.Final</netty.tcnative.version>
<junit.jupiter.version>5.8.1</junit.jupiter.version>

<axonserver.api.version>4.5</axonserver.api.version>
<axonserver.api.version>4.6.0-SNAPSHOT</axonserver.api.version>

<jacoco-maven-plugin.version>0.8.7</jacoco-maven-plugin.version>
</properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ protected T take() throws InterruptedException {
* @return he first entry of this buffer without removing it, or {@code null} if it is empty
*/
protected T peek() {
return validate(buffer.peek(), false);
return validate(buffer.peek(), true);
smcvb marked this conversation as resolved.
Show resolved Hide resolved
}

private T validate(T peek, boolean nullOnTerminal) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.axoniq.axonserver.grpc.query.QueryResponse;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/**
* Communication channel with AxonServer for Query related interactions.
Expand Down Expand Up @@ -71,4 +72,22 @@ SubscriptionQueryResult subscriptionQuery(QueryRequest query,
* @return a {@link CompletableFuture} of {@link Void} to react when all query handlers have been unsubscribed
*/
CompletableFuture<Void> prepareDisconnect();

/**
* Registers listener that is going to be invoked when query with given {@code queryId} is completed. Once the query
* is completed, listener will be removed.
*
* @param queryId identifier of the query we are interested when is completed
* @param listener to be invoked when the query is completed
*/
Registration registerQueryCompleteListener(String queryId, Runnable listener);
smcvb marked this conversation as resolved.
Show resolved Hide resolved

/**
* Registers consumer to accepts number of permits to be invoked when the query {@code queryId} consumer requests more updates.
* Once the query is completed, listener will be removed.
*
* @param queryId identifier of the query we are interested when is completed
* @param consumer that accepts number of permits to be invoked when the query consumer requests more updates
smcvb marked this conversation as resolved.
Show resolved Hide resolved
*/
Registration registerQueryFlowControlListener(String queryId, Consumer<Long> consumer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ public QueryChannelImpl(ClientIdentification clientIdentification,
instructionHandlers.put(SubscriptionQueryRequest.RequestCase.GET_INITIAL_RESULT, this::getInitialResult);
instructionHandlers.put(SubscriptionQueryRequest.RequestCase.SUBSCRIBE, this::subscribeToQueryUpdates);
instructionHandlers.put(SubscriptionQueryRequest.RequestCase.UNSUBSCRIBE, this::unsubscribeToQueryUpdates);
instructionHandlers.put(QueryProviderInbound.RequestCase.QUERY_COMPLETE, this::completeStreamingQuery);
instructionHandlers.put(QueryProviderInbound.RequestCase.QUERY_FLOW_CONTROLL, this::flowControlQuery);
queryServiceStub = QueryServiceGrpc.newStub(channel);
}

Expand Down Expand Up @@ -191,6 +193,22 @@ public void complete() {
result.complete();
}

private void completeStreamingQuery(QueryProviderInbound complete, ReplyChannel<QueryProviderOutbound> result) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In line with comment below suggestion: handleCancelRequest

queryQueryFlowControlListener.remove(complete.getQueryComplete().getRequestId());
Runnable listener = queryCompleteListeners.remove(complete.getQueryComplete().getRequestId());
if (listener != null) {
listener.run();
}
result.complete();
}

private void flowControlQuery(QueryProviderInbound flowControl, ReplyChannel<QueryProviderOutbound> queryProviderOutboundReplyChannel) {
queryQueryFlowControlListener.getOrDefault(flowControl.getQueryFlowControll().getRequestId(),
l -> logger.debug("Received flow control message for unknown request id {}", flowControl.getQueryFlowControll().getRequestId()))
.accept(flowControl.getQueryFlowControll().getPermits());
queryProviderOutboundReplyChannel.complete();
}

@Override
public void connect() {
if (!queryHandlers.isEmpty()) {
Expand Down Expand Up @@ -339,7 +357,7 @@ protected QueryResponse terminalMessage() {

@Override
public void close() {
// this is a one-way stream. No need to close it.
outboundStream().cancel("Client cancelled the stream.", null);
}
};
if ("".equals(query.getMessageIdentifier())) {
Expand Down Expand Up @@ -437,6 +455,30 @@ public CompletableFuture<Void> prepareDisconnect() {
return future;
}

private final Map<String, Runnable> queryCompleteListeners = new ConcurrentHashMap<>();
smcvb marked this conversation as resolved.
Show resolved Hide resolved
private final Map<String, Consumer<Long>> queryQueryFlowControlListener = new ConcurrentHashMap<>();


@Override
public Registration registerQueryCompleteListener(String queryId, Runnable listener) {
queryCompleteListeners.put(queryId, listener);

return () -> {
queryCompleteListeners.remove(queryId);
return CompletableFuture.completedFuture(null);
};
}

@Override
public Registration registerQueryFlowControlListener(String queryId, Consumer<Long> consumer) {
queryQueryFlowControlListener.put(queryId, consumer);

return () -> {
queryQueryFlowControlListener.remove(queryId);
return CompletableFuture.completedFuture(null);
};
}

private void cancelAllSubscriptionQueries() {
subscriptionQueries.forEach((k, v) -> subscriptionQueries.remove(k).forEach(Registration::cancel));
}
Expand Down