Skip to content

Commit

Permalink
merge: #10181
Browse files Browse the repository at this point in the history
10181: Add canWriteRecord method to RecordBatch r=Zelldon a=Zelldon

## Description

This method verifies whether a record of a given length could still be appended to the batch. It is useful when you do not want to append the record immediately — for example, when you keep adding data and repeatedly check whether the record still fits. The JobBatchCollector uses it this way.

Removed maxMessageLength, since it was only used in one place and it somewhat breaks the abstraction; in particular, it would prevent us from fully removing the write from the implementation.
<!-- Please explain the changes you made here. -->

## Related issues

<!-- Which issues are closed by this PR or are related -->

related #10001 



Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and Zelldon committed Aug 26, 2022
2 parents 95d5a16 + d8a7917 commit ab0a90b
Show file tree
Hide file tree
Showing 14 changed files with 107 additions and 60 deletions.
Expand Up @@ -103,10 +103,5 @@ public ProcessingResult build() {
public boolean canWriteEventOfLength(final int eventLength) {
// NOTE(review): unconditionally rejects any event length — presumably this is a
// no-op/empty ProcessingResult builder implementation; confirm that rejecting
// everything is the intended contract here.
return false;
}

@Override
public int getMaxEventLength() {
return 0;
}
}
}
Expand Up @@ -72,6 +72,4 @@ ProcessingResultBuilder withResponse(
ProcessingResult build();

boolean canWriteEventOfLength(int eventLength);

int getMaxEventLength();
}
Expand Up @@ -42,4 +42,14 @@ void appendRecord(
final String rejectionReason,
final ValueType valueType,
final BufferWriter valueWriter);

/**
 * Verifies whether a record of the given length could still be appended to the current batch.
 *
 * <p>Useful when a record is built up incrementally and you want to check, before actually
 * appending it, whether the updated record would still fit into the batch.
 *
 * @param recordLength the expected record length, which needs to be verified
 * @return true if the record length would fit into the batch, false otherwise
 */
boolean canAppendRecordOfLength(int recordLength);
}
Expand Up @@ -73,6 +73,11 @@ public void appendRecord(
batchSize += entryLength;
}

@Override
public boolean canAppendRecordOfLength(final int recordLength) {
  // Probe the size predicate with the would-be totals: one more entry, and the
  // current accumulated size grown by the candidate record's length.
  final int prospectiveEntryCount = recordBatchEntries.size() + 1;
  final int prospectiveBatchSize = batchSize + recordLength;
  return recordBatchSizePredicate.test(prospectiveEntryCount, prospectiveBatchSize);
}

// Returns the accumulated size (in bytes) of all entries appended to this batch so far.
public int getBatchSize() {
return batchSize;
}
Expand Down
Expand Up @@ -79,7 +79,9 @@ private void activateJobs(final TypedRecord<JobBatchRecord> record) {
final Either<TooLargeJob, Integer> result = jobBatchCollector.collectJobs(record);
final var activatedJobCount = result.getOrElse(0);
result.ifLeft(
largeJob -> raiseIncidentJobTooLargeForMessageSize(largeJob.key(), largeJob.record()));
largeJob ->
raiseIncidentJobTooLargeForMessageSize(
largeJob.key(), largeJob.jobRecord(), largeJob.expectedEventLength()));

activateJobBatch(record, value, jobBatchKey, activatedJobCount);
}
Expand Down Expand Up @@ -125,14 +127,15 @@ private void activateJobBatch(
jobMetrics.jobActivated(value.getType(), activatedCount);
}

