Skip to content

Commit

Permalink
Merge pull request #182 from AxonIQ/feature/instruction_result_handli…
Browse files Browse the repository at this point in the history
…ng_bc

Change event processor API operations to return a Result
  • Loading branch information
MGathier committed Apr 22, 2022
2 parents 6dea326 + 97694d2 commit 4c3315c
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.axoniq.axonserver.grpc.admin.LeaveReplicationGroup;
import io.axoniq.axonserver.grpc.admin.LoadBalancingStrategy;
import io.axoniq.axonserver.grpc.admin.ReplicationGroupOverview;
import io.axoniq.axonserver.grpc.admin.Result;
import io.axoniq.axonserver.grpc.admin.Token;
import io.axoniq.axonserver.grpc.admin.UpdateContextPropertiesRequest;
import io.axoniq.axonserver.grpc.admin.UserOverview;
Expand Down Expand Up @@ -63,43 +64,50 @@ public interface AdminChannel {

/**
* Request to pause a specific event processor. Returns a {@link CompletableFuture} that completes when the pause
* has been performed
* has been performed. The {@link CompletableFuture} completes with {@link Result} {@code ACCEPTED}, if the client
* application running the event processor is using a version of the connector prior to 4.6.0.
*
* @param eventProcessorName the name of the event processor to pause
* @param tokenStoreIdentifier the token store identifier of the processor to pause
* @return a {@link CompletableFuture} that completes when the pause has been performed
*/
CompletableFuture<Void> pauseEventProcessor(String eventProcessorName, String tokenStoreIdentifier);
CompletableFuture<Result> pauseEventProcessor(String eventProcessorName, String tokenStoreIdentifier);

/**
* Request to start a specific event processor. Returns a {@link CompletableFuture} that completes when the start
* has been performed
* The {@link CompletableFuture} completes with {@link Result} {@code ACCEPTED}, if the client application
* running the event processor is using a version of the connector prior to 4.6.0.
*
* @param eventProcessorName the name of the event processor to start
* @param tokenStoreIdentifier the token store identifier of the processor to start
* @return a {@link CompletableFuture} that completes when the start has been performed
*/
CompletableFuture<Void> startEventProcessor(String eventProcessorName, String tokenStoreIdentifier);
CompletableFuture<Result> startEventProcessor(String eventProcessorName, String tokenStoreIdentifier);

/**
* Request to split the biggest segment of a specific event processor. Returns a {@link CompletableFuture} that
* completes when the split has been performed
* The {@link CompletableFuture} completes with {@link Result} {@code ACCEPTED}, if the client application
* running the event processor is using a version of the connector prior to 4.6.0.
*
* @param eventProcessorName the name of the event processor to split
* @param tokenStoreIdentifier the token store identifier of the processor to split
* @return a {@link CompletableFuture} that completes when the split has been performed
*/
CompletableFuture<Void> splitEventProcessor(String eventProcessorName, String tokenStoreIdentifier);
CompletableFuture<Result> splitEventProcessor(String eventProcessorName, String tokenStoreIdentifier);

/**
* Request to merge the two smallest segments of a specific event processor. Returns a {@link CompletableFuture}
* that completes when the merge has been performed
* The {@link CompletableFuture} completes with {@link Result} {@code ACCEPTED}, if the client application
* running the event processor is using a version of the connector prior to 4.6.0.
*
* @param eventProcessorName the name of the event processor to merge
* @param tokenStoreIdentifier the token store identifier of the processor to merge
* @return a {@link CompletableFuture} that completes when the merge has been performed
*/
CompletableFuture<Void> mergeEventProcessor(String eventProcessorName, String tokenStoreIdentifier);
CompletableFuture<Result> mergeEventProcessor(String eventProcessorName, String tokenStoreIdentifier);

/**
* Request to balance the load for the given {@code eventProcessorName} within the connected client.
Expand Down Expand Up @@ -147,10 +155,10 @@ public interface AdminChannel {
* @return a {@link CompletableFuture} that completes when all the other clients released the segment or disregard the segment for claiming.
* There is no guarantee that the target client has already claimed the segment when the result completes.
*/
CompletableFuture<Void> moveEventProcessorSegment(String eventProcessorName,
String tokenStoreIdentifier,
int segmentId,
String targetClientIdentifier);
CompletableFuture<Result> moveEventProcessorSegment(String eventProcessorName,
String tokenStoreIdentifier,
int segmentId,
String targetClientIdentifier);

/**
* Request to create an Axon Server user, or update it if it's already present. Returns a {@link CompletableFuture}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.axoniq.axonserver.connector.impl.FutureStreamObserver;
import io.axoniq.axonserver.grpc.Component;
import io.axoniq.axonserver.grpc.FlowControl;
import io.axoniq.axonserver.grpc.admin.AdminActionResult;
import io.axoniq.axonserver.grpc.admin.ApplicationAdminServiceGrpc;
import io.axoniq.axonserver.grpc.admin.ApplicationId;
import io.axoniq.axonserver.grpc.admin.ApplicationOverview;
Expand All @@ -52,6 +53,7 @@
import io.axoniq.axonserver.grpc.admin.LoadBalancingStrategy;
import io.axoniq.axonserver.grpc.admin.ReplicationGroupAdminServiceGrpc;
import io.axoniq.axonserver.grpc.admin.ReplicationGroupOverview;
import io.axoniq.axonserver.grpc.admin.Result;
import io.axoniq.axonserver.grpc.admin.Token;
import io.axoniq.axonserver.grpc.admin.UpdateContextPropertiesRequest;
import io.axoniq.axonserver.grpc.admin.UserAdminServiceGrpc;
Expand Down Expand Up @@ -136,55 +138,50 @@ protected EventProcessor terminalMessage() {
}

@Override
public CompletableFuture<Void> pauseEventProcessor(String eventProcessorName, String tokenStoreIdentifier) {
public CompletableFuture<Result> pauseEventProcessor(String eventProcessorName, String tokenStoreIdentifier) {
EventProcessorIdentifier eventProcessorIdentifier = eventProcessorId(eventProcessorName, tokenStoreIdentifier);
FutureStreamObserver<Empty> responseObserver = new FutureStreamObserver<>(null);
FutureStreamObserver<AdminActionResult> responseObserver = new FutureStreamObserver<>(null);
eventProcessorServiceStub.pauseEventProcessor(eventProcessorIdentifier, responseObserver);
return responseObserver.thenAccept(empty -> {
});
return responseObserver.thenApply(AdminActionResult::getResult);
}

@Override
public CompletableFuture<Void> startEventProcessor(String eventProcessorName, String tokenStoreIdentifier) {
public CompletableFuture<Result> startEventProcessor(String eventProcessorName, String tokenStoreIdentifier) {
EventProcessorIdentifier eventProcessorIdentifier = eventProcessorId(eventProcessorName, tokenStoreIdentifier);
FutureStreamObserver<Empty> responseObserver = new FutureStreamObserver<>(null);
FutureStreamObserver<AdminActionResult> responseObserver = new FutureStreamObserver<>(null);
eventProcessorServiceStub.startEventProcessor(eventProcessorIdentifier, responseObserver);
return responseObserver.thenAccept(empty -> {
});
return responseObserver.thenApply(AdminActionResult::getResult);
}


@Override
public CompletableFuture<Void> splitEventProcessor(String eventProcessorName, String tokenStoreIdentifier) {
public CompletableFuture<Result> splitEventProcessor(String eventProcessorName, String tokenStoreIdentifier) {
EventProcessorIdentifier eventProcessorIdentifier = eventProcessorId(eventProcessorName, tokenStoreIdentifier);
FutureStreamObserver<Empty> responseObserver = new FutureStreamObserver<>(null);
FutureStreamObserver<AdminActionResult> responseObserver = new FutureStreamObserver<>(null);
eventProcessorServiceStub.splitEventProcessor(eventProcessorIdentifier, responseObserver);
return responseObserver.thenAccept(empty -> {
});
return responseObserver.thenApply(AdminActionResult::getResult);
}

@Override
public CompletableFuture<Void> mergeEventProcessor(String eventProcessorName, String tokenStoreIdentifier) {
public CompletableFuture<Result> mergeEventProcessor(String eventProcessorName, String tokenStoreIdentifier) {
EventProcessorIdentifier eventProcessorIdentifier = eventProcessorId(eventProcessorName, tokenStoreIdentifier);
FutureStreamObserver<Empty> responseObserver = new FutureStreamObserver<>(null);
FutureStreamObserver<AdminActionResult> responseObserver = new FutureStreamObserver<>(null);
eventProcessorServiceStub.mergeEventProcessor(eventProcessorIdentifier, responseObserver);
return responseObserver.thenRun(() -> {
});
return responseObserver.thenApply(AdminActionResult::getResult);
}

@Override
public CompletableFuture<Void> moveEventProcessorSegment(String eventProcessorName, String tokenStoreIdentifier,
int segmentId, String targetClientIdentifier) {
public CompletableFuture<Result> moveEventProcessorSegment(String eventProcessorName, String tokenStoreIdentifier,
int segmentId, String targetClientIdentifier) {
EventProcessorIdentifier eventProcessorIdentifier = eventProcessorId(eventProcessorName, tokenStoreIdentifier);
FutureStreamObserver<Empty> responseObserver = new FutureStreamObserver<>(null);
FutureStreamObserver<AdminActionResult> responseObserver = new FutureStreamObserver<>(null);
MoveSegment request = MoveSegment.newBuilder()
.setEventProcessor(eventProcessorIdentifier)
.setSegment(segmentId)
.setTargetClientId(targetClientIdentifier)
.build();
eventProcessorServiceStub.moveEventProcessorSegment(request, responseObserver);
return responseObserver.thenRun(() -> {
});
return responseObserver.thenApply(AdminActionResult::getResult);
}

@Override
Expand Down

0 comments on commit 4c3315c

Please sign in to comment.