merge: #10037
10037: fix: create new readers and writers for every sync request r=oleschoenburg a=oleschoenburg

## Description

Applies the fix from #10026 to the sync API handlers as well.

Due to our actor scheduler, even sync handlers can't reuse readers and writers: `onComplete` on an already completed future does not run immediately, so concurrent requests race on the shared readers and writers.
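
To illustrate the failure mode: a callback registered on an already completed future is still queued as a separate actor job rather than running inline, so a later request can re-wrap a shared reader before the earlier request's callback reads it. A minimal, self-contained sketch of that ordering (the `Reader` class and the explicit job queue below are hypothetical stand-ins, not Zeebe APIs):

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class ReaderReuseRaceSketch {

  // Hypothetical shared reader, standing in for a reused request reader.
  static final class Reader {
    String data;

    void wrap(final String request) {
      data = request;
    }
  }

  public static void main(final String[] args) {
    // Completion callbacks never run inline, even on already completed
    // futures; they are appended to the actor's job queue instead.
    final Queue<Runnable> actorJobs = new ArrayDeque<>();
    final Reader sharedReader = new Reader();

    // Request A wraps the shared reader and registers its completion callback.
    sharedReader.wrap("request-A");
    actorJobs.add(() -> System.out.println("A sees: " + sharedReader.data));

    // Request B arrives before A's callback has run and re-wraps the reader.
    sharedReader.wrap("request-B");
    actorJobs.add(() -> System.out.println("B sees: " + sharedReader.data));

    // Draining the queue now prints "A sees: request-B" -- the data race.
    while (!actorJobs.isEmpty()) {
      actorJobs.poll().run();
    }
  }
}
```

Creating a fresh reader and writer per request, as this PR does via suppliers, removes the shared mutable state that makes this ordering observable.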

## Related issues


closes #10014 



Co-authored-by: Ole Schönburg <ole.schoenburg@gmail.com>
zeebe-bors-camunda[bot] and lenaschoenburg committed Aug 9, 2022
2 parents 48e8083 + caefea7 commit de48dc0
Showing 6 changed files with 86 additions and 114 deletions.

One of the changed files was deleted; its contents are not shown.

AsyncApiRequestHandler.java
@@ -24,8 +24,7 @@
 
 /**
  * A {@link RequestHandler} that automatically decodes requests and encodes successful and error
- * responses. Handling requests is asynchronous, use {@link ApiRequestHandler} if handling can be
- * synchronous.
+ * responses. Handling requests is asynchronous.
  *
  * @param <R> a {@link RequestReader} that reads the request
  * @param <W> a {@link ResponseWriter} that writes the response
@@ -37,20 +36,13 @@ public abstract class AsyncApiRequestHandler<R extends RequestReader<?>, W exten
   private final Supplier<R> requestReaderSupplier;
   private final Supplier<W> responseWriterSupplier;
 
-  private final Supplier<ErrorResponseWriter> errorResponseWriterSupplier;
+  private final Supplier<ErrorResponseWriter> errorResponseWriterSupplier =
+      ErrorResponseWriter::new;
 
   protected AsyncApiRequestHandler(
       final Supplier<R> requestReaderSupplier, final Supplier<W> responseWriterSupplier) {
-    this(requestReaderSupplier, responseWriterSupplier, ErrorResponseWriter::new);
-  }
-
-  protected AsyncApiRequestHandler(
-      final Supplier<R> requestReaderSupplier,
-      final Supplier<W> responseWriterSupplier,
-      final Supplier<ErrorResponseWriter> errorResponseWriterSupplier) {
     this.requestReaderSupplier = requestReaderSupplier;
     this.responseWriterSupplier = responseWriterSupplier;
-    this.errorResponseWriterSupplier = errorResponseWriterSupplier;
   }
 
   /**
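
For orientation, here is a minimal sketch of the pattern the supplier-based constructor enables, assuming the handler calls the suppliers once per incoming request; the class and method below are hypothetical, and the real per-request wiring lives in parts of `AsyncApiRequestHandler` that are outside this diff:

```java
import java.util.function.Supplier;

// Hypothetical skeleton: supplier-based construction yields a fresh reader and
// writer per request, so concurrent requests share no mutable state.
final class PerRequestInstancesSketch<R, W> {
  private final Supplier<R> requestReaderSupplier;
  private final Supplier<W> responseWriterSupplier;

  PerRequestInstancesSketch(
      final Supplier<R> requestReaderSupplier, final Supplier<W> responseWriterSupplier) {
    this.requestReaderSupplier = requestReaderSupplier;
    this.responseWriterSupplier = responseWriterSupplier;
  }

  void onRequest(final byte[] rawRequest) {
    // Each request gets its own instances instead of reusing shared fields.
    final R reader = requestReaderSupplier.get();
    final W writer = responseWriterSupplier.get();
    // ... decode rawRequest with the reader, handle it, encode with the writer ...
  }
}
```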
BackupApiRequestHandler.java
@@ -8,7 +8,7 @@
 package io.camunda.zeebe.broker.transport.backupapi;
 
 import io.camunda.zeebe.broker.system.monitoring.DiskSpaceUsageListener;
-import io.camunda.zeebe.broker.transport.ApiRequestHandler;
+import io.camunda.zeebe.broker.transport.AsyncApiRequestHandler;
 import io.camunda.zeebe.broker.transport.ErrorResponseWriter;
 import io.camunda.zeebe.logstreams.log.LogStreamRecordWriter;
 import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
@@ -18,6 +18,8 @@
 import io.camunda.zeebe.protocol.record.RecordType;
 import io.camunda.zeebe.protocol.record.ValueType;
 import io.camunda.zeebe.protocol.record.intent.management.CheckpointIntent;
+import io.camunda.zeebe.scheduler.future.ActorFuture;
+import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
 import io.camunda.zeebe.transport.RequestType;
 import io.camunda.zeebe.transport.impl.AtomixServerTransport;
 import io.camunda.zeebe.util.Either;
@@ -26,7 +28,7 @@
  * Request handler to handle commands and queries related to the backup ({@link RequestType#BACKUP})
  */
 public final class BackupApiRequestHandler
-    extends ApiRequestHandler<BackupApiRequestReader, BackupApiResponseWriter>
+    extends AsyncApiRequestHandler<BackupApiRequestReader, BackupApiResponseWriter>
     implements DiskSpaceUsageListener {
   private boolean isDiskSpaceAvailable = true;
   private final LogStreamRecordWriter logStreamRecordWriter;
@@ -37,7 +39,7 @@ public BackupApiRequestHandler(
       final AtomixServerTransport transport,
       final LogStreamRecordWriter logStreamRecordWriter,
       final int partitionId) {
-    super(new BackupApiRequestReader(), new BackupApiResponseWriter());
+    super(BackupApiRequestReader::new, BackupApiResponseWriter::new);
     this.logStreamRecordWriter = logStreamRecordWriter;
     this.transport = transport;
     this.partitionId = partitionId;
@@ -54,12 +56,21 @@ public void close() {
   }
 
   @Override
-  protected Either<ErrorResponseWriter, BackupApiResponseWriter> handle(
+  protected ActorFuture<Either<ErrorResponseWriter, BackupApiResponseWriter>> handleAsync(
       final int partitionId,
       final long requestId,
       final BackupApiRequestReader requestReader,
       final BackupApiResponseWriter responseWriter,
       final ErrorResponseWriter errorWriter) {
+    return CompletableActorFuture.completed(
+        handle(partitionId, requestReader, responseWriter, errorWriter));
+  }
+
+  private Either<ErrorResponseWriter, BackupApiResponseWriter> handle(
+      final int partitionId,
+      final BackupApiRequestReader requestReader,
+      final BackupApiResponseWriter responseWriter,
+      final ErrorResponseWriter errorWriter) {
 
     if (requestReader.type() == BackupRequestType.TAKE_BACKUP) {
       if (!isDiskSpaceAvailable) {
CommandApiRequestHandler.java
@@ -8,7 +8,7 @@
 package io.camunda.zeebe.broker.transport.commandapi;
 
 import io.camunda.zeebe.broker.Loggers;
-import io.camunda.zeebe.broker.transport.ApiRequestHandler;
+import io.camunda.zeebe.broker.transport.AsyncApiRequestHandler;
 import io.camunda.zeebe.broker.transport.ErrorResponseWriter;
 import io.camunda.zeebe.broker.transport.backpressure.BackpressureMetrics;
 import io.camunda.zeebe.broker.transport.backpressure.RequestLimiter;
@@ -18,12 +18,14 @@
 import io.camunda.zeebe.protocol.record.ExecuteCommandRequestDecoder;
 import io.camunda.zeebe.protocol.record.RecordType;
 import io.camunda.zeebe.protocol.record.intent.Intent;
+import io.camunda.zeebe.scheduler.future.ActorFuture;
+import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
 import io.camunda.zeebe.util.Either;
 import org.agrona.collections.Int2ObjectHashMap;
 import org.slf4j.Logger;
 
 final class CommandApiRequestHandler
-    extends ApiRequestHandler<CommandApiRequestReader, CommandApiResponseWriter> {
+    extends AsyncApiRequestHandler<CommandApiRequestReader, CommandApiResponseWriter> {
   private static final Logger LOG = Loggers.TRANSPORT_LOGGER;
 
   private final Int2ObjectHashMap<LogStreamRecordWriter> leadingStreams = new Int2ObjectHashMap<>();
@@ -33,11 +35,21 @@ final class CommandApiRequestHandler
   private boolean isDiskSpaceAvailable = true;
 
   CommandApiRequestHandler() {
-    super(new CommandApiRequestReader(), new CommandApiResponseWriter());
+    super(CommandApiRequestReader::new, CommandApiResponseWriter::new);
   }
 
   @Override
-  protected Either<ErrorResponseWriter, CommandApiResponseWriter> handle(
+  protected ActorFuture<Either<ErrorResponseWriter, CommandApiResponseWriter>> handleAsync(
+      final int partitionId,
+      final long requestId,
+      final CommandApiRequestReader requestReader,
+      final CommandApiResponseWriter responseWriter,
+      final ErrorResponseWriter errorWriter) {
+    return CompletableActorFuture.completed(
+        handle(partitionId, requestId, requestReader, responseWriter, errorWriter));
+  }
+
+  private Either<ErrorResponseWriter, CommandApiResponseWriter> handle(
       final int partitionId,
       final long requestId,
       final CommandApiRequestReader requestReader,
QueryApiRequestHandler.java
@@ -9,13 +9,15 @@
 
 import io.camunda.zeebe.broker.Loggers;
 import io.camunda.zeebe.broker.system.configuration.QueryApiCfg;
-import io.camunda.zeebe.broker.transport.ApiRequestHandler;
+import io.camunda.zeebe.broker.transport.AsyncApiRequestHandler;
 import io.camunda.zeebe.broker.transport.ErrorResponseWriter;
 import io.camunda.zeebe.engine.state.QueryService;
 import io.camunda.zeebe.engine.state.QueryService.ClosedServiceException;
 import io.camunda.zeebe.protocol.record.ErrorCode;
 import io.camunda.zeebe.protocol.record.ExecuteQueryRequestDecoder;
 import io.camunda.zeebe.protocol.record.ValueType;
+import io.camunda.zeebe.scheduler.future.ActorFuture;
+import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
 import io.camunda.zeebe.util.Either;
 import java.util.EnumSet;
 import java.util.Map;
@@ -32,7 +34,7 @@
 @SuppressWarnings("removal")
 @Deprecated(forRemoval = true, since = "1.2.0")
 public final class QueryApiRequestHandler
-    extends ApiRequestHandler<QueryRequestReader, QueryResponseWriter> {
+    extends AsyncApiRequestHandler<QueryRequestReader, QueryResponseWriter> {
   private static final Set<ValueType> ACCEPTED_VALUE_TYPES =
       EnumSet.of(ValueType.PROCESS, ValueType.PROCESS_INSTANCE, ValueType.JOB);
 
@@ -41,7 +43,7 @@ public final class QueryApiRequestHandler
   private final String actorName;
 
   public QueryApiRequestHandler(final QueryApiCfg config, final int nodeId) {
-    super(new QueryRequestReader(), new QueryResponseWriter());
+    super(QueryRequestReader::new, QueryResponseWriter::new);
     this.config = config;
     actorName = buildActorName(nodeId, "QueryApi");
   }
@@ -65,12 +67,21 @@ public void removePartition(final int partitionId) {
   }
 
   @Override
-  protected Either<ErrorResponseWriter, QueryResponseWriter> handle(
+  protected ActorFuture<Either<ErrorResponseWriter, QueryResponseWriter>> handleAsync(
       final int partitionId,
       final long requestId,
       final QueryRequestReader requestReader,
       final QueryResponseWriter responseWriter,
       final ErrorResponseWriter errorWriter) {
+    return CompletableActorFuture.completed(
+        handle(partitionId, requestReader, responseWriter, errorWriter));
+  }
+
+  private Either<ErrorResponseWriter, QueryResponseWriter> handle(
+      final int partitionId,
+      final QueryRequestReader requestReader,
+      final QueryResponseWriter responseWriter,
+      final ErrorResponseWriter errorWriter) {
     if (!config.isEnabled()) {
       errorWriter
           .errorCode(ErrorCode.UNSUPPORTED_MESSAGE)
ApiRequestHandlerTest.java
@@ -13,54 +13,54 @@
 
 import io.camunda.zeebe.broker.transport.AsyncApiRequestHandler.RequestReader;
 import io.camunda.zeebe.broker.transport.AsyncApiRequestHandler.ResponseWriter;
-import io.camunda.zeebe.scheduler.testing.ControlledActorSchedulerRule;
+import io.camunda.zeebe.scheduler.future.ActorFuture;
+import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
+import io.camunda.zeebe.scheduler.testing.ControlledActorSchedulerExtension;
 import io.camunda.zeebe.transport.ServerOutput;
 import io.camunda.zeebe.util.Either;
+import java.util.function.Supplier;
 import org.agrona.DirectBuffer;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 
-public class ApiRequestHandlerTest {
-
-  @Rule
-  public final ControlledActorSchedulerRule schedulerRule = new ControlledActorSchedulerRule();
-
-  private RequestReader<?> reader;
-  private ResponseWriter writer;
-  private TestApiRequestHandler handler;
-
-  @Before
-  public void setUp() {
-    reader = mock(RequestReader.class);
-    writer = mock(ResponseWriter.class);
-    handler = new TestApiRequestHandler(reader, writer);
-    schedulerRule.submitActor(handler);
-  }
+final class ApiRequestHandlerTest {
+  @RegisterExtension
+  public final ControlledActorSchedulerExtension actorScheduler =
+      new ControlledActorSchedulerExtension();
 
   @Test
-  public void shouldReadRequestBuffer() {
+  void shouldReadRequestBuffer() {
     // given
+    final var reader = mock(RequestReader.class);
+    final var writer = mock(ResponseWriter.class);
+    final var handler = new TestApiRequestHandler(() -> reader, () -> writer);
+    actorScheduler.submitActor(handler);
+
     final var buffer = mock(DirectBuffer.class);
     final var output = mock(ServerOutput.class);
 
     // when
     handler.onRequest(output, 0, 0, buffer, 0, 1);
-    schedulerRule.workUntilDone();
+    actorScheduler.workUntilDone();
 
     // then
     verify(reader).wrap(buffer, 0, 1);
   }
 
   @Test
-  public void shouldResetReaderAndWriter() {
+  void shouldResetReaderAndWriter() {
     // given
+    final var reader = mock(RequestReader.class);
+    final var writer = mock(ResponseWriter.class);
+    final var handler = new TestApiRequestHandler(() -> reader, () -> writer);
+    actorScheduler.submitActor(handler);
+
     final var buffer = mock(DirectBuffer.class);
     final var output = mock(ServerOutput.class);
 
     // when
     handler.onRequest(output, 0, 0, buffer, 0, 1);
-    schedulerRule.workUntilDone();
+    actorScheduler.workUntilDone();
 
     // then
 
@@ -69,8 +69,13 @@ public void shouldResetReaderAndWriter() {
   }
 
   @Test
-  public void shouldWriteResponse() {
+  void shouldWriteResponse() {
     // given
+    final var reader = mock(RequestReader.class);
+    final var writer = mock(ResponseWriter.class);
+    final var handler = new TestApiRequestHandler(() -> reader, () -> writer);
+    actorScheduler.submitActor(handler);
+
    final var buffer = mock(DirectBuffer.class);
    final var output = mock(ServerOutput.class);
    final var partitionId = 12;
@@ -79,27 +84,28 @@
 
     // when
     handler.onRequest(output, partitionId, requestId, buffer, 0, 1);
-    schedulerRule.workUntilDone();
+    actorScheduler.workUntilDone();
 
     // then
     verify(writer).tryWriteResponse(output, partitionId, requestId);
   }
 
   private static class TestApiRequestHandler
-      extends ApiRequestHandler<RequestReader<?>, ResponseWriter> {
+      extends AsyncApiRequestHandler<RequestReader<?>, ResponseWriter> {
     TestApiRequestHandler(
-        final RequestReader<?> requestReader, final ResponseWriter responseWriter) {
-      super(requestReader, responseWriter);
+        final Supplier<RequestReader<?>> requestReaderSupplier,
+        final Supplier<ResponseWriter> responseWriterSupplier) {
+      super(requestReaderSupplier, responseWriterSupplier);
    }
 
     @Override
-    protected Either<ErrorResponseWriter, ResponseWriter> handle(
+    protected ActorFuture<Either<ErrorResponseWriter, ResponseWriter>> handleAsync(
        final int partitionId,
        final long requestId,
        final RequestReader<?> requestReader,
        final ResponseWriter responseWriter,
        final ErrorResponseWriter errorWriter) {
-      return Either.right(responseWriter);
+      return CompletableActorFuture.completed(Either.right(responseWriter));
    }
  }
}