private void raiseIncidentJobTooLargeForMessageSize(final long jobKey, final JobRecord job) {
final String messageSize = ByteValue.prettyPrint(stateWriter.getMaxEventLength());
private void raiseIncidentJobTooLargeForMessageSize(
final long jobKey, final JobRecord job, final int expectedJobRecordSize) {
final String jobSize = ByteValue.prettyPrint(expectedJobRecordSize);
final DirectBuffer incidentMessage =
wrapString(
String.format(
"The job with key '%s' can not be activated because it is larger than the configured message size (%s). "
"The job with key '%s' can not be activated, because with %s it is larger than the configured message size (per default is 4 MB). "
+ "Try to reduce the size by reducing the number of fetched variables or modifying the variable values.",
jobKey, messageSize));
jobKey, jobSize));
final var incidentEvent =
new IncidentRecord()
.setErrorType(ErrorType.MESSAGE_SIZE_EXCEEDED)
Expand Down
Expand Up @@ -99,7 +99,7 @@ Either<TooLargeJob, Integer> collectJobs(final TypedRecord<JobBatchRecord> recor
// if no jobs were activated, then the current job is simply too large, and we cannot
// activate it
if (activatedCount.value == 0) {
unwritableJob.set(new TooLargeJob(key, jobRecord));
unwritableJob.set(new TooLargeJob(key, jobRecord, expectedEventLength));
}

value.setTruncated(true);
Expand Down Expand Up @@ -165,5 +165,5 @@ private DirectBuffer collectVariables(
return variables;
}

record TooLargeJob(long key, JobRecord record) {}
record TooLargeJob(long key, JobRecord jobRecord, int expectedEventLength) {}
}
Expand Up @@ -45,9 +45,4 @@ public void appendFollowUpEvent(final long key, final Intent intent, final Recor
public boolean canWriteEventOfLength(final int eventLength) {
  // Forward the capacity check to the currently active result builder,
  // which owns the batch-size accounting.
  final var activeBuilder = resultBuilder();
  return activeBuilder.canWriteEventOfLength(eventLength);
}

@Override
public int getMaxEventLength() {
return resultBuilder().getMaxEventLength();
}
}
Expand Up @@ -33,12 +33,5 @@ public interface TypedEventWriter {
* @param eventLength the length of the event that will be written
* @return true if an event of length {@code eventLength} can be written
*/
default boolean canWriteEventOfLength(final int eventLength) {
return eventLength <= getMaxEventLength();
}

/**
* @return the maximum event length
*/
int getMaxEventLength();
boolean canWriteEventOfLength(final int eventLength);
}
Expand Up @@ -135,11 +135,6 @@ public ProcessingResult build() {

@Override
public boolean canWriteEventOfLength(final int eventLength) {
return streamWriter.canWriteEventOfLength(eventLength);
}

@Override
public int getMaxEventLength() {
return streamWriter.getMaxEventLength();
return mutableRecordBatch.canAppendRecordOfLength(eventLength);
}
}
Expand Up @@ -123,16 +123,4 @@ public void appendFollowUpEvent(final long key, final Intent intent, final Recor
public boolean canWriteEventOfLength(final int eventLength) {
  // Delegate the capacity check to the underlying batch writer.
  final boolean hasCapacity = batchWriter.canWriteAdditionalEvent(eventLength);
  return hasCapacity;
}

/**
* This is not actually accurate, as the frame length needs to also be aligned by the same amount
* of bytes as the batch. However, this would break concerns here, i.e. the writer here would have
* to become Dispatcher aware.
*
* @return an approximate value of the max fragment length
*/
@Override
public int getMaxEventLength() {
return batchWriter.getMaxFragmentLength();
}
}
Expand Up @@ -166,8 +166,8 @@ private final class WrappedStreamWriterLegacy implements LegacyTypedStreamWriter
public void appendFollowUpEvent(final long key, final Intent intent, final RecordValue value) {}

