Skip to content

Commit

Permalink
merge: #10163
Browse files Browse the repository at this point in the history
10163: Use RecordBatch to write follow up records r=Zelldon a=Zelldon

## Description

In order to determine the maximum record batch size, we use a new method on the batch writer. This method allows us to check whether we would be able to write a specific event count and batch size to the writer. 

During processing, a RecordBatch is built up in the ProcessingResultBuilder and handed as an ImmutableRecordBatch inside the ProcessingResult to the ProcessingStateMachine. Here we tried not to change the interfaces, which forces us to cast in certain places; this is similar to how it is currently done in the LegacyWriter.

The RecordBatch is consumed by the ProcessingStateMachine in order to write the records. A follow-up PR will clean up other parts, like the unused writer in the ProcessingResult, etc.

Some StreamProcessor tests relied on the writer usage, which has been disabled/removed for now. We will soon rewrite them.


<!-- Please explain the changes you made here. -->

## Related issues

<!-- Which issues are closed by this PR or are related -->

related #9724
related #10001



Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and Zelldon committed Aug 25, 2022
2 parents ce79272 + 503bdbc commit 82f7212
Show file tree
Hide file tree
Showing 17 changed files with 130 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import io.camunda.zeebe.engine.api.PostCommitTask;
import io.camunda.zeebe.engine.api.ProcessingResult;
import io.camunda.zeebe.engine.api.ProcessingResultBuilder;
import io.camunda.zeebe.engine.api.records.ImmutableRecordBatch;
import io.camunda.zeebe.engine.api.records.RecordBatch;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.record.RecordType;
Expand All @@ -28,6 +30,11 @@ public long writeRecordsToStream(final LogStreamBatchWriter logStreamBatchWriter
return 0;
}

@Override
public ImmutableRecordBatch getRecordBatch() {
  // This result builder produces no follow-up records, so report an empty batch.
  final ImmutableRecordBatch noRecords = RecordBatch.empty();
  return noRecords;
}

@Override
public boolean writeResponse(final CommandResponseWriter commandResponseWriter) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,29 @@
*/
package io.camunda.zeebe.engine.api;

import io.camunda.zeebe.engine.api.records.ImmutableRecordBatch;
import io.camunda.zeebe.engine.api.records.RecordBatch;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;

