merge: #10188
10188: Use RecordBatch with TaskResult r=Zelldon a=Zelldon

## Description


As prework, a static inverse of the EventRegistry (a TypeRegistry) was created so it can be reused in the different result builders. It allows looking up the value type for a given record class.
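
As a minimal, self-contained sketch of the idea (the enum and record classes below are stand-ins, not the real Zeebe types; the actual registry lives in `TypedEventRegistry` and maps `ValueType` to `UnifiedRecordValue` subclasses):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Stand-ins for ValueType and the record value classes used in Zeebe.
enum ValueType { PROCESS_INSTANCE, JOB }
class ProcessInstanceRecord {}
class JobRecord {}

final class TypedEventRegistrySketch {

  // Forward registry: ValueType -> record value class (like EVENT_REGISTRY).
  static final Map<ValueType, Class<?>> EVENT_REGISTRY =
      Map.of(
          ValueType.PROCESS_INSTANCE, ProcessInstanceRecord.class,
          ValueType.JOB, JobRecord.class);

  // Inverse registry: record value class -> ValueType (like TYPE_REGISTRY),
  // built once so every result builder can share it.
  static final Map<Class<?>, ValueType> TYPE_REGISTRY;

  static {
    final Map<Class<?>, ValueType> typeRegistry = new HashMap<>();
    EVENT_REGISTRY.forEach((valueType, clazz) -> typeRegistry.put(clazz, valueType));
    TYPE_REGISTRY = Collections.unmodifiableMap(typeRegistry);
  }

  private TypedEventRegistrySketch() {}
}
```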

The RecordBatch is now part of the TaskResult. It is filled via the TaskResultBuilder and returned with the TaskResult, and the ProcessingScheduleService then writes the RecordBatch to the Dispatcher.
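
Roughly, the flow looks like this (a simplified sketch with stand-in types, not the real `TaskResultBuilder`/`LogStreamBatchWriter` API):

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for a record batch entry; the real entry also carries record type,
// intent, rejection type and ValueType.
record BatchEntry(long key, Object value) {}

final class TaskResultFlowSketch {

  public static void main(final String[] args) {
    // 1) The scheduled task appends command records via the result builder;
    //    they are collected in a batch instead of going to a stream writer.
    final List<BatchEntry> recordBatch = new ArrayList<>();
    recordBatch.add(new BatchEntry(1L, "some command value"));

    // 2) The TaskResult exposes the batch, and the ProcessingScheduleService
    //    iterates it, appends every entry to the log stream batch writer and
    //    then tries to write the whole batch to the dispatcher.
    recordBatch.forEach(
        entry -> System.out.println("append key=" + entry.key() + " value=" + entry.value()));
    // In the real code, logStreamBatchWriter.tryWrite() is retried until it
    // returns a non-negative position (see ProcessingScheduleServiceImpl).
  }
}
```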

After migrating to the RecordBatch we were able to delete `writeRecordsToStream`, which further reduces dependencies. For example, the backup module no longer depends on logstreams.

Next step: migrate the response writing to the record batch, which will allow removing further dependencies.


## Related issues


Related to #10001



Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
zeebe-bors-camunda[bot] and Zelldon committed Aug 26, 2022
2 parents 66f684c + 37bf2f5 commit 95d5a16
Showing 13 changed files with 63 additions and 79 deletions.
7 changes: 0 additions & 7 deletions backup/pom.xml
@@ -89,13 +89,6 @@
<scope>test</scope>
</dependency>

<!-- Should not depend on logstreams. But we have some dependencies in the test -->
<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-logstreams</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-scheduler</artifactId>
@@ -13,7 +13,6 @@
import io.camunda.zeebe.engine.api.ProcessingResultBuilder;
import io.camunda.zeebe.engine.api.records.ImmutableRecordBatch;
import io.camunda.zeebe.engine.api.records.RecordBatch;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RecordValue;
@@ -25,11 +24,6 @@

record MockProcessingResult(List<Event> records) implements ProcessingResult {

@Override
public long writeRecordsToStream(final LogStreamBatchWriter logStreamBatchWriter) {
return 0;
}

@Override
public ImmutableRecordBatch getRecordBatch() {
return RecordBatch.empty();
@@ -9,7 +9,6 @@

import io.camunda.zeebe.engine.api.records.ImmutableRecordBatch;
import io.camunda.zeebe.engine.api.records.RecordBatch;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;

public final class EmptyProcessingResult implements ProcessingResult {

@@ -20,11 +19,6 @@ private EmptyProcessingResult() {
emptyRecordBatch = RecordBatch.empty();
}

@Override
public long writeRecordsToStream(final LogStreamBatchWriter logStreamBatchWriter) {
return 0;
}

@Override
public ImmutableRecordBatch getRecordBatch() {
return emptyRecordBatch;

This file was deleted.

@@ -9,16 +9,12 @@

import io.camunda.zeebe.engine.api.records.ImmutableRecordBatch;
import io.camunda.zeebe.engine.api.records.RecordBatchEntry;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;

/**
* Here the interface is just a suggestion. Can be whatever PDT team thinks is best to work with
*/
public interface ProcessingResult {

@Deprecated
long writeRecordsToStream(LogStreamBatchWriter logStreamBatchWriter);

/**
* Returns the resulting record batch, which can be empty or consist of multiple {@link
* RecordBatchEntry}s. These entries are the result of the current processing. If an entry is of
13 changes: 11 additions & 2 deletions engine/src/main/java/io/camunda/zeebe/engine/api/TaskResult.java
@@ -7,10 +7,19 @@
*/
package io.camunda.zeebe.engine.api;

import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import io.camunda.zeebe.engine.api.records.ImmutableRecordBatch;
import io.camunda.zeebe.engine.api.records.RecordBatchEntry;

/** Here the interface is just a suggestion. Can be whatever PDT team thinks is best to work with */
public interface TaskResult {

long writeRecordsToStream(LogStreamBatchWriter logStreamBatchWriter);
/**
* Returns the resulting record batch, which can be empty or consist of multiple {@link
* RecordBatchEntry}s. These entries are the result of the current task execution. If an entry is
* of type {@link io.camunda.zeebe.protocol.record.RecordType#COMMAND} it will be later processed
* as follow-up command by the {@link RecordProcessor}
*
* @return returns the resulting immutable record batch
*/
ImmutableRecordBatch getRecordBatch();
}
@@ -7,7 +7,7 @@
*/
package io.camunda.zeebe.engine.api;

import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.record.intent.Intent;

/** Here the interface is just a suggestion. Can be whatever PDT team thinks is best to work with */
@@ -19,7 +19,7 @@ public interface TaskResultBuilder {
* @return returns itself for method chaining
*/
TaskResultBuilder appendCommandRecord(
final long key, final Intent intent, final RecordValue value);
final long key, final Intent intent, final UnifiedRecordValue value);

TaskResult build();
}
@@ -33,11 +33,13 @@
import io.camunda.zeebe.protocol.record.ValueType;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

public final class TypedEventRegistry {

public static final Map<ValueType, Class<? extends UnifiedRecordValue>> EVENT_REGISTRY;
public static final Map<Class<? extends UnifiedRecordValue>, ValueType> TYPE_REGISTRY;

static {
final EnumMap<ValueType, Class<? extends UnifiedRecordValue>> registry =
@@ -67,5 +69,11 @@ public final class TypedEventRegistry {
registry.put(ValueType.DECISION_EVALUATION, DecisionEvaluationRecord.class);

EVENT_REGISTRY = Collections.unmodifiableMap(registry);

final Map<Class<? extends UnifiedRecordValue>, ValueType> typeRegistry = new HashMap<>();
EVENT_REGISTRY.forEach((e, c) -> typeRegistry.put(c, e));
TYPE_REGISTRY = Collections.unmodifiableMap(typeRegistry);
}

private TypedEventRegistry() {}
}
@@ -12,7 +12,6 @@
import io.camunda.zeebe.engine.api.ProcessingResult;
import io.camunda.zeebe.engine.api.TaskResult;
import io.camunda.zeebe.engine.api.records.ImmutableRecordBatch;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import java.util.ArrayList;
import java.util.List;

@@ -25,8 +24,6 @@
final class DirectProcessingResult implements ProcessingResult, TaskResult {

private final List<PostCommitTask> postCommitTasks;

private final LegacyTypedStreamWriter streamWriter;
private final DirectTypedResponseWriter responseWriter;
private boolean hasResponse;
private final ImmutableRecordBatch immutableRecordBatch;
@@ -37,17 +34,11 @@ final class DirectProcessingResult implements ProcessingResult, TaskResult {
final List<PostCommitTask> postCommitTasks,
final boolean hasResponse) {
this.postCommitTasks = new ArrayList<>(postCommitTasks);
streamWriter = context.getLogStreamWriter();
responseWriter = context.getTypedResponseWriter();
this.immutableRecordBatch = immutableRecordBatch;
this.hasResponse = hasResponse;
}

@Override
public long writeRecordsToStream(final LogStreamBatchWriter logStreamBatchWriter) {
return streamWriter.flush();
}

@Override
public ImmutableRecordBatch getRecordBatch() {
return immutableRecordBatch;
@@ -7,7 +7,7 @@
*/
package io.camunda.zeebe.streamprocessor;

import static io.camunda.zeebe.engine.processing.streamprocessor.TypedEventRegistry.EVENT_REGISTRY;
import static io.camunda.zeebe.engine.processing.streamprocessor.TypedEventRegistry.TYPE_REGISTRY;

import io.camunda.zeebe.engine.api.PostCommitTask;
import io.camunda.zeebe.engine.api.ProcessingResult;
@@ -22,9 +22,7 @@
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Implementation of {@code ProcessingResultBuilder} that uses direct access to the stream and to
Expand All @@ -44,7 +42,6 @@ final class DirectProcessingResultBuilder implements ProcessingResultBuilder {
true; // TODO figure out why this still needs to be true for tests to pass
private final long sourceRecordPosition;
private final RecordBatch mutableRecordBatch;
private final Map<Class<? extends UnpackedObject>, ValueType> typeRegistry;

DirectProcessingResultBuilder(
final StreamProcessorContext context,
Expand All @@ -56,8 +53,6 @@ final class DirectProcessingResultBuilder implements ProcessingResultBuilder {
streamWriter.configureSourceContext(sourceRecordPosition);
responseWriter = context.getTypedResponseWriter();
mutableRecordBatch = new RecordBatch(predicate);
typeRegistry = new HashMap<>();
EVENT_REGISTRY.forEach((e, c) -> typeRegistry.put(c, e));
}

@Override
Expand All @@ -69,7 +64,7 @@ public ProcessingResultBuilder appendRecord(
final String rejectionReason,
final RecordValue value) {

final ValueType valueType = typeRegistry.get(value.getClass());
final ValueType valueType = TYPE_REGISTRY.get(value.getClass());
if (valueType == null) {
// usually happens when the record is not registered at the TypedStreamEnvironment
throw new IllegalStateException("Missing value type mapping for record: " + value.getClass());
@@ -7,12 +7,17 @@
*/
package io.camunda.zeebe.streamprocessor;

import static io.camunda.zeebe.engine.processing.streamprocessor.TypedEventRegistry.TYPE_REGISTRY;

import io.camunda.zeebe.engine.api.TaskResult;
import io.camunda.zeebe.engine.api.TaskResultBuilder;
import io.camunda.zeebe.engine.api.records.MutableRecordBatch;
import io.camunda.zeebe.engine.api.records.RecordBatch;
import io.camunda.zeebe.engine.api.records.RecordBatchSizePredicate;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RecordValue;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import java.util.Collections;

@@ -25,23 +30,31 @@
final class DirectTaskResultBuilder implements TaskResultBuilder {

private final StreamProcessorContext context;
private final LegacyTypedStreamWriter streamWriter;
private final MutableRecordBatch mutableRecordBatch;

DirectTaskResultBuilder(final StreamProcessorContext context) {
DirectTaskResultBuilder(
final StreamProcessorContext context, final RecordBatchSizePredicate predicate) {
this.context = context;
streamWriter = context.getLogStreamWriter();
streamWriter.configureSourceContext(-1);
mutableRecordBatch = new RecordBatch(predicate);
}

@Override
public DirectTaskResultBuilder appendCommandRecord(
final long key, final Intent intent, final RecordValue value) {
streamWriter.appendRecord(key, RecordType.COMMAND, intent, RejectionType.NULL_VAL, "", value);
final long key, final Intent intent, final UnifiedRecordValue value) {

final ValueType valueType = TYPE_REGISTRY.get(value.getClass());
if (valueType == null) {
// usually happens when the record is not registered at the TypedStreamEnvironment
throw new IllegalStateException("Missing value type mapping for record: " + value.getClass());
}

mutableRecordBatch.appendRecord(
key, -1, RecordType.COMMAND, intent, RejectionType.NULL_VAL, "", valueType, value);
return this;
}

@Override
public TaskResult build() {
return new DirectProcessingResult(context, RecordBatch.empty(), Collections.emptyList(), false);
return new DirectProcessingResult(context, mutableRecordBatch, Collections.emptyList(), false);
}
}
@@ -99,8 +99,10 @@ Runnable toRunnable(final Task task) {
actorControl.submit(toRunnable(task));
return;
}

final var builder = new DirectTaskResultBuilder(streamProcessorContext);
final var logStreamBatchWriter = streamProcessorContext.getLogStreamBatchWriter();
final var builder =
new DirectTaskResultBuilder(
streamProcessorContext, logStreamBatchWriter::canWriteAdditionalEvent);
final var result = task.execute(builder);

// we need to retry the writing if the dispatcher return zero or negative position (this means
@@ -111,8 +113,19 @@ Runnable toRunnable(final Task task) {
writeRetryStrategy.runWithRetry(
() -> {
Loggers.PROCESS_PROCESSOR_LOGGER.trace("Write scheduled TaskResult to dispatcher!");
return result.writeRecordsToStream(streamProcessorContext.getLogStreamBatchWriter())
>= 0;
result
.getRecordBatch()
.forEach(
entry ->
logStreamBatchWriter
.event()
.key(entry.key())
.metadataWriter(entry.recordMetadata())
.sourceIndex(entry.sourceIndex())
.valueWriter(entry.recordValue())
.done());

return logStreamBatchWriter.tryWrite() >= 0;
},
streamProcessorContext.getAbortCondition());

@@ -19,7 +19,6 @@
import static org.mockito.Mockito.verify;

import io.camunda.zeebe.engine.api.EmptyProcessingResult;
import io.camunda.zeebe.engine.api.EmptyTaskResult;
import io.camunda.zeebe.engine.api.ProcessingResult;
import io.camunda.zeebe.engine.api.ProcessingResultBuilder;
import io.camunda.zeebe.engine.api.ProcessingScheduleService;
@@ -29,6 +28,7 @@
import io.camunda.zeebe.engine.api.TaskResult;
import io.camunda.zeebe.engine.api.TaskResultBuilder;
import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.api.records.RecordBatch;
import io.camunda.zeebe.engine.util.Records;
import io.camunda.zeebe.engine.util.StreamPlatform;
import io.camunda.zeebe.engine.util.StreamPlatformExtension;
@@ -219,7 +219,7 @@ public void shouldScheduleOnFixedRate() {
private static final class DummyTask implements Task {
@Override
public TaskResult execute(final TaskResultBuilder taskResultBuilder) {
return EmptyTaskResult.INSTANCE;
return RecordBatch::empty;
}
}

