Skip to content

Commit

Permalink
merge: #9741
Browse files Browse the repository at this point in the history
9741: Mark in processing via flag r=Zelldon a=Zelldon

## Description

Previously the processing has been marked implicit via the currentProcessor
in order to move the processing call out to the engine, we need to mark
the processing more explicit.

<!-- Please explain the changes you made here. -->

## Related issues

<!-- Which issues are closed by this PR or are related -->

as preparation for #9725 



Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
  • Loading branch information
zeebe-bors-camunda[bot] and Zelldon committed Jul 8, 2022
2 parents c578bc9 + c9e84da commit 9e253c9
Showing 1 changed file with 7 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ public final class ProcessingStateMachine {
// Used for processing duration metrics
private Histogram.Timer processingTimer;
private boolean reachedEnd = true;
private boolean inProcessing;

public ProcessingStateMachine(
final ProcessingContext context, final BooleanSupplier shouldProcessNext) {
Expand Down Expand Up @@ -217,7 +218,7 @@ private void tryToReadNextRecord() {
&& lastWrittenPosition <= previousRecord.getPosition();
}

if (shouldProcessNext.getAsBoolean() && hasNext && currentProcessor == null) {
if (shouldProcessNext.getAsBoolean() && hasNext && !inProcessing) {
currentRecord = logStreamReader.next();

if (eventFilter.applies(currentRecord)) {
Expand All @@ -240,6 +241,10 @@ public boolean hasReachedEnd() {
}

private void processCommand(final LoggedEvent command) {
// we have to mark ourself has inProcessing to not interfere with readNext calls, which
// are triggered from commit listener
inProcessing = true;

metadata.reset();
command.readMetadata(metadata);

Expand Down Expand Up @@ -462,7 +467,7 @@ private void executeSideEffects() {
processingTimer.close();

// continue with next record
currentProcessor = null;
inProcessing = false;
actor.submit(this::readNextRecord);
});
}
Expand Down

0 comments on commit 9e253c9

Please sign in to comment.