Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change event processor API operations to return a Result #182

Merged
merged 3 commits into from
Apr 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.axoniq.axonserver.grpc.admin.LeaveReplicationGroup;
import io.axoniq.axonserver.grpc.admin.LoadBalancingStrategy;
import io.axoniq.axonserver.grpc.admin.ReplicationGroupOverview;
import io.axoniq.axonserver.grpc.admin.Result;
import io.axoniq.axonserver.grpc.admin.Token;
import io.axoniq.axonserver.grpc.admin.UpdateContextPropertiesRequest;
import io.axoniq.axonserver.grpc.admin.UserOverview;
Expand Down Expand Up @@ -63,43 +64,50 @@ public interface AdminChannel {

/**
* Request to pause a specific event processor. Returns a {@link CompletableFuture} that completes when the pause
* has been performed
* has been performed. The {@link CompletableFuture} completes with {@link Result} {@code ACCEPTED}, if the client
* application running the event processor is using a version of the connector prior to 4.6.0.
*
* @param eventProcessorName the name of the event processor to pause
* @param tokenStoreIdentifier the token store identifier of the processor to pause
* @return a {@link CompletableFuture} that completes when the pause has been performed
*/
CompletableFuture<Void> pauseEventProcessor(String eventProcessorName, String tokenStoreIdentifier);
CompletableFuture<Result> pauseEventProcessor(String eventProcessorName, String tokenStoreIdentifier);

/**
* Request to start a specific event processor. Returns a {@link CompletableFuture} that completes when the start
* has been performed
* The {@link CompletableFuture} completes with {@link Result} {@code ACCEPTED}, if the client application
* running the event processor is using a version of the connector prior to 4.6.0.
*
* @param eventProcessorName the name of the event processor to start
* @param tokenStoreIdentifier the token store identifier of the processor to start
* @return a {@link CompletableFuture} that completes when the start has been performed
*/
CompletableFuture<Void> startEventProcessor(String eventProcessorName, String tokenStoreIdentifier);
CompletableFuture<Result> startEventProcessor(String eventProcessorName, String tokenStoreIdentifier);

/**
* Request to split the biggest segment of a specific event processor. Returns a {@link CompletableFuture} that
* completes when the split has been performed
* The {@link CompletableFuture} completes with {@link Result} {@code ACCEPTED}, if the client application
* running the event processor is using a version of the connector prior to 4.6.0.
*
* @param eventProcessorName the name of the event processor to split
* @param tokenStoreIdentifier the token store identifier of the processor to split
* @return a {@link CompletableFuture} that completes when the split has been performed
*/
CompletableFuture<Void> splitEventProcessor(String eventProcessorName, String tokenStoreIdentifier);
CompletableFuture<Result> splitEventProcessor(String eventProcessorName, String tokenStoreIdentifier);

/**
* Request to merge the two smallest segments of a specific event processor. Returns a {@link CompletableFuture}
* that completes when the merge has been performed
* The {@link CompletableFuture} completes with {@link Result} {@code ACCEPTED}, if the client application
* running the event processor is using a version of the connector prior to 4.6.0.
*
* @param eventProcessorName the name of the event processor to merge
* @param tokenStoreIdentifier the token store identifier of the processor to merge
* @return a {@link CompletableFuture} that completes when the merge has been performed
*/
CompletableFuture<Void> mergeEventProcessor(String eventProcessorName, String tokenStoreIdentifier);
CompletableFuture<Result> mergeEventProcessor(String eventProcessorName, String tokenStoreIdentifier);

/**
* Request to balance the load for the given {@code eventProcessorName} within the connected client.
Expand Down Expand Up @@ -147,10 +155,10 @@ public interface AdminChannel {
* @return a {@link CompletableFuture} that completes when all the other clients released the segment or disregard the segment for claiming.
* There is no guarantee that the target client has already claimed the segment when the result completes.
*/
CompletableFuture<Void> moveEventProcessorSegment(String eventProcessorName,
String tokenStoreIdentifier,
int segmentId,
String targetClientIdentifier);
CompletableFuture<Result> moveEventProcessorSegment(String eventProcessorName,
String tokenStoreIdentifier,
int segmentId,
String targetClientIdentifier);

/**
* Request to create an Axon Server user, or update it if it's already present. Returns a {@link CompletableFuture}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.axoniq.axonserver.connector.impl.FutureStreamObserver;
import io.axoniq.axonserver.grpc.Component;
import io.axoniq.axonserver.grpc.FlowControl;
import io.axoniq.axonserver.grpc.admin.AdminActionResult;
import io.axoniq.axonserver.grpc.admin.ApplicationAdminServiceGrpc;
import io.axoniq.axonserver.grpc.admin.ApplicationId;
import io.axoniq.axonserver.grpc.admin.ApplicationOverview;
Expand All @@ -52,6 +53,7 @@
import io.axoniq.axonserver.grpc.admin.LoadBalancingStrategy;
import io.axoniq.axonserver.grpc.admin.ReplicationGroupAdminServiceGrpc;
import io.axoniq.axonserver.grpc.admin.ReplicationGroupOverview;
import io.axoniq.axonserver.grpc.admin.Result;
import io.axoniq.axonserver.grpc.admin.Token;
import io.axoniq.axonserver.grpc.admin.UpdateContextPropertiesRequest;
import io.axoniq.axonserver.grpc.admin.UserAdminServiceGrpc;
Expand Down Expand Up @@ -136,55 +138,50 @@ protected EventProcessor terminalMessage() {
}

@Override
public CompletableFuture<Void> pauseEventProcessor(String eventProcessorName, String tokenStoreIdentifier) {
public CompletableFuture<Result> pauseEventProcessor(String eventProcessorName, String tokenStoreIdentifier) {
EventProcessorIdentifier eventProcessorIdentifier = eventProcessorId(eventProcessorName, tokenStoreIdentifier);
FutureStreamObserver<Empty> responseObserver = new FutureStreamObserver<>(null);
FutureStreamObserver<AdminActionResult> responseObserver = new FutureStreamObserver<>(null);
eventProcessorServiceStub.pauseEventProcessor(eventProcessorIdentifier, responseObserver);
return responseObserver.thenAccept(empty -> {
});
return responseObserver.thenApply(AdminActionResult::getResult);
}

@Override
public CompletableFuture<Void> startEventProcessor(String eventProcessorName, String tokenStoreIdentifier) {
public CompletableFuture<Result> startEventProcessor(String eventProcessorName, String tokenStoreIdentifier) {
EventProcessorIdentifier eventProcessorIdentifier = eventProcessorId(eventProcessorName, tokenStoreIdentifier);
FutureStreamObserver<Empty> responseObserver = new FutureStreamObserver<>(null);
FutureStreamObserver<AdminActionResult> responseObserver = new FutureStreamObserver<>(null);
eventProcessorServiceStub.startEventProcessor(eventProcessorIdentifier, responseObserver);
return responseObserver.thenAccept(empty -> {
});
return responseObserver.thenApply(AdminActionResult::getResult);
}


@Override
public CompletableFuture<Void> splitEventProcessor(String eventProcessorName, String tokenStoreIdentifier) {
public CompletableFuture<Result> splitEventProcessor(String eventProcessorName, String tokenStoreIdentifier) {
EventProcessorIdentifier eventProcessorIdentifier = eventProcessorId(eventProcessorName, tokenStoreIdentifier);
FutureStreamObserver<Empty> responseObserver = new FutureStreamObserver<>(null);
FutureStreamObserver<AdminActionResult> responseObserver = new FutureStreamObserver<>(null);
eventProcessorServiceStub.splitEventProcessor(eventProcessorIdentifier, responseObserver);
return responseObserver.thenAccept(empty -> {
});
return responseObserver.thenApply(AdminActionResult::getResult);
}

@Override
public CompletableFuture<Void> mergeEventProcessor(String eventProcessorName, String tokenStoreIdentifier) {
public CompletableFuture<Result> mergeEventProcessor(String eventProcessorName, String tokenStoreIdentifier) {
EventProcessorIdentifier eventProcessorIdentifier = eventProcessorId(eventProcessorName, tokenStoreIdentifier);
FutureStreamObserver<Empty> responseObserver = new FutureStreamObserver<>(null);
FutureStreamObserver<AdminActionResult> responseObserver = new FutureStreamObserver<>(null);
eventProcessorServiceStub.mergeEventProcessor(eventProcessorIdentifier, responseObserver);
return responseObserver.thenRun(() -> {
});
return responseObserver.thenApply(AdminActionResult::getResult);
}

@Override
public CompletableFuture<Void> moveEventProcessorSegment(String eventProcessorName, String tokenStoreIdentifier,
int segmentId, String targetClientIdentifier) {
public CompletableFuture<Result> moveEventProcessorSegment(String eventProcessorName, String tokenStoreIdentifier,
int segmentId, String targetClientIdentifier) {
EventProcessorIdentifier eventProcessorIdentifier = eventProcessorId(eventProcessorName, tokenStoreIdentifier);
FutureStreamObserver<Empty> responseObserver = new FutureStreamObserver<>(null);
FutureStreamObserver<AdminActionResult> responseObserver = new FutureStreamObserver<>(null);
MoveSegment request = MoveSegment.newBuilder()
.setEventProcessor(eventProcessorIdentifier)
.setSegment(segmentId)
.setTargetClientId(targetClientIdentifier)
.build();
eventProcessorServiceStub.moveEventProcessorSegment(request, responseObserver);
return responseObserver.thenRun(() -> {
});
return responseObserver.thenApply(AdminActionResult::getResult);
}

@Override
Expand Down