Skip to content

Commit

Permalink
Merge pull request #2001 from AxonFramework/streaming-query
Browse files Browse the repository at this point in the history
Streaming query
  • Loading branch information
m1l4n54v1c committed Mar 24, 2022
2 parents 8ea8532 + 279a585 commit 2c73d33
Show file tree
Hide file tree
Showing 33 changed files with 3,257 additions and 203 deletions.
12 changes: 12 additions & 0 deletions axon-server-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,18 @@
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.axonserver.connector;

/**
* A {@link Runable} with a priority.
*
* @author Stefan Dragisic
* @author Milan Savic
* @since 4.6.0
*/
public interface PrioritizedRunnable extends Runnable {

/**
* The priority of this runnable.
*
* @return the priority of this runnable.
*/
long priority();
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright (c) 2010-2022. Axon Framework
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.axonframework.axonserver.connector.query;

import io.axoniq.axonserver.grpc.query.QueryResponse;
import org.axonframework.axonserver.connector.ErrorCode;
import org.axonframework.axonserver.connector.PrioritizedRunnable;
import org.axonframework.common.AxonException;
import org.axonframework.messaging.responsetypes.ResponseType;
import org.axonframework.queryhandling.QueryResponseMessage;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.concurrent.CompletableFuture;

/**
* Task that processes query responses. It will block until all responses are received.
*
* @param <R> the type of expected final response
* @author Allard Buijze
* @author Steven van Beelen
* @author Milan Savic
* @author Stefan Dragisic
* @since 4.6.0
*/
class BlockingQueryResponseProcessingTask<R> implements PrioritizedRunnable {

private final Publisher<QueryResponse> result;
private final QuerySerializer serializer;
private final CompletableFuture<QueryResponseMessage<R>> queryTransaction;
private final long priority;
private final ResponseType<R> expectedResponseType;

public BlockingQueryResponseProcessingTask(Publisher<QueryResponse> result,
QuerySerializer serializer,
CompletableFuture<QueryResponseMessage<R>> queryTransaction,
long priority,
ResponseType<R> expectedResponseType) {
this.result = result;
this.serializer = serializer;
this.queryTransaction = queryTransaction;
this.priority = priority;
this.expectedResponseType = expectedResponseType;
}

@Override
public long priority() {
return priority;
}

@Override
public void run() {
result.subscribe(new Subscriber<QueryResponse>() {
@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}

@Override
public void onNext(QueryResponse queryResponse) {
queryTransaction.complete(serializer.deserializeResponse(queryResponse, expectedResponseType));
}

@Override
public void onError(Throwable t) {
AxonException exception = ErrorCode.QUERY_DISPATCH_ERROR.convert(t);
queryTransaction.completeExceptionally(exception);
}

@Override
public void onComplete() {
// noop
}
});
}
}

0 comments on commit 2c73d33

Please sign in to comment.