New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Streaming query support #112
Conversation
Integration tests to be done |
src/main/java/io/axoniq/axonserver/connector/query/QueryHandler.java
Outdated
Show resolved
Hide resolved
|
||
@Override | ||
public void request(long requested) { | ||
delegates.parallelStream() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't this cause a "request 1" to lead to more than 1 message being sent? Is this acceptable or should we buffer the responses and deplete the buffer before asking more from upstream?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, you are right! We could also do a round-robin without the buffer - no necessary invocations. What do you think?
result.complete(); | ||
} | ||
|
||
private void flowControlQuery(QueryProviderInbound flowControl, ReplyChannel<QueryProviderOutbound> result) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to avoid confusion, I suggest renaming. For example: handleFlowControlRequest
@@ -191,6 +193,18 @@ public void complete() { | |||
result.complete(); | |||
} | |||
|
|||
private void completeStreamingQuery(QueryProviderInbound complete, ReplyChannel<QueryProviderOutbound> result) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In line with comment below suggestion: handleCancelRequest
src/main/java/io/axoniq/axonserver/connector/query/impl/QueryChannelImpl.java
Outdated
Show resolved
Hide resolved
MultiFlowControl multiFlowControl = new MultiFlowControl(); | ||
Runnable removeQuery = () -> queriesInProgress.remove(query.getMessageIdentifier()); | ||
QueryInProgress queryInProgress = new QueryInProgress(removeQuery, multiFlowControl); | ||
queriesInProgress.putIfAbsent(query.getMessageIdentifier(), queryInProgress); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The use of putIfAbsent
suggests that the given identifier may already exist. In that case, wouldn't we want to use the already existing QueryInProgess
instance? Of maybe reject the request altogether?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Semantically, I think we want to reject the request. If there is already the same query in progress, responses sent are valid, and responses to be sent are going to be valid. If you want a new result stream, issue a new query. If, however, receiving the same query twice is a result of a retry, rejecting it is also fine.
return closeHandler; | ||
} | ||
|
||
public void complete() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest: rename to cancel()
Instead of completing normally, perhaps complete with a QueryCancelledException
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure whether we want to complete exceptionally here. Having a receiver cancelling the stream doesn't seem like an exceptional case but rather a regular one.
src/main/java/io/axoniq/axonserver/connector/query/impl/QueryChannelImpl.java
Outdated
Show resolved
Hide resolved
Kudos, SonarCloud Quality Gate passed! |
# Conflicts: # pom.xml
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implementation wise I approve this PR, as I assume is known through the virtual reviews we've done. I've got plenty of JavaDoc suggestions as always, however.
src/main/java/io/axoniq/axonserver/connector/impl/FlowControlledBuffer.java
Show resolved
Hide resolved
src/main/java/io/axoniq/axonserver/connector/query/QueryChannel.java
Outdated
Show resolved
Hide resolved
src/main/java/io/axoniq/axonserver/connector/query/QueryChannel.java
Outdated
Show resolved
Hide resolved
src/main/java/io/axoniq/axonserver/connector/query/impl/QueryChannelImpl.java
Outdated
Show resolved
Hide resolved
...a/io/axoniq/axonserver/connector/impl/buffer/FlowControlledDisposableReadonlyBufferTest.java
Show resolved
Hide resolved
...a/io/axoniq/axonserver/connector/impl/buffer/FlowControlledDisposableReadonlyBufferTest.java
Outdated
Show resolved
Hide resolved
src/test/java/io/axoniq/axonserver/connector/impl/buffer/RoundRobinMultiReadonlyBufferTest.java
Outdated
Show resolved
Hide resolved
src/test/java/io/axoniq/axonserver/connector/impl/buffer/RoundRobinMultiReadonlyBufferTest.java
Show resolved
Hide resolved
src/main/java/io/axoniq/axonserver/connector/query/impl/QueryChannelImpl.java
Outdated
Show resolved
Hide resolved
Also, don't forget to take a look at Sonar. It marks one bug and four code smells. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My concerns have been addressed, hence approving.
Kudos, SonarCloud Quality Gate passed! |
Adds few adjustemnts and fixes for better streaming query support