Skip to content

Commit

Permalink
merge: #10118
Browse files Browse the repository at this point in the history
10118: Add new RecordBatch classes and interfaces r=Zelldon a=Zelldon

## Description

With the #9600 and especially with #9724 we wanted to create a way to return results (records) on processing and on scheduled Tasks, without the need to use the LogStreamWriters. This should allow us to reduce the dependency on the log stream and in general, make the test setup more easier. 

We did the first attempt with the [BufferedProcessingResultBuilder](https://github.com/camunda/zeebe/blob/main/engine/src/main/java/io/camunda/zeebe/streamprocessor/BufferedProcessingResultBuilder.java), which only contains a buffer to write into the needed record details. This was also described as a solution in the issue #9724. 

During the discussion of the proposal, there was already the idea raised that it would be nicer to have a List of records returned by the Processing- and TaskResult. This was added as a Bonus in #9724.

Today I thought longer about the `BufferedProcessingResultBuilder` and how we could test it and in general make it nicer to return the processing result. Checking the code of the result builder and [LogStreamBatchWriterImpl.java](https://github.com/camunda/zeebe/blob/main/logstreams/src/main/java/io/camunda/zeebe/logstreams/impl/log/LogStreamBatchWriterImpl.java), I realized that we do not really need to copy the record details into a big buffer (as we do it [here](https://github.com/camunda/zeebe/blob/main/logstreams/src/main/java/io/camunda/zeebe/logstreams/impl/log/LogStreamBatchWriterImpl.java#L163-L195)). We could create a "struct" (some would call it a java record but to avoid confusion I call it struct now) of the record details which we want to write. Most parts of the struct are primitives (like enums intent, valueType etc.) and the RecordValue would be only thing we would need to copy into a buffer. This would allow better debuggability (e.g. we can see during processing what is already part of the result) and no need to mess around with offsets in the implementation. We can keep references to these "structs" in a list and iterate over it in the StreamProcessor to write the record details, similar to what we [do in the LogStreamBatchWriter here](https://github.com/camunda/zeebe/blob/main/logstreams/src/main/java/io/camunda/zeebe/logstreams/impl/log/LogStreamBatchWriterImpl.java#L240). 

One interesting part after writing the classes and tests, I also realized that the batch entry interface could return the `UnifiedRecordValue` in order to not have to mess again with buffers (BufferWriters or BufferReaders) and to make the engine tests easier to implement. For example, if we would like to split the modules we could write tests without the stream processor and take the records from the processing result and give it directly again to the engine 🤯

So this PR adds new interfaces and first implementation for so called RecordBatch and RecordBatchEntry, which should be later used in the Processing- and TaskResult. Since the CommandWriter needs similar information as the LogStreamWriter, we can reuse the RecordBatch for both the records to write and for the response, which is part of the Processing Result. No other code was touched/modified.

Right now the interfaces are kept to a minimum and documentation as well. I guess we might need to iterate over them later again, but I think it is a good step forward. 

Feel free to propose another names for the classes and interfaces, nothing is set into stone. #namingishard

<!-- Please explain the changes you made here. -->

## Related issues

<!-- Which issues are closed by this PR or are related -->

related to #10001



Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and Zelldon committed Aug 23, 2022
2 parents cf7cd1f + 3fc44ce commit 791c8cf
Show file tree
Hide file tree
Showing 7 changed files with 485 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.streamprocessor.records;

/**
* Represents an unmodifiable batch of records, which extends the {@link Iterable<
* ImmutableRecordBatchEntry >} in order to make sure that the contained entries can be accessed.
*/
public interface ImmutableRecordBatch extends Iterable<ImmutableRecordBatchEntry> {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.streamprocessor.records;

import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;

/**
* Represents an unmodifiable entry of an {@link ImmutableRecordBatch}. Contains data about a record
* which has been created by an RecordProcessor.
*/
public interface ImmutableRecordBatchEntry {

/**
* @return the key of the record
*/
long key();

/**
* @return points to a command which is part of the same batch, which caused that entry
*/
int sourceIndex();

/**
* @return meta data of the record, like ValueType, Intent, RecordType etc.
*/
RecordMetadata recordMetadata();

/**
* @return the actual record value, this method returns a general type but can be casted to the
* right record value class if necessary
*/
UnifiedRecordValue recordValue();

/**
* @return the length of the record entry, important for the batch to determine whether it reached
* its maximum size
*/
int getLength();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.streamprocessor.records;

import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.util.buffer.BufferWriter;

/**
* Represents a modifiable batch of record, which means we can add multiple Records to the batch.
* For further processing the user can iterate of the appended entries and retrieve the needed data.
*/
public interface MutableRecordBatch extends ImmutableRecordBatch {

/**
* Allows to add a new Record to the batch
*
* @param key the key of the record
* @param sourceIndex the position/index in the current batch which caused that entry; should be
* set to -1 if no entry caused it
* @param recordType the type of the record, part of the record metadata, must be set
* @param intent the intent of the record, part of the record metadata, must be set
* @param rejectionType the rejection type, part of the record metadata, can be set to a
* NULL_VALUE
* @param rejectionReason the rejection reason, part of the record metadata, can be empty
* @param valueType the value type, part of the record metadata, must be set
* @param valueWriter the actual record value
*/
void appendRecord(
final long key,
final int sourceIndex,
final RecordType recordType,
final Intent intent,
final RejectionType rejectionType,
final String rejectionReason,
final ValueType valueType,
final BufferWriter valueWriter);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.streamprocessor.records;

import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.util.buffer.BufferWriter;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;

public final class RecordBatch implements MutableRecordBatch {

final List<ImmutableRecordBatchEntry> recordBatchEntries = new ArrayList<>();
private int batchSize;
private final RecordBatchSizePredicate recordBatchSizePredicate;

public RecordBatch(final RecordBatchSizePredicate recordBatchSizePredicate) {
this.recordBatchSizePredicate = recordBatchSizePredicate;
}

@Override
public void appendRecord(
final long key,
final int sourceIndex,
final RecordType recordType,
final Intent intent,
final RejectionType rejectionType,
final String rejectionReason,
final ValueType valueType,
final BufferWriter valueWriter) {
final var recordBatchEntry =
RecordBatchEntry.createEntry(
key,
sourceIndex,
recordType,
intent,
rejectionType,
rejectionReason,
valueType,
valueWriter);
final var entryLength = recordBatchEntry.getLength();

if (!recordBatchSizePredicate.test(recordBatchEntries.size() + 1, batchSize + entryLength)) {
// todo decided whether we want to throw or return a bool or a either?!
throw new IllegalStateException(
"Can't append entry: '"
+ recordBatchEntry
+ "' with size: "
+ entryLength
+ " this would exceed the maximum batch size."
+ " [ currentBatchEntryCount: "
+ recordBatchEntries.size()
+ ", currentBatchSize: "
+ batchSize
+ "]");
}

recordBatchEntries.add(recordBatchEntry);
batchSize += entryLength;
}

public int getBatchSize() {
return batchSize;
}

@Override
public Iterator<ImmutableRecordBatchEntry> iterator() {
return recordBatchEntries.iterator();
}

@Override
public void forEach(final Consumer<? super ImmutableRecordBatchEntry> action) {
recordBatchEntries.forEach(action);
}

@Override
public Spliterator<ImmutableRecordBatchEntry> spliterator() {
return recordBatchEntries.spliterator();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.streamprocessor.records;

import static io.camunda.zeebe.engine.processing.streamprocessor.TypedEventRegistry.EVENT_REGISTRY;

import io.camunda.zeebe.protocol.impl.record.RecordMetadata;
import io.camunda.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.util.ReflectUtil;
import io.camunda.zeebe.util.buffer.BufferWriter;
import org.agrona.concurrent.UnsafeBuffer;

public record RecordBatchEntry(
long key, int sourceIndex, RecordMetadata recordMetadata, UnifiedRecordValue unifiedRecordValue)
implements ImmutableRecordBatchEntry {

@Override
public UnifiedRecordValue recordValue() {
return unifiedRecordValue;
}

@Override
public int getLength() {
return Long.BYTES
+ // key
Integer.BYTES
+ // source Index
recordMetadata.getLength()
+ unifiedRecordValue.getLength();
}

public static RecordBatchEntry createEntry(
final long key,
final int sourceIndex,
final RecordType recordType,
final Intent intent,
final RejectionType rejectionType,
final String rejectionReason,
final ValueType valueType,
final BufferWriter valueWriter) {
final var recordMetadata =
new RecordMetadata()
.recordType(recordType)
.intent(intent)
.rejectionType(rejectionType)
.rejectionReason(rejectionReason)
.valueType(valueType);

// we need to copy the value, to make sure that it will not change later
final var bytes = new byte[valueWriter.getLength()];
final var recordValueBuffer = new UnsafeBuffer(bytes);
valueWriter.write(recordValueBuffer, 0);

final UnifiedRecordValue unifiedRecordValue =
ReflectUtil.newInstance(EVENT_REGISTRY.get(recordMetadata.getValueType()));
unifiedRecordValue.wrap(recordValueBuffer, 0, recordValueBuffer.capacity());

return new RecordBatchEntry(key, sourceIndex, recordMetadata, unifiedRecordValue);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.streamprocessor.records;

import java.util.function.BiPredicate;

/**
* Takes as argument the potential next batch entry count and the next potential batch size, in
* order to verify whether this next {@link RecordBatchEntry} can be added to the {@link
* RecordBatch}.
*/
public interface RecordBatchSizePredicate extends BiPredicate<Integer, Integer> {}

0 comments on commit 791c8cf

Please sign in to comment.