Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming query support #112

Merged
merged 21 commits into from
Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
0b6c917
Added a way to register a listener when query completes.
m1l4n54v1c Oct 28, 2021
0372f2c
Add flow control messages and listeners for query channel
schananas Nov 2, 2021
14d1c8e
Fix peek() in FlowControlledBuffer to work as described in doc
schananas Nov 4, 2021
1fbe25d
add logger for unknown requests
schananas Nov 9, 2021
a1a6049
fix javadoc and todo remove
schananas Nov 9, 2021
2231e19
fix javadoc and todo remove
schananas Nov 9, 2021
9e3cacd
add registration for query complete and query flow control
schananas Nov 10, 2021
1c082cd
Fixed Query Flow Control typo.
m1l4n54v1c Nov 24, 2021
4509600
QueryHandler now returns a handle for FlowControl.
m1l4n54v1c Dec 9, 2021
c412f06
Code review remarks.
m1l4n54v1c Dec 13, 2021
21a54a8
Added test for MultiFlowControl with a single handler.
m1l4n54v1c Dec 14, 2021
487c650
Added copyright to new files.
m1l4n54v1c Dec 15, 2021
4151402
Added buffering flow control capabilities to query functionality.
m1l4n54v1c Jan 26, 2022
bbd632b
Merge branch 'feature/adminApi/getEventProcessors' into streaming-query
m1l4n54v1c Feb 2, 2022
31d060e
Merge branch 'feature/adminApi/getEventProcessors' into streaming-query
m1l4n54v1c Feb 2, 2022
55c7f40
Merge branch 'master' into streaming-query
m1l4n54v1c Feb 3, 2022
5ce2db7
code review: API changes
m1l4n54v1c Mar 1, 2022
d412ba5
Added @since.
m1l4n54v1c Mar 4, 2022
2a23d5b
Review comments.
m1l4n54v1c Mar 21, 2022
1f91110
Sonar comments.
m1l4n54v1c Mar 21, 2022
afdfd2e
Adepted to the latest API changes.
m1l4n54v1c Mar 24, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
<netty.tcnative.version>2.0.34.Final</netty.tcnative.version>
<junit.jupiter.version>5.8.1</junit.jupiter.version>

<axonserver.api.version>4.5</axonserver.api.version>
<axonserver.api.version>4.6.0-SNAPSHOT</axonserver.api.version>

