Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming query support #112

Merged
merged 21 commits into from
Mar 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
0b6c917
Added a way to register a listener when query completes.
m1l4n54v1c Oct 28, 2021
0372f2c
Add flow control messages and listeners for query channel
schananas Nov 2, 2021
14d1c8e
Fix peek() in FlowControlledBuffer to work as described in doc
schananas Nov 4, 2021
1fbe25d
add logger for unknown requests
schananas Nov 9, 2021
a1a6049
fix javadoc and todo remove
schananas Nov 9, 2021
2231e19
fix javadoc and todo remove
schananas Nov 9, 2021
9e3cacd
add registration for query complete and query flow control
schananas Nov 10, 2021
1c082cd
Fixed Query Flow Control typo.
m1l4n54v1c Nov 24, 2021
4509600
QueryHandler now returns a handle for FlowControl.
m1l4n54v1c Dec 9, 2021
c412f06
Code review remarks.
m1l4n54v1c Dec 13, 2021
21a54a8
Added test for MultiFlowControl with a single handler.
m1l4n54v1c Dec 14, 2021
487c650
Added copyright to new files.
m1l4n54v1c Dec 15, 2021
4151402
Added buffering flow control capabilities to query functionality.
m1l4n54v1c Jan 26, 2022
bbd632b
Merge branch 'feature/adminApi/getEventProcessors' into streaming-query
m1l4n54v1c Feb 2, 2022
31d060e
Merge branch 'feature/adminApi/getEventProcessors' into streaming-query
m1l4n54v1c Feb 2, 2022
55c7f40
Merge branch 'master' into streaming-query
m1l4n54v1c Feb 3, 2022
5ce2db7
code review: API changes
m1l4n54v1c Mar 1, 2022
d412ba5
Added @since.
m1l4n54v1c Mar 4, 2022
2a23d5b
Review comments.
m1l4n54v1c Mar 21, 2022
1f91110
Sonar comments.
m1l4n54v1c Mar 21, 2022
afdfd2e
Adepted to the latest API changes.
m1l4n54v1c Mar 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
41 changes: 41 additions & 0 deletions src/main/java/io/axoniq/axonserver/connector/FlowControl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2020-2022. AxonIQ
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.axoniq.axonserver.connector;

