Skip to content

Commit

Permalink
merge: #10191
Browse files Browse the repository at this point in the history
10191: Handle append error differently based on ResultBuilder r=Zelldon a=Zelldon

## Description

:x: Blocked by #10188 

This PR adds Either to the RecordBatch as return value. This allows to handle the error case differently. For example in the Processing right now we want to throw an exception, but later this can also handled differently which is why we added here a separate Method on the ProcessingResultBuilder. This is discussable whether we already want that.

What we need and want is to being able to handle failed appends in the consumers of the ProcessingScheduleService. Means at the TaskResultBuilder we want to now whether it was successful or not and just stop to append more records. This is slightly similar to how we did it before. 

This fixes a critical bug #10147

<!-- Please explain the changes you made here. -->

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #10147
related #10001 



Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and Zelldon committed Aug 26, 2022
2 parents ab0a90b + fa44e25 commit 8bc7e82
Show file tree
Hide file tree
Showing 14 changed files with 149 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.util.Either;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -52,7 +53,7 @@ static class MockProcessingResultBuilder implements ProcessingResultBuilder {
final List<Event> followupRecords = new ArrayList<>();

@Override
public ProcessingResultBuilder appendRecord(
public Either<RuntimeException, ProcessingResultBuilder> appendRecordReturnEither(
final long key,
final RecordType type,
final Intent intent,
Expand All @@ -62,7 +63,7 @@ public ProcessingResultBuilder appendRecord(

final var record = new Event(intent, type, rejectionType, rejectionReason, key, value);
followupRecords.add(record);
return null;
return Either.right(null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.util.Either;

/** Builder to compose the processing result */
public interface ProcessingResultBuilder {
Expand All @@ -21,8 +22,38 @@ public interface ProcessingResultBuilder {
* Appends a record to the result
*
* @return returns itself for method chaining
* @throws RuntimeException if to appended record doesn't fit into the RecordBatch
*/
ProcessingResultBuilder appendRecord(
default ProcessingResultBuilder appendRecord(
final long key,
final RecordType type,
final Intent intent,
final RejectionType rejectionType,
final String rejectionReason,
final RecordValue value)
throws RuntimeException {
final var either =
appendRecordReturnEither(key, type, intent, rejectionType, rejectionReason, value);

if (either.isLeft()) {
// This is how we handled too big record batches as well, except that this is now a
// different place. Before an exception was raised during the writing, now it is during
// processing. Both will lead to the onError call, such that the RecordProcessors can handle
// this case.
throw either.getLeft();
}
return either.get();
}

/**
* Appends a record to the result, returns an {@link Either<RuntimeException,
* ProcessingResultBuilder>} which indicates whether the appending was successful or not. This is
* useful in case were potentially we could reach the record batch limit size. The return either
* allows to handle such error case gracefully.
*
* @return returns either a failure or itself for chaining
*/
Either<RuntimeException, ProcessingResultBuilder> appendRecordReturnEither(
final long key,
final RecordType type,
final Intent intent,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ public interface TaskResultBuilder {
/**
* Appends a record to the result
*
* @return returns itself for method chaining
* @return returns true if the record still fits into the result, false otherwise
*/
TaskResultBuilder appendCommandRecord(
final long key, final Intent intent, final UnifiedRecordValue value);
boolean appendCommandRecord(final long key, final Intent intent, final UnifiedRecordValue value);

TaskResult build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.buffer.BufferWriter;

/**
Expand All @@ -32,8 +33,9 @@ public interface MutableRecordBatch extends ImmutableRecordBatch {
* @param rejectionReason the rejection reason, part of the record metadata, can be empty
* @param valueType the value type, part of the record metadata, must be set
* @param valueWriter the actual record value
* @return either a failure if record can't be added to the batch or null on success
*/
void appendRecord(
Either<RuntimeException, Void> appendRecord(
final long key,
final int sourceIndex,
final RecordType recordType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.buffer.BufferWriter;
import java.util.ArrayList;
import java.util.Iterator;
Expand All @@ -33,7 +34,7 @@ public static ImmutableRecordBatch empty() {
}

@Override
public void appendRecord(
public Either<RuntimeException, Void> appendRecord(
final long key,
final int sourceIndex,
final RecordType recordType,
Expand All @@ -55,22 +56,23 @@ public void appendRecord(
final var entryLength = recordBatchEntry.getLength();

if (!recordBatchSizePredicate.test(recordBatchEntries.size() + 1, batchSize + entryLength)) {
// todo decided whether we want to throw or return a bool or a either?!
throw new IllegalStateException(
"Can't append entry: '"
+ recordBatchEntry
+ "' with size: "
+ entryLength
+ " this would exceed the maximum batch size."
+ " [ currentBatchEntryCount: "
+ recordBatchEntries.size()
+ ", currentBatchSize: "
+ batchSize
+ "]");
return Either.left(
new IllegalStateException(
"Can't append entry: '"
+ recordBatchEntry
+ "' with size: "
+ entryLength
+ " this would exceed the maximum batch size."
+ " [ currentBatchEntryCount: "
+ recordBatchEntries.size()
+ ", currentBatchSize: "
+ batchSize
+ "]"));
}

recordBatchEntries.add(recordBatchEntry);
batchSize += entryLength;
return Either.right(null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,9 @@ public JobBackoffChecker(final JobState jobState) {
taskResultBuilder ->
jobState.findBackedOffJobs(
ActorClock.currentTimeMillis(),
(key, record) -> {
taskResultBuilder.appendCommandRecord(
key, JobIntent.RECUR_AFTER_BACKOFF, record);

return true;
}));
(key, record) ->
taskResultBuilder.appendCommandRecord(
key, JobIntent.RECUR_AFTER_BACKOFF, record)));
}

public void scheduleBackOff(final long dueDate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,7 @@ public TaskResult execute(final TaskResultBuilder taskResultBuilder) {
final long now = currentTimeMillis();
state.forEachTimedOutEntry(
now,
(key, record) -> {
taskResultBuilder.appendCommandRecord(key, JobIntent.TIME_OUT, record);
return true;
});
(key, record) -> taskResultBuilder.appendCommandRecord(key, JobIntent.TIME_OUT, record));
if (shouldReschedule) {
scheduleDeactivateTimedOutJobsTask();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ private boolean writeDeleteMessageCommand(
deleteMessageCommand.setMessageId(message.getMessageIdBuffer());
}

taskResultBuilder.appendCommandRecord(
return taskResultBuilder.appendCommandRecord(
storedMessage.getMessageKey(), MessageIntent.EXPIRE, deleteMessageCommand);
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,8 @@ public boolean visit(final TimerInstance timer) {
.setRepetitions(timer.getRepetitions())
.setProcessDefinitionKey(timer.getProcessDefinitionKey());

taskResultBuilder.appendCommandRecord(timer.getKey(), TimerIntent.TRIGGER, timerRecord);

return true;
return taskResultBuilder.appendCommandRecord(
timer.getKey(), TimerIntent.TRIGGER, timerRecord);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.util.Either;
import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -56,7 +57,7 @@ final class DirectProcessingResultBuilder implements ProcessingResultBuilder {
}

@Override
public ProcessingResultBuilder appendRecord(
public Either<RuntimeException, ProcessingResultBuilder> appendRecordReturnEither(
final long key,
final RecordType type,
final Intent intent,
Expand All @@ -71,15 +72,19 @@ public ProcessingResultBuilder appendRecord(
}

if (value instanceof UnifiedRecordValue unifiedRecordValue) {
mutableRecordBatch.appendRecord(
key, -1, type, intent, rejectionType, rejectionReason, valueType, unifiedRecordValue);
final var either =
mutableRecordBatch.appendRecord(
key, -1, type, intent, rejectionType, rejectionReason, valueType, unifiedRecordValue);
if (either.isLeft()) {
return Either.left(either.getLeft());
}
} else {
throw new IllegalStateException(
String.format("The record value %s is not a UnifiedRecordValue", value));
}

streamWriter.appendRecord(key, type, intent, rejectionType, rejectionReason, value);
return this;
return Either.right(this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ final class DirectTaskResultBuilder implements TaskResultBuilder {
}

@Override
public DirectTaskResultBuilder appendCommandRecord(
public boolean appendCommandRecord(
final long key, final Intent intent, final UnifiedRecordValue value) {

final ValueType valueType = TYPE_REGISTRY.get(value.getClass());
Expand All @@ -48,9 +48,11 @@ public DirectTaskResultBuilder appendCommandRecord(
throw new IllegalStateException("Missing value type mapping for record: " + value.getClass());
}

mutableRecordBatch.appendRecord(
key, -1, RecordType.COMMAND, intent, RejectionType.NULL_VAL, "", valueType, value);
return this;
final var either =
mutableRecordBatch.appendRecord(
key, -1, RecordType.COMMAND, intent, RejectionType.NULL_VAL, "", valueType, value);

return either.isRight();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,10 @@ public void shouldWriteRecordAfterTaskWasExecuted() {
// when
dummyProcessorSpy.scheduleService.runDelayed(
Duration.ZERO,
(builder) -> builder.appendCommandRecord(1, ACTIVATE_ELEMENT, RECORD).build());
(builder) -> {
builder.appendCommandRecord(1, ACTIVATE_ELEMENT, RECORD);
return builder.build();
});

// then
verify(dummyProcessorSpy, TIMEOUT)
Expand All @@ -209,7 +212,10 @@ public void shouldScheduleOnFixedRate() {
// when
dummyProcessorSpy.scheduleService.runAtFixedRate(
Duration.ofMillis(100),
(builder) -> builder.appendCommandRecord(1, ACTIVATE_ELEMENT, RECORD).build());
(builder) -> {
builder.appendCommandRecord(1, ACTIVATE_ELEMENT, RECORD);
return builder.build();
});

// then
verify(dummyProcessorSpy, TIMEOUT.times(5))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -46,6 +47,7 @@ void shouldAbortIterationAndGiveYieldAfterSomeTimeHasPassed() {

// given
final var mockTaskResultBuilder = mock(TaskResultBuilder.class);
when(mockTaskResultBuilder.appendCommandRecord(anyLong(), any(), any())).thenReturn(true);

final var mockTimer = mock(TimerInstance.class, Mockito.RETURNS_DEEP_STUBS);
final var timerKey = 42L;
Expand Down Expand Up @@ -75,6 +77,38 @@ void shouldAbortIterationAndGiveYieldAfterSomeTimeHasPassed() {
* So in the fifth iteration, the mechanism will yield
*/
}

@Test
void shouldAbortIterationWhenRecordBatchReturnsFalseOnAppend() {
/* This test verifies that the class will yield at some point, and will not add endless records
* to a batch.
*/

// given
final var mockTaskResultBuilder = mock(TaskResultBuilder.class);
when(mockTaskResultBuilder.appendCommandRecord(anyLong(), any(), any()))
.thenReturn(true)
.thenReturn(false);

final var mockTimer = mock(TimerInstance.class, Mockito.RETURNS_DEEP_STUBS);
final var timerKey = 42L;
when(mockTimer.getKey()).thenReturn(timerKey);

final var testActorClock = new TestActorClock();

final var testTimerInstanceState =
new TestTimerInstanceStateThatSimulatesAnEndlessListOfDueTimers(
mockTimer, testActorClock);

final var sut = new TriggerTimersSideEffect(testTimerInstanceState, testActorClock, true);

// when
sut.apply(mockTaskResultBuilder);

// then
verify(mockTaskResultBuilder, times(2))
.appendCommandRecord(eq(timerKey), eq(TimerIntent.TRIGGER), any());
}
}

@Nested
Expand Down

0 comments on commit 8bc7e82

Please sign in to comment.