-
Notifications
You must be signed in to change notification settings - Fork 556
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: create new readers and writer for every async request #10026
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -11,17 +11,33 @@ | |||||||||||||||||
import io.camunda.zeebe.broker.transport.AsyncApiRequestHandler.ResponseWriter; | ||||||||||||||||||
import io.camunda.zeebe.scheduler.future.ActorFuture; | ||||||||||||||||||
import io.camunda.zeebe.scheduler.future.CompletableActorFuture; | ||||||||||||||||||
import io.camunda.zeebe.transport.RequestHandler; | ||||||||||||||||||
import io.camunda.zeebe.util.Either; | ||||||||||||||||||
import java.util.function.Supplier; | ||||||||||||||||||
|
||||||||||||||||||
/** | ||||||||||||||||||
* @param <R> | ||||||||||||||||||
* @param <W> | ||||||||||||||||||
* A {@link RequestHandler} that automatically decodes requests and encodes successful and error | ||||||||||||||||||
* responses. Handling requests is synchronous, use {@link AsyncApiRequestHandler} if handling | ||||||||||||||||||
* should be asynchronous. | ||||||||||||||||||
* | ||||||||||||||||||
* @param <R> a {@link RequestReader} that reads the request. Reset on every request. | ||||||||||||||||||
* @param <W> a {@link ResponseWriter} that writes the response. Reset on every request. | ||||||||||||||||||
*/ | ||||||||||||||||||
public abstract class ApiRequestHandler<R extends RequestReader<?>, W extends ResponseWriter> | ||||||||||||||||||
extends AsyncApiRequestHandler<R, W> { | ||||||||||||||||||
|
||||||||||||||||||
protected ApiRequestHandler(final R requestReader, final W responseWriter) { | ||||||||||||||||||
super(requestReader, responseWriter); | ||||||||||||||||||
super( | ||||||||||||||||||
() -> requestReader, | ||||||||||||||||||
() -> responseWriter, | ||||||||||||||||||
new Supplier<>() { | ||||||||||||||||||
private final ErrorResponseWriter errorResponseWriter = new ErrorResponseWriter(); | ||||||||||||||||||
|
||||||||||||||||||
@Override | ||||||||||||||||||
public ErrorResponseWriter get() { | ||||||||||||||||||
return errorResponseWriter; | ||||||||||||||||||
} | ||||||||||||||||||
Comment on lines
+33
to
+39
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||
}); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
protected abstract Either<ErrorResponseWriter, W> handle( | ||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ | |
import io.camunda.zeebe.util.Either; | ||
import io.camunda.zeebe.util.buffer.BufferReader; | ||
import io.camunda.zeebe.util.buffer.BufferWriter; | ||
import java.util.function.Supplier; | ||
import org.agrona.DirectBuffer; | ||
import org.agrona.sbe.MessageDecoderFlyweight; | ||
import org.slf4j.Logger; | ||
|
@@ -33,13 +34,23 @@ public abstract class AsyncApiRequestHandler<R extends RequestReader<?>, W exten | |
extends Actor implements RequestHandler { | ||
|
||
public static final Logger LOG = Loggers.TRANSPORT_LOGGER; | ||
private final ErrorResponseWriter errorResponseWriter = new ErrorResponseWriter(); | ||
private final R requestReader; | ||
private final W responseWriter; | ||
private final Supplier<R> requestReaderSupplier; | ||
private final Supplier<W> responseWriterSupplier; | ||
|
||
protected AsyncApiRequestHandler(final R requestReader, final W responseWriter) { | ||
this.requestReader = requestReader; | ||
this.responseWriter = responseWriter; | ||
private final Supplier<ErrorResponseWriter> errorResponseWriterSupplier; | ||
|
||
protected AsyncApiRequestHandler( | ||
final Supplier<R> requestReaderSupplier, final Supplier<W> responseWriterSupplier) { | ||
this(requestReaderSupplier, responseWriterSupplier, ErrorResponseWriter::new); | ||
} | ||
|
||
protected AsyncApiRequestHandler( | ||
final Supplier<R> requestReaderSupplier, | ||
final Supplier<W> responseWriterSupplier, | ||
final Supplier<ErrorResponseWriter> errorResponseWriterSupplier) { | ||
this.requestReaderSupplier = requestReaderSupplier; | ||
this.responseWriterSupplier = responseWriterSupplier; | ||
this.errorResponseWriterSupplier = errorResponseWriterSupplier; | ||
} | ||
|
||
/** | ||
|
@@ -79,6 +90,10 @@ private void handleRequest( | |
final DirectBuffer buffer, | ||
final int offset, | ||
final int length) { | ||
final var requestReader = requestReaderSupplier.get(); | ||
final var responseWriter = responseWriterSupplier.get(); | ||
final var errorResponseWriter = errorResponseWriterSupplier.get(); | ||
Comment on lines
+94
to
+95
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💭 I still think you don't have to create these instances here. Instead it can be created in the implementation of |
||
|
||
requestReader.reset(); | ||
responseWriter.reset(); | ||
errorResponseWriter.reset(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💭 Looks weird but I found no other way to define a supplier that always returns the same instance of
errorResponseWriter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry. Didn't see this comment before my review.
I think it is ok if it creates a new instance everytime.