/**
* Controls the flow of the messages via communication channel. Implementations should send messages only when
* requested. Once the cancellation is invoked, implementations should stop sending messages.
*
* @author Milan Savic
* @author Stefan Dragisic
* @author Allard Buijze
* @since 4.6.0
*/
public interface FlowControl {

/**
* Requests the {@code requested} amount of responses to be sent.
*
* @param requested number of responses to be sent
*/
void request(long requested);

/**
* Cancels response sending.
*/
void cancel();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright (c) 2020-2022. AxonIQ
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.connector.ErrorCategory;
import io.axoniq.axonserver.connector.ReplyChannel;
import io.axoniq.axonserver.grpc.ErrorMessage;

/**
* A {@link ReplyChannel} implementation that uses a given {@link CloseableBuffer buffer} to buffer send, complete and
* complete-with-error. {@code ACK}s and {@code NACK}s are delegated via {@code delegate ReplyChannel}.
*
* @param <T> the type of messages this {@link ReplyChannel} deals with
* @author Milan Savic
* @author Stefan Dragisic
* @author Allard Buijze
* @since 4.6.0
*/
public class BufferingReplyChannel<T> implements ReplyChannel<T> {

private final ReplyChannel<T> delegate;
private final CloseableBuffer<T> buffer;

/**
* Instantiates this {@link BufferingReplyChannel} with given {@code delegate} and {@code buffer}.
*
* @param delegate used to delegate {@code ack}s and {@code nack}s
* @param buffer used to buffer sends, completes and completes with error
*/
public BufferingReplyChannel(ReplyChannel<T> delegate, CloseableBuffer<T> buffer) {
this.delegate = delegate;
this.buffer = buffer;
}

@Override
public void send(T outboundMessage) {
buffer.put(outboundMessage);
}

@Override
public void sendAck() {
delegate.sendAck();
}

@Override
public void sendNack(ErrorMessage errorMessage) {
delegate.sendNack(errorMessage);
}

@Override
public void complete() {
buffer.close();
}

@Override
public void completeWithError(ErrorMessage errorMessage) {
buffer.closeExceptionally(errorMessage);
}

@Override
public void completeWithError(ErrorCategory errorCategory, String message) {
ErrorMessage error = ErrorMessage.newBuilder()
.setErrorCode(errorCategory.errorCode())
.setMessage(message)
.build();
buffer.closeExceptionally(error);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright (c) 2020-2022. AxonIQ
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.connector.ErrorCategory;
import io.axoniq.axonserver.connector.ReplyChannel;
import io.axoniq.axonserver.grpc.ErrorMessage;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* A {@link ReplyChannel} implementation that will trigger a given {@link Runnable action} when it is closed.
*
* @param <T> the type of messages flowing through this {@link ReplyChannel}
* @author Milan Savic
* @author Stefan Dragisic
* @author Allard Buijze
* @since 4.6.0
*/
public class CloseAwareReplyChannel<T> implements ReplyChannel<T> {

private final ReplyChannel<T> delegate;
private final Runnable onClose;
private final AtomicBoolean closed = new AtomicBoolean();

/**
* Instantiates this {@link ReplyChannel} with given {@code delegate}.
smcvb marked this conversation as resolved.
Show resolved Hide resolved
*
* @param delegate the delegate
*/
public CloseAwareReplyChannel(ReplyChannel<T> delegate) {
this(delegate, () -> {
});
}

/**
* Instantiates this {@link ReplyChannel} with given {@code delegate} and {@code onClose} {@link Runnable} execution
* to be executed once the channel is closed.
*
* @param delegate the delegate
* @param onClose to be executed when channel is closed
*/
public CloseAwareReplyChannel(ReplyChannel<T> delegate, Runnable onClose) {
this.delegate = delegate;
this.onClose = () -> {
closed.set(true);
onClose.run();
};
}

@Override
public void send(T outboundMessage) {
delegate.send(outboundMessage);
}

@Override
public void sendLast(T outboundMessage) {
delegate.sendLast(outboundMessage);
onClose.run();
}

@Override
public void sendAck() {
delegate.sendAck();
}

@Override
public void sendNack() {
delegate.sendNack();
}

@Override
public void sendNack(ErrorMessage errorMessage) {
delegate.sendNack(errorMessage);
}

@Override
public void complete() {
delegate.complete();
onClose.run();
}

@Override
public void completeWithError(ErrorMessage errorMessage) {
delegate.completeWithError(errorMessage);
onClose.run();
}

@Override
public void completeWithError(ErrorCategory errorCategory, String message) {
delegate.completeWithError(errorCategory, message);
onClose.run();
}

/**
* Indicates whether this {@link ReplyChannel} is closed.
*
* @return {@code true} if closed, {@code false} otherwise
*/
public boolean isClosed() {
return closed.get();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2020-2022. AxonIQ
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.grpc.ErrorMessage;

/**
* A {@link CloseableReadonlyBuffer buffer} that can be closed by the producing side.
*
* @param <T> the type of messages in this buffer
* @author Milan Savic
* @author Stefan Dragisic
* @author Allard Buijze
* @since 4.6.0
*/
public interface CloseableBuffer<T> extends CloseableReadonlyBuffer<T> {

/**
* Puts a message in this buffer.
*
* @param message the message to be put in this buffer
*/
void put(T message);

/**
* Closes this buffer from the producing side.
*/
void close();

/**
* Closes this buffer exceptionally from the producing side.
*
* @param errorMessage an error indicating why this buffer is closed
*/
void closeExceptionally(ErrorMessage errorMessage);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright (c) 2020-2022. AxonIQ
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.axoniq.axonserver.connector.impl;

import io.axoniq.axonserver.grpc.ErrorMessage;

import java.util.Optional;

/**
* A readonly buffer that can be closed from the producing side.
*
* @param <T> the type of messages in this buffer
* @author Milan Savic
* @author Stefan Dragisic
* @author Allard Buijze
* @since 4.6.0
*/
public interface CloseableReadonlyBuffer<T> {

/**
* Polls the message from this buffer. If the returned {@link Optional} is empty, that doesn't mean that this buffer
* stays empty forever. The {@link #closed()} method should be consolidated to validate this.
*
* @return an {@link Optional} with polled message
*/
Optional<T> poll();

/**
* Indicates whether there are messages in the buffer. If this returns {@code false}, that doesn't mean that this
* buffer stays empty forever. The {@link #closed()} method should be consolidated to validate this.
*
* @return {@code true} if buffer is empty, {@code false} otherwise
*/
boolean isEmpty();

/**
* Returns the overall capacity of this buffer.
*
* @return the overall capacity of this buffer
*/
int capacity();

/**
* Registers an action to be triggered when there is a new message added to the buffer, or the buffer is closed, or
* the buffer is closed with an error. The action can check {@link #poll()}, {@link #closed()}, and {@link #error()}
* to validate this.
*
* @param onAvailable to be invoked when there are changes in this buffer
*/
void onAvailable(Runnable onAvailable);

/**
* Indicates whether the buffer is closed by the producing side.
*
* @return {@code true} if closed, {@code false} otherwise
*/
boolean closed();

/**
* Returns an error from this buffer, if any.
*
* @return an {@link Optional} of an error, if any
*/
Optional<ErrorMessage> error();
}