public final class EmptyProcessingResult implements ProcessingResult {

public static final ProcessingResult INSTANCE = new EmptyProcessingResult();
private final ImmutableRecordBatch emptyRecordBatch;

/**
 * Builds the single shared empty result. The empty batch can be cached in a field because this
 * result never contains any records (accessed only via the {@code INSTANCE} singleton).
 */
private EmptyProcessingResult() {
  emptyRecordBatch = RecordBatch.empty();
}

@Override
public long writeRecordsToStream(final LogStreamBatchWriter logStreamBatchWriter) {
  // Deprecated legacy write path: an empty result writes nothing and reports 0.
  return 0;
}

@Override
public ImmutableRecordBatch getRecordBatch() {
  // Reuses the single empty batch created in the constructor.
  return emptyRecordBatch;
}

@Override
public boolean writeResponse(final CommandResponseWriter commandResponseWriter) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,28 @@
*/
package io.camunda.zeebe.engine.api;

import io.camunda.zeebe.engine.api.records.ImmutableRecordBatch;
import io.camunda.zeebe.engine.api.records.RecordBatchEntry;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;

/**
* Here the interface is just a suggestion. Can be whatever PDT teams thinks is best to work with
*/
public interface ProcessingResult {

/**
 * Writes the resulting records with the given legacy batch writer.
 *
 * @deprecated the records are now exposed via {@link #getRecordBatch()} and written by the
 *     caller; this legacy write path is kept only until remaining usages are migrated
 * @param logStreamBatchWriter the legacy writer used to write the records
 * @return presumably the last written position, or a non-positive value when nothing was
 *     written — implementation dependent, verify against the implementations
 */
@Deprecated
long writeRecordsToStream(LogStreamBatchWriter logStreamBatchWriter);

/**
 * Returns the resulting record batch, which can be empty or consist of multiple {@link
 * RecordBatchEntry}s. These entries are the result of the current processing. If an entry is of
 * type {@link io.camunda.zeebe.protocol.record.RecordType#COMMAND} it will be later processed as
 * follow-up command.
 *
 * @return returns the resulting immutable record batch
 */
ImmutableRecordBatch getRecordBatch();

/**
 * Writes the result's response, if any, with the given writer.
 *
 * @param commandResponseWriter the writer used to write the response
 * @return whether writing was successful — NOTE(review): the implementations visible here
 *     return constant values; confirm the exact contract
 */
boolean writeResponse(CommandResponseWriter commandResponseWriter);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.streamprocessor.records;
package io.camunda.zeebe.engine.api.records;

/**
* Represents an unmodifiable batch of records, which extends the {@link Iterable<
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.streamprocessor.records;
package io.camunda.zeebe.engine.api.records;

import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.streamprocessor.records;
package io.camunda.zeebe.engine.api.records;

import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RejectionType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.streamprocessor.records;
package io.camunda.zeebe.engine.api.records;

import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RejectionType;
Expand All @@ -28,6 +28,10 @@ public RecordBatch(final RecordBatchSizePredicate recordBatchSizePredicate) {
this.recordBatchSizePredicate = recordBatchSizePredicate;
}

public static ImmutableRecordBatch empty() {
  // Batch whose size predicate rejects every (count, size) combination — presumably
  // appendRecord consults the predicate before accepting an entry (TODO confirm), so
  // the returned batch can never grow and stays empty.
  return new RecordBatch((c, s) -> false);
}

@Override
public void appendRecord(
final long key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.streamprocessor.records;
package io.camunda.zeebe.engine.api.records;

import static io.camunda.zeebe.engine.processing.streamprocessor.TypedEventRegistry.EVENT_REGISTRY;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.streamprocessor.records;
package io.camunda.zeebe.engine.api.records;

import java.util.function.BiPredicate;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.camunda.zeebe.engine.api.PostCommitTask;
import io.camunda.zeebe.engine.api.ProcessingResult;
import io.camunda.zeebe.engine.api.TaskResult;
import io.camunda.zeebe.engine.api.records.ImmutableRecordBatch;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -28,15 +29,17 @@ final class DirectProcessingResult implements ProcessingResult, TaskResult {
private final LegacyTypedStreamWriter streamWriter;
private final DirectTypedResponseWriter responseWriter;
private boolean hasResponse;
private final ImmutableRecordBatch immutableRecordBatch;

/**
 * @param context gives access to the legacy stream and response writers
 * @param immutableRecordBatch the records produced during processing, exposed via {@link
 *     #getRecordBatch()}
 * @param postCommitTasks tasks to run after a successful commit (copied defensively)
 * @param hasResponse whether a response should be written for the processed command
 */
DirectProcessingResult(
    final StreamProcessorContext context,
    final ImmutableRecordBatch immutableRecordBatch,
    final List<PostCommitTask> postCommitTasks,
    final boolean hasResponse) {
  // Copy so later mutation of the caller's list cannot change this result.
  this.postCommitTasks = new ArrayList<>(postCommitTasks);
  streamWriter = context.getLogStreamWriter();
  responseWriter = context.getTypedResponseWriter();

  this.immutableRecordBatch = immutableRecordBatch;
  this.hasResponse = hasResponse;
}

Expand All @@ -45,6 +48,11 @@ public long writeRecordsToStream(final LogStreamBatchWriter logStreamBatchWriter
return streamWriter.flush();
}

@Override
public ImmutableRecordBatch getRecordBatch() {
  // Hands out the batch that the result builder collected during processing.
  return immutableRecordBatch;
}

@Override
public boolean writeResponse(final CommandResponseWriter commandResponseWriter) {
// here we must assume that response writer is backed up by command response writer internally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,24 @@
*/
package io.camunda.zeebe.streamprocessor;

import static io.camunda.zeebe.engine.processing.streamprocessor.TypedEventRegistry.EVENT_REGISTRY;

import io.camunda.zeebe.engine.api.PostCommitTask;
import io.camunda.zeebe.engine.api.ProcessingResult;
import io.camunda.zeebe.engine.api.ProcessingResultBuilder;
import io.camunda.zeebe.engine.api.records.RecordBatch;
import io.camunda.zeebe.engine.api.records.RecordBatchSizePredicate;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Implementation of {@code ProcessingResultBuilder} that uses direct access to the stream and to
Expand All @@ -36,14 +43,21 @@ final class DirectProcessingResultBuilder implements ProcessingResultBuilder {
private boolean hasResponse =
true; // TODO figure out why this still needs to be true for tests to pass
private final long sourceRecordPosition;
private final RecordBatch mutableRecordBatch;
private final Map<Class<? extends UnpackedObject>, ValueType> typeRegistry;

/**
 * @param context gives access to the legacy stream and response writers used by this builder
 * @param sourceRecordPosition position of the command currently being processed; configured as
 *     source context on the stream writer
 * @param predicate decides whether a given (event count, batch size) combination still fits
 *     into one written batch
 */
DirectProcessingResultBuilder(
    final StreamProcessorContext context,
    final long sourceRecordPosition,
    final RecordBatchSizePredicate predicate) {
  this.context = context;
  this.sourceRecordPosition = sourceRecordPosition;
  streamWriter = context.getLogStreamWriter();
  streamWriter.configureSourceContext(sourceRecordPosition);
  responseWriter = context.getTypedResponseWriter();
  mutableRecordBatch = new RecordBatch(predicate);
  // Reverse lookup (record value class -> ValueType), inverted from the static event
  // registry. NOTE(review): rebuilt for every builder instance (one per processed
  // command); consider hoisting to a shared static map. Assumes the class->type
  // mapping is unique — duplicate classes would silently clobber earlier entries.
  typeRegistry = new HashMap<>();
  EVENT_REGISTRY.forEach((e, c) -> typeRegistry.put(c, e));
}

@Override
Expand All @@ -54,6 +68,21 @@ public ProcessingResultBuilder appendRecord(
final RejectionType rejectionType,
final String rejectionReason,
final RecordValue value) {

final ValueType valueType = typeRegistry.get(value.getClass());
if (valueType == null) {
// usually happens when the record is not registered at the TypedStreamEnvironment
throw new IllegalStateException("Missing value type mapping for record: " + value.getClass());
}

if (value instanceof UnifiedRecordValue unifiedRecordValue) {
mutableRecordBatch.appendRecord(
key, -1, type, intent, rejectionType, rejectionReason, valueType, unifiedRecordValue);
} else {
throw new IllegalStateException(
String.format("The record value %s is not a UnifiedRecordValue", value));
}

streamWriter.appendRecord(key, type, intent, rejectionType, rejectionReason, value);
return this;
}
Expand Down Expand Up @@ -106,7 +135,7 @@ public ProcessingResultBuilder resetPostCommitTasks() {

@Override
public ProcessingResult build() {
  // Package everything accumulated during processing — the collected record batch, the
  // post-commit tasks and the response flag — into the final result.
  return new DirectProcessingResult(context, mutableRecordBatch, postCommitTasks, hasResponse);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import io.camunda.zeebe.engine.api.TaskResult;
import io.camunda.zeebe.engine.api.TaskResultBuilder;
import io.camunda.zeebe.engine.api.records.RecordBatch;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.RejectionType;
Expand Down Expand Up @@ -41,6 +42,6 @@ public DirectTaskResultBuilder appendCommandRecord(

@Override
public TaskResult build() {
  // A task result carries no follow-up records and no response: empty batch, no
  // post-commit tasks, hasResponse = false.
  return new DirectProcessingResult(context, RecordBatch.empty(), Collections.emptyList(), false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.camunda.zeebe.engine.metrics.StreamProcessorMetrics;
import io.camunda.zeebe.engine.processing.streamprocessor.RecordValues;
import io.camunda.zeebe.logstreams.impl.Loggers;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
Expand Down Expand Up @@ -136,6 +137,7 @@ public final class ProcessingStateMachine {
private final List<RecordProcessor> recordProcessors;
private ProcessingResult currentProcessingResult;
private RecordProcessor currentProcessor;
private final LogStreamBatchWriter logStreamBatchWriter;

public ProcessingStateMachine(
final StreamProcessorContext context,
Expand All @@ -147,6 +149,7 @@ public ProcessingStateMachine(
recordValues = context.getRecordValues();
logStreamReader = context.getLogStreamReader();
logStreamWriter = context.getLogStreamWriter();
logStreamBatchWriter = context.getLogStreamBatchWriter();
transactionContext = context.getTransactionContext();
abortCondition = context.getAbortCondition();
lastProcessedPositionState = context.getLastProcessedPositionState();
Expand Down Expand Up @@ -240,7 +243,8 @@ private void processCommand(final LoggedEvent command) {

final long position = typedCommand.getPosition();
final ProcessingResultBuilder processingResultBuilder =
new DirectProcessingResultBuilder(context, position);
new DirectProcessingResultBuilder(
context, position, logStreamBatchWriter::canWriteAdditionalEvent);

metrics.processingLatency(command.getTimestamp(), processingStartTime);

Expand Down Expand Up @@ -321,7 +325,8 @@ private void errorHandlingInTransaction(final Throwable processingException) thr
() -> {
final long position = typedCommand.getPosition();
final ProcessingResultBuilder processingResultBuilder =
new DirectProcessingResultBuilder(context, position);
new DirectProcessingResultBuilder(
context, position, logStreamBatchWriter::canWriteAdditionalEvent);
// todo(#10047): replace this reset method by using Buffered Writers
processingResultBuilder.reset();

Expand All @@ -337,18 +342,26 @@ private void writeRecords() {
final ActorFuture<Boolean> retryFuture =
writeRetryStrategy.runWithRetry(
() -> {
final var batchWriter = context.getLogStreamBatchWriter();
final long position1 = currentProcessingResult.writeRecordsToStream(batchWriter);
final long position2 = batchWriter.tryWrite();

final var maxPosition = Math.max(position1, position2);

// only overwrite position if records were flushed
if (maxPosition > 0) {
writtenPosition = maxPosition;
logStreamBatchWriter.reset();
logStreamBatchWriter.sourceRecordPosition(typedCommand.getPosition());

currentProcessingResult
.getRecordBatch()
.forEach(
entry ->
logStreamBatchWriter
.event()
.key(entry.key())
.metadataWriter(entry.recordMetadata())
.sourceIndex(entry.sourceIndex())
.valueWriter(entry.recordValue())
.done());

final long position = logStreamBatchWriter.tryWrite();
if (position > 0) {
writtenPosition = position;
}

return maxPosition >= 0;
return position >= 0;
},
abortCondition);

Expand Down

0 comments on commit 82f7212

Please sign in to comment.