Skip to content

Commit

Permalink
Merge pull request #112 from AxonIQ/streaming-query
Browse files Browse the repository at this point in the history
Streaming query support
  • Loading branch information
m1l4n54v1c committed Mar 24, 2022
2 parents 0d89820 + afdfd2e commit 731f853
Show file tree
Hide file tree
Showing 22 changed files with 2,288 additions and 79 deletions.
41 changes: 41 additions & 0 deletions src/main/java/io/axoniq/axonserver/connector/FlowControl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2020-2022. AxonIQ
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.axoniq.axonserver.connector;

/**
* Controls the flow of the messages via communication channel. Implementations should send messages only when
* requested. Once the cancellation is invoked, implementations should stop sending messages.
*
* @author Milan Savic
* @author Stefan Dragisic
* @author Allard Buijze
* @since 4.6.0
*/
public interface FlowControl {

/**
* Requests the {@code requested} amount of responses to be sent.
*
* @param requested number of responses to be sent
*/
void request(long requested);

/**
* Cancels response sending.
*/
void cancel();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright (c) 2020-2022. AxonIQ
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.connector.ErrorCategory;
import io.axoniq.axonserver.connector.ReplyChannel;
import io.axoniq.axonserver.grpc.ErrorMessage;

/**
* A {@link ReplyChannel} implementation that uses a given {@link CloseableBuffer buffer} to buffer send, complete and
* complete-with-error. {@code ACK}s and {@code NACK}s are delegated via {@code delegate ReplyChannel}.
*
* @param <T> the type of messages this {@link ReplyChannel} deals with
* @author Milan Savic
* @author Stefan Dragisic
* @author Allard Buijze
* @since 4.6.0
*/
public class BufferingReplyChannel<T> implements ReplyChannel<T> {

private final ReplyChannel<T> delegate;
private final CloseableBuffer<T> buffer;

/**
* Instantiates this {@link BufferingReplyChannel} with given {@code delegate} and {@code buffer}.
*
* @param delegate used to delegate {@code ack}s and {@code nack}s
* @param buffer used to buffer sends, completes and completes with error
*/
public BufferingReplyChannel(ReplyChannel<T> delegate, CloseableBuffer<T> buffer) {
this.delegate = delegate;
this.buffer = buffer;
}

@Override
public void send(T outboundMessage) {
buffer.put(outboundMessage);
}

@Override
public void sendAck() {
delegate.sendAck();
}

@Override
public void sendNack(ErrorMessage errorMessage) {
delegate.sendNack(errorMessage);
}

@Override
public void complete() {
buffer.close();
}

@Override
public void completeWithError(ErrorMessage errorMessage) {
buffer.closeExceptionally(errorMessage);
}

@Override
public void completeWithError(ErrorCategory errorCategory, String message) {
ErrorMessage error = ErrorMessage.newBuilder()
.setErrorCode(errorCategory.errorCode())
.setMessage(message)
.build();
buffer.closeExceptionally(error);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright (c) 2020-2022. AxonIQ
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.connector.ErrorCategory;
import io.axoniq.axonserver.connector.ReplyChannel;
import io.axoniq.axonserver.grpc.ErrorMessage;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* A {@link ReplyChannel} implementation that will trigger a given {@link Runnable action} when it is closed.
*
* @param <T> the type of messages flowing through this {@link ReplyChannel}
* @author Milan Savic
* @author Stefan Dragisic
* @author Allard Buijze
* @since 4.6.0
*/
public class CloseAwareReplyChannel<T> implements ReplyChannel<T> {

private final ReplyChannel<T> delegate;
private final Runnable onClose;
private final AtomicBoolean closed = new AtomicBoolean();

/**
* Instantiates this {@link ReplyChannel} with given {@code delegate}.
*
* @param delegate the delegate
*/
public CloseAwareReplyChannel(ReplyChannel<T> delegate) {
this(delegate, () -> {
});
}

/**
* Instantiates this {@link ReplyChannel} with given {@code delegate} and {@code onClose} {@link Runnable} execution
* to be executed once the channel is closed.
*
* @param delegate the delegate
* @param onClose to be executed when channel is closed
*/
public CloseAwareReplyChannel(ReplyChannel<T> delegate, Runnable onClose) {
this.delegate = delegate;
this.onClose = () -> {
closed.set(true);
onClose.run();
};
}

@Override
public void send(T outboundMessage) {
delegate.send(outboundMessage);
}

@Override
public void sendLast(T outboundMessage) {
delegate.sendLast(outboundMessage);
onClose.run();
}

@Override
public void sendAck() {
delegate.sendAck();
}

@Override
public void sendNack() {
delegate.sendNack();
}

@Override
public void sendNack(ErrorMessage errorMessage) {
delegate.sendNack(errorMessage);
}

@Override
public void complete() {
delegate.complete();
onClose.run();
}

@Override
public void completeWithError(ErrorMessage errorMessage) {
delegate.completeWithError(errorMessage);
onClose.run();
}

@Override
public void completeWithError(ErrorCategory errorCategory, String message) {
delegate.completeWithError(errorCategory, message);
onClose.run();
}

/**
* Indicates whether this {@link ReplyChannel} is closed.
*
* @return {@code true} if closed, {@code false} otherwise
*/
public boolean isClosed() {
return closed.get();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2020-2022. AxonIQ
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.grpc.ErrorMessage;

/**
* A {@link CloseableReadonlyBuffer buffer} that can be closed by the producing side.
*
* @param <T> the type of messages in this buffer
* @author Milan Savic
* @author Stefan Dragisic
* @author Allard Buijze
* @since 4.6.0
*/
public interface CloseableBuffer<T> extends CloseableReadonlyBuffer<T> {

/**
* Puts a message in this buffer.
*
* @param message the message to be put in this buffer
*/
void put(T message);

/**
* Closes this buffer from the producing side.
*/
void close();

/**
* Closes this buffer exceptionally from the producing side.
*
* @param errorMessage an error indicating why this buffer is closed
*/
void closeExceptionally(ErrorMessage errorMessage);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright (c) 2020-2022. AxonIQ
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.grpc.ErrorMessage;

import java.util.Optional;

/**
* A readonly buffer that can be closed from the producing side.
*
* @param <T> the type of messages in this buffer
* @author Milan Savic
* @author Stefan Dragisic
* @author Allard Buijze
* @since 4.6.0
*/
public interface CloseableReadonlyBuffer<T> {

/**
* Polls the message from this buffer. If the returned {@link Optional} is empty, that doesn't mean that this buffer
* stays empty forever. The {@link #closed()} method should be consolidated to validate this.
*
* @return an {@link Optional} with polled message
*/
Optional<T> poll();

/**
* Indicates whether there are messages in the buffer. If this returns {@code false}, that doesn't mean that this
* buffer stays empty forever. The {@link #closed()} method should be consolidated to validate this.
*
* @return {@code true} if buffer is empty, {@code false} otherwise
*/
boolean isEmpty();

/**
* Returns the overall capacity of this buffer.
*
* @return the overall capacity of this buffer
*/
int capacity();

/**
* Registers an action to be triggered when there is a new message added to the buffer, or the buffer is closed, or
* the buffer is closed with an error. The action can check {@link #poll()}, {@link #closed()}, and {@link #error()}
* to validate this.
*
* @param onAvailable to be invoked when there are changes in this buffer
*/
void onAvailable(Runnable onAvailable);

/**
* Indicates whether the buffer is closed by the producing side.
*
* @return {@code true} if closed, {@code false} otherwise
*/
boolean closed();

/**
* Returns an error from this buffer, if any.
*
* @return an {@link Optional} of an error, if any
*/
Optional<ErrorMessage> error();
}

0 comments on commit 731f853

Please sign in to comment.