<jacoco-maven-plugin.version>0.8.7</jacoco-maven-plugin.version>
</properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ protected T take() throws InterruptedException {
* @return he first entry of this buffer without removing it, or {@code null} if it is empty
*/
protected T peek() {
return validate(buffer.peek(), false);
return validate(buffer.peek(), true);
smcvb marked this conversation as resolved.
Show resolved Hide resolved
}

private T validate(T peek, boolean nullOnTerminal) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.axoniq.axonserver.connector.Registration;
import io.axoniq.axonserver.connector.ReplyChannel;
import io.axoniq.axonserver.connector.query.impl.NoopFlowControl;
import io.axoniq.axonserver.grpc.query.QueryRequest;
import io.axoniq.axonserver.grpc.query.QueryResponse;
import io.axoniq.axonserver.grpc.query.QueryUpdate;
Expand All @@ -40,6 +41,22 @@ public interface QueryHandler {
*/
void handle(QueryRequest query, ReplyChannel<QueryResponse> responseHandler);

/**
* Handle the given {@code query}, using given {@code responseHandler} to send the response(s). It is flow control
* aware - messages should be sent via {@code responseHandler} when requested.
* <p>
* Note that the query <em>must</em> be completed using {@link ReplyChannel#complete()} or {@link
* ReplyChannel#sendLast(Object)}.
smcvb marked this conversation as resolved.
Show resolved Hide resolved
*
* @param query the message representing the query request
* @param responseHandler the handler to send responses with
* @return a {@link FlowControl} to request more responses and also to cancel sending responses
smcvb marked this conversation as resolved.
Show resolved Hide resolved
*/
default FlowControl stream(QueryRequest query, ReplyChannel<QueryResponse> responseHandler) {
handle(query, responseHandler);
return NoopFlowControl.INSTANCE;
}

/**
* Registers an incoming subscription query request, represented by given {@code query}, using given {@code
* updateHandler} to send updates when the projection for this query changes.
Expand All @@ -56,6 +73,24 @@ default Registration registerSubscriptionQuery(SubscriptionQuery query, UpdateHa
return null;
}

/**
* Flow control for queries.
*/
interface FlowControl {

/**
* Requests {@code requested} amount of responses to be sent.
*
* @param requested number of responses to be sent
*/
void request(long requested);

/**
* Completes response sending.
*/
void complete();
smcvb marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Interface describing a stream of updates to a subscription query.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.axoniq.axonserver.connector.query.impl;

import io.axoniq.axonserver.connector.Registration;
import io.axoniq.axonserver.connector.query.QueryHandler;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

/**
* An implementation of {@link QueryHandler.FlowControl} that delegates operations to several registered {@link
* QueryHandler.FlowControl} implementations.
*/
public class MultiFlowControl implements QueryHandler.FlowControl {

private final List<QueryHandler.FlowControl> delegates = new CopyOnWriteArrayList<>();

/**
* Adds given {@code flowControl} to the list of delegates to be invoked once the {@link QueryHandler.FlowControl}
* operations is triggered.
*
* @param flowControl the {@link QueryHandler.FlowControl} implementation to be registered as a delegate
* @return a registration which can be used to remove this {@link QueryHandler.FlowControl} from the list of
* delegates
*/
public Registration add(QueryHandler.FlowControl flowControl) {
delegates.add(flowControl);
return () -> CompletableFuture.runAsync(() -> delegates.remove(flowControl));
}

@Override
public void request(long requested) {
delegates.parallelStream()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this cause a "request 1" to lead to more than 1 message being sent? Is this acceptable or should we buffer the responses and deplete the buffer before asking more from upstream?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, you are right! We could also do a round-robin without the buffer - no necessary invocations. What do you think?

.forEach(flowControl -> flowControl.request(requested));
}

@Override
public void complete() {
delegates.parallelStream()
.forEach(QueryHandler.FlowControl::complete);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.axoniq.axonserver.connector.query.impl;

import io.axoniq.axonserver.connector.query.QueryHandler;

/**
* NOOP implementation of {@link QueryHandler.FlowControl}.
*/
public enum NoopFlowControl implements QueryHandler.FlowControl {

/**
* Singleton instance of {@link NoopFlowControl}.
*/
INSTANCE;

@Override
public void request(long requested) {
// noop
}

@Override
public void complete() {
// noop
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public class QueryChannelImpl extends AbstractAxonServerChannel<QueryProviderOut
private final Object queryHandlerMonitor = new Object();
private final Map<String, Set<Registration>> subscriptionQueries = new ConcurrentHashMap<>();
private final QueryServiceGrpc.QueryServiceStub queryServiceStub;
private final Set<CompletableFuture<?>> queriesInProgress = ConcurrentHashMap.newKeySet();
private final Map<String, QueryInProgress> queriesInProgress = new ConcurrentHashMap<>();
private final AtomicBoolean subscriptionsCompleted = new AtomicBoolean(false);

/**
Expand Down Expand Up @@ -122,6 +122,8 @@ public QueryChannelImpl(ClientIdentification clientIdentification,
instructionHandlers.put(SubscriptionQueryRequest.RequestCase.GET_INITIAL_RESULT, this::getInitialResult);
instructionHandlers.put(SubscriptionQueryRequest.RequestCase.SUBSCRIBE, this::subscribeToQueryUpdates);
instructionHandlers.put(SubscriptionQueryRequest.RequestCase.UNSUBSCRIBE, this::unsubscribeToQueryUpdates);
instructionHandlers.put(QueryProviderInbound.RequestCase.QUERY_COMPLETE, this::completeStreamingQuery);
instructionHandlers.put(QueryProviderInbound.RequestCase.QUERY_FLOW_CONTROL, this::flowControlQuery);
queryServiceStub = QueryServiceGrpc.newStub(channel);
}

Expand Down Expand Up @@ -191,6 +193,18 @@ public void complete() {
result.complete();
}

private void completeStreamingQuery(QueryProviderInbound complete, ReplyChannel<QueryProviderOutbound> result) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In line with comment below suggestion: handleCancelRequest

queriesInProgress.getOrDefault(complete.getQueryComplete().getRequestId(), QueryInProgress.noop())
.complete();
result.complete();
}

private void flowControlQuery(QueryProviderInbound flowControl, ReplyChannel<QueryProviderOutbound> result) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to avoid confusion, I suggest renaming. For example: handleFlowControlRequest

queriesInProgress.getOrDefault(flowControl.getQueryFlowControl().getRequestId(), QueryInProgress.noop())
.request(flowControl.getQueryFlowControl().getPermits());
result.complete();
}

@Override
public void connect() {
if (!queryHandlers.isEmpty()) {
Expand Down Expand Up @@ -339,7 +353,7 @@ protected QueryResponse terminalMessage() {

@Override
public void close() {
// this is a one-way stream. No need to close it.
outboundStream().cancel("Client cancelled the stream.", null);
}
};
if ("".equals(query.getMessageIdentifier())) {
Expand Down Expand Up @@ -410,7 +424,9 @@ public synchronized void disconnect() {
if (!queriesInProgress.isEmpty()) {
logger.info("Disconnect requested. Waiting for {} queries to be completed", queriesInProgress.size());
}
return CompletableFuture.allOf(queriesInProgress.stream()
return CompletableFuture.allOf(queriesInProgress.values()
smcvb marked this conversation as resolved.
Show resolved Hide resolved
.stream()
.map(QueryInProgress::completeAsync)
.reduce(CompletableFuture::allOf)
.orElseGet(() -> CompletableFuture.completedFuture(null)));
}).thenAccept(previousStream -> doIfNotNull(previousOutbound, StreamObserver::onCompleted));
Expand Down Expand Up @@ -451,10 +467,12 @@ private void doHandleQuery(QueryProviderInbound query, ReplyChannel<QueryRespons
}

private void doHandleQuery(QueryRequest query, ReplyChannel<QueryResponse> responseChannel) {
CompletableFuture<?> completionHandle = new CompletableFuture<>();
queriesInProgress.add(completionHandle);
completionHandle.whenComplete((r, e) -> queriesInProgress.remove(completionHandle));
ReplyChannel<QueryResponse> responseHandler = new CloseAwareReplyChannelAdapter(responseChannel, () -> completionHandle.complete(null));
MultiFlowControl multiFlowControl = new MultiFlowControl();
Runnable removeQuery = () -> queriesInProgress.remove(query.getMessageIdentifier());
QueryInProgress queryInProgress = new QueryInProgress(removeQuery, multiFlowControl);
queriesInProgress.putIfAbsent(query.getMessageIdentifier(), queryInProgress);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The use of putIfAbsent suggests that the given identifier may already exist. In that case, wouldn't we want to use the already existing QueryInProgess instance? Of maybe reject the request altogether?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Semantically, I think we want to reject the request. If there is already the same query in progress, responses sent are valid, and responses to be sent are going to be valid. If you want a new result stream, issue a new query. If, however, receiving the same query twice is a result of a retry, rejecting it is also fine.

ReplyChannel<QueryResponse> responseHandler = new CloseAwareReplyChannelAdapter(responseChannel,
queryInProgress::complete);
Set<QueryHandler> handlers = queryHandlers.getOrDefault(query.getQuery(), Collections.emptySet());
if (handlers.isEmpty()) {
responseHandler.sendNack();
Expand All @@ -470,47 +488,50 @@ private void doHandleQuery(QueryRequest query, ReplyChannel<QueryResponse> respo
responseHandler.sendAck();

AtomicInteger completeCounter = new AtomicInteger(handlers.size());
handlers.forEach(queryHandler -> queryHandler.handle(query, new ReplyChannel<QueryResponse>() {
@Override
public void send(QueryResponse response) {
if (!query.getMessageIdentifier().equals(response.getRequestIdentifier())) {
logger.debug("RequestIdentifier not properly set, modifying message");
QueryResponse newResponse = response.toBuilder()
.setRequestIdentifier(query.getMessageIdentifier())
.build();
responseHandler.send(newResponse);
} else {
responseHandler.send(response);
handlers.forEach(queryHandler -> {
ReplyChannel<QueryResponse> replyChannel = new ReplyChannel<QueryResponse>() {
@Override
public void send(QueryResponse response) {
if (!query.getMessageIdentifier().equals(response.getRequestIdentifier())) {
logger.debug("RequestIdentifier not properly set, modifying message");
QueryResponse newResponse = response.toBuilder()
.setRequestIdentifier(query.getMessageIdentifier())
.build();
responseHandler.send(newResponse);
} else {
responseHandler.send(response);
}
}
}

@Override
public void complete() {
if (completeCounter.decrementAndGet() == 0) {
responseHandler.complete();
@Override
public void complete() {
if (completeCounter.decrementAndGet() == 0) {
responseHandler.complete();
}
}
}

@Override
public void completeWithError(ErrorMessage errorMessage) {
responseHandler.completeWithError(errorMessage);
}
@Override
public void completeWithError(ErrorMessage errorMessage) {
responseHandler.completeWithError(errorMessage);
}

@Override
public void completeWithError(ErrorCategory errorCategory, String message) {
responseHandler.completeWithError(errorCategory, message);
}
@Override
public void completeWithError(ErrorCategory errorCategory, String message) {
responseHandler.completeWithError(errorCategory, message);
}

@Override
public void sendNack(ErrorMessage errorMessage) {
responseHandler.sendNack(errorMessage);
}
@Override
public void sendNack(ErrorMessage errorMessage) {
responseHandler.sendNack(errorMessage);
}

@Override
public void sendAck() {
responseHandler.sendAck();
}
}));
@Override
public void sendAck() {
responseHandler.sendAck();
}
};
multiFlowControl.add(queryHandler.stream(query, replyChannel));
});
}

private void handleQuery(QueryProviderInbound inbound, ReplyChannel<QueryProviderOutbound> result) {
Expand Down Expand Up @@ -601,6 +622,37 @@ public void sendAck() {
);
}

private static class QueryInProgress {

private final CompletableFuture<?> closeHandler;
private final QueryHandler.FlowControl flowControl;

public static QueryInProgress noop() {
return new QueryInProgress(() -> { }, NoopFlowControl.INSTANCE);
}

public QueryInProgress(Runnable closeHandler, QueryHandler.FlowControl flowControl) {
this.closeHandler = new CompletableFuture<>();
this.flowControl = flowControl;
this.closeHandler.whenComplete((r, e) -> {
flowControl.complete();
closeHandler.run();
});
}

public CompletableFuture<?> completeAsync() {
return closeHandler;
smcvb marked this conversation as resolved.
Show resolved Hide resolved
}

public void complete() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest: rename to cancel()

Instead of completing normally, perhaps complete with a QueryCancelledException instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure whether we want to complete exceptionally here. Having a receiver cancelling the stream doesn't seem like an exceptional case but rather a regular one.

completeAsync().complete(null);
}

public void request(long requested) {
flowControl.request(requested);
}
}

private static class CloseAwareReplyChannelAdapter implements ReplyChannel<QueryResponse> {
private final ReplyChannel<QueryResponse> delegate;
private final Runnable onClose;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,10 +382,8 @@ public Registration registerSubscriptionQuery(SubscriptionQuery query, UpdateHan
void testReconnectFinishesQueriesInTransit() throws InterruptedException {
Queue<ReplyChannel<QueryResponse>> queriesInProgress = new ConcurrentLinkedQueue<>();
QueryChannel queryChannel = connection1.queryChannel();
queryChannel.registerQueryHandler((command, reply) -> {
CompletableFuture<QueryResponse> result = new CompletableFuture<>();
queriesInProgress.add(reply);
}, new QueryDefinition("testQuery", String.class));
queryChannel.registerQueryHandler((command, reply) -> queriesInProgress.add(reply),
new QueryDefinition("testQuery", String.class));

QueryChannel queryChannel2 = connection2.queryChannel();

Expand Down