@Override
public int getMaxEventLength() {
return Integer.MAX_VALUE;
public boolean canWriteEventOfLength(final int eventLength) {
return true;
}

@Override
Expand Down
Expand Up @@ -43,9 +43,4 @@ public void appendFollowUpEvent(final long key, final Intent intent, final Recor
public boolean canWriteEventOfLength(final int eventLength) {
  // Pure pass-through: the wrapped event writer decides whether the length fits.
  final boolean fits = eventWriter.canWriteEventOfLength(eventLength);
  return fits;
}

@Override
public int getMaxEventLength() {
return eventWriter.getMaxEventLength();
}
}
Expand Up @@ -32,8 +32,8 @@ public void appendFollowUpEvent(final long key, final Intent intent, final Recor
}

@Override
public int getMaxEventLength() {
return Integer.MAX_VALUE;
public boolean canWriteEventOfLength(final int eventLength) {
return true;
}

public static final class RecordedEvent<T extends RecordValue> {
Expand Down
Expand Up @@ -22,10 +22,10 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.Test;

public class RecordBatchTest {
class RecordBatchTest {

@Test
public void shouldAppendToRecordBatch() {
void shouldAppendToRecordBatch() {
// given
final var recordBatch = new RecordBatch((count, size) -> true);
final var processInstanceRecord = Records.processInstance(1);
Expand Down Expand Up @@ -73,7 +73,7 @@ public void shouldAppendToRecordBatch() {
}

@Test
public void shouldUseRecordSizePredicate() {
void shouldUseRecordSizePredicate() {
// given
final AtomicInteger batchEntryCount = new AtomicInteger(-1);
final AtomicInteger batchSize = new AtomicInteger(-1);
Expand Down Expand Up @@ -107,7 +107,7 @@ public boolean test(final Integer count, final Integer size) {
}

@Test
public void shouldUpdateBatchEntryCountWhenUsingRecordSizePredicate() {
void shouldUpdateBatchEntryCountWhenUsingRecordSizePredicate() {
// given
final AtomicInteger batchEntryCount = new AtomicInteger(-1);
final AtomicInteger batchSize = new AtomicInteger(-1);
Expand Down Expand Up @@ -150,7 +150,7 @@ public boolean test(final Integer count, final Integer size) {
}

@Test
public void shouldNotAppendToRecordBatchIfMaxSizeIsReached() {
void shouldNotAppendToRecordBatchIfMaxSizeIsReached() {
// given
final var recordBatch = new RecordBatch((count, size) -> false);
final var processInstanceRecord = Records.processInstance(1);
Expand All @@ -172,7 +172,7 @@ public void shouldNotAppendToRecordBatchIfMaxSizeIsReached() {
}

@Test
public void shouldOnlyAddUntilMaxBatchSizeIsReached() {
void shouldOnlyAddUntilMaxBatchSizeIsReached() {
// given
final var recordBatch = new RecordBatch((count, size) -> count < 2);
final var processInstanceRecord = Records.processInstance(1);
Expand Down Expand Up @@ -202,4 +202,74 @@ public void shouldOnlyAddUntilMaxBatchSizeIsReached() {
.hasMessageContaining("Can't append entry")
.hasMessageContaining("[ currentBatchEntryCount: 1, currentBatchSize: 249]");
}

@Test
void shouldReturnFalseIfRecordSizeDoesReachSizelimit() {
  // given - the predicate only accepts batches strictly smaller than 100 bytes
  final var batch = new RecordBatch((count, size) -> size < 100);

  // when - a record of exactly 100 bytes hits the limit
  final var fits = batch.canAppendRecordOfLength(100);

  // then
  assertThat(fits).isFalse();
}

@Test
void shouldReturnTrueIfRecordSizeDoesntReachSizelimit() {
  // given - the predicate only accepts batches strictly smaller than 100 bytes
  final var batch = new RecordBatch((count, size) -> size < 100);

  // when - 99 bytes stays just under the limit
  final var fits = batch.canAppendRecordOfLength(99);

  // then
  assertThat(fits).isTrue();
}

@Test
void shouldOnlyReturnTrueUntilMaxBatchSizeIsReached() {
  // given - the predicate rejects once the accumulated size reaches 300 bytes
  final var batch = new RecordBatch((count, size) -> size < 300);
  final var piRecord = Records.processInstance(1);
  batch.appendRecord(
      1,
      -1,
      RecordType.COMMAND,
      ProcessInstanceIntent.ACTIVATE_ELEMENT,
      RejectionType.ALREADY_EXISTS,
      "broken somehow",
      ValueType.PROCESS_INSTANCE,
      piRecord);

  // when - doubling the current batch size would exceed the limit
  final var fits = batch.canAppendRecordOfLength(batch.getBatchSize());

  // then
  assertThat(fits).isFalse();
}

@Test
void shouldOnlyReturnTrueUntilMaxCountIsReached() {
  // given - the predicate caps the batch at a single entry (count < 2)
  final var batch = new RecordBatch((count, size) -> count < 2);
  final var piRecord = Records.processInstance(1);
  batch.appendRecord(
      1,
      -1,
      RecordType.COMMAND,
      ProcessInstanceIntent.ACTIVATE_ELEMENT,
      RejectionType.ALREADY_EXISTS,
      "broken somehow",
      ValueType.PROCESS_INSTANCE,
      piRecord);

  // when - any further record would push the entry count past the cap
  final var fits = batch.canAppendRecordOfLength(batch.getBatchSize());

  // then
  assertThat(fits).isFalse();
}
}

0 comments on commit ab0a90b

Please sign in to comment.