Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: create new readers and writer for every async request #10026

Merged
merged 1 commit into from
Aug 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,33 @@
import io.camunda.zeebe.broker.transport.AsyncApiRequestHandler.ResponseWriter;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.transport.RequestHandler;
import io.camunda.zeebe.util.Either;
import java.util.function.Supplier;

/**
* @param <R>
* @param <W>
* A {@link RequestHandler} that automatically decodes requests and encodes successful and error
* responses. Handling requests is synchronous, use {@link AsyncApiRequestHandler} if handling
* should be asynchronous.
*
* @param <R> a {@link RequestReader} that reads the request. Reset on every request.
* @param <W> a {@link ResponseWriter} that writes the response. Reset on every request.
*/
public abstract class ApiRequestHandler<R extends RequestReader<?>, W extends ResponseWriter>
extends AsyncApiRequestHandler<R, W> {

protected ApiRequestHandler(final R requestReader, final W responseWriter) {
super(requestReader, responseWriter);
super(
() -> requestReader,
() -> responseWriter,
new Supplier<>() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💭 Looks weird but I found no other way to define a supplier that always returns the same instance of errorResponseWriter

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry. Didn't see this comment before my review.

I think it is ok if it creates a new instance everytime.

private final ErrorResponseWriter errorResponseWriter = new ErrorResponseWriter();

@Override
public ErrorResponseWriter get() {
return errorResponseWriter;
}
Comment on lines +33 to +39
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
new Supplier<>() {
private final ErrorResponseWriter errorResponseWriter = new ErrorResponseWriter();
@Override
public ErrorResponseWriter get() {
return errorResponseWriter;
}
() -> new ErrorResponseWriter()

});
}

protected abstract Either<ErrorResponseWriter, W> handle(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.buffer.BufferReader;
import io.camunda.zeebe.util.buffer.BufferWriter;
import java.util.function.Supplier;
import org.agrona.DirectBuffer;
import org.agrona.sbe.MessageDecoderFlyweight;
import org.slf4j.Logger;
Expand All @@ -33,13 +34,23 @@ public abstract class AsyncApiRequestHandler<R extends RequestReader<?>, W exten
extends Actor implements RequestHandler {

public static final Logger LOG = Loggers.TRANSPORT_LOGGER;
private final ErrorResponseWriter errorResponseWriter = new ErrorResponseWriter();
private final R requestReader;
private final W responseWriter;
private final Supplier<R> requestReaderSupplier;
private final Supplier<W> responseWriterSupplier;

protected AsyncApiRequestHandler(final R requestReader, final W responseWriter) {
this.requestReader = requestReader;
this.responseWriter = responseWriter;
private final Supplier<ErrorResponseWriter> errorResponseWriterSupplier;

protected AsyncApiRequestHandler(
final Supplier<R> requestReaderSupplier, final Supplier<W> responseWriterSupplier) {
this(requestReaderSupplier, responseWriterSupplier, ErrorResponseWriter::new);
}

protected AsyncApiRequestHandler(
final Supplier<R> requestReaderSupplier,
final Supplier<W> responseWriterSupplier,
final Supplier<ErrorResponseWriter> errorResponseWriterSupplier) {
this.requestReaderSupplier = requestReaderSupplier;
this.responseWriterSupplier = responseWriterSupplier;
this.errorResponseWriterSupplier = errorResponseWriterSupplier;
}

/**
Expand Down Expand Up @@ -79,6 +90,10 @@ private void handleRequest(
final DirectBuffer buffer,
final int offset,
final int length) {
final var requestReader = requestReaderSupplier.get();
final var responseWriter = responseWriterSupplier.get();
final var errorResponseWriter = errorResponseWriterSupplier.get();
Comment on lines +94 to +95
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💭 I still think you don't have to create these instances here. Instead it can be created in the implementation of handleAsync. But this is ok for now. We can discuss it when you refactor this class.


requestReader.reset();
responseWriter.reset();
errorResponseWriter.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public class AdminApiRequestHandler

public AdminApiRequestHandler(
final AtomixServerTransport transport, final PartitionManagerImpl partitionManager) {
super(new ApiRequestReader(), new ApiResponseWriter());
super(ApiRequestReader::new, ApiResponseWriter::new);
this.transport = transport;
this.partitionManager = partitionManager;
adminAccess = partitionManager.createAdminAccess(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

import io.camunda.zeebe.broker.transport.AsyncApiRequestHandler.RequestReader;
import io.camunda.zeebe.broker.transport.AsyncApiRequestHandler.ResponseWriter;
import io.camunda.zeebe.scheduler.ActorControl;
import io.camunda.zeebe.scheduler.testing.ControlledActorSchedulerRule;
import io.camunda.zeebe.transport.ServerOutput;
import io.camunda.zeebe.util.Either;
Expand Down Expand Up @@ -93,10 +92,6 @@ private static class TestApiRequestHandler
super(requestReader, responseWriter);
}

public ActorControl actor() {
return actor;
}

@Override
protected Either<ErrorResponseWriter, ResponseWriter> handle(
final int partitionId,
Expand Down