merge: #9377
9377: [Backport stable/1.3] Do not open dispatcher on follower role r=deepthidevaki a=deepthidevaki

## Description

Backport of #9367

related #8509
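
In short: the batch writer, whose creation opens the dispatcher, is no longer requested while the stream processor actor is starting. The actor now only opens a log stream reader up front and runs replay; the writer is requested once replay has completed, so a follower that stays in replay-only mode never opens the dispatcher. Below is a simplified, self-contained sketch of that idea using hypothetical stand-in types, not the real Zeebe classes shown in the diff further down.

```java
import java.util.concurrent.CompletableFuture;

// Self-contained sketch of the idea behind this change. The types below are
// hypothetical stand-ins, not the Zeebe API (see the actual diff for that).
final class DeferredWriterSketch {

  interface Reader {}

  interface Writer {}

  interface Log {
    CompletableFuture<Reader> newReader();

    // In the real code, creating the batch writer is what opens the dispatcher.
    CompletableFuture<Writer> newWriter();
  }

  private final Log log;
  private Writer writer; // stays null while this node is a replaying follower

  DeferredWriterSketch(final Log log) {
    this.log = log;
  }

  void onActorStarting() {
    // Only a reader is needed for replay -- the writer is no longer requested here.
    log.newReader().thenAccept(this::startReplay);
  }

  private void startReplay(final Reader reader) {
    // Replay the log. On a follower in replay-only mode this phase never ends,
    // so newWriter() is never called and the dispatcher is never opened.
  }

  // Called once replay has completed and this node transitions to processing.
  void onReplayCompleted() {
    log.newWriter()
        .whenComplete(
            (acquiredWriter, error) -> {
              if (error == null) {
                writer = acquiredWriter;
                // create the processing state machine and start processing
              } else {
                // propagate the failure to health listeners instead of silently closing
              }
            });
  }
}
```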

Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@gmail.com>
zeebe-bors-camunda[bot] and deepthidevaki committed May 16, 2022
2 parents 9420cc1 + ce866d3 commit 9b8dcfd
Showing 6 changed files with 144 additions and 40 deletions.
Changed file: StreamProcessor.java
@@ -10,7 +10,7 @@
import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.metrics.StreamProcessorMetrics;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriterImpl;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.ZeebeDbState;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
@@ -37,6 +37,41 @@
import java.util.function.Function;
import org.slf4j.Logger;

/*
+-------------------+
| |
| ActorStarting |
| |
+-------------------+
|
v
+-------------------+
| | +-----------------+
| Create Reader | | |
| | ---------------------------> | Actor close | <-------------------------------
+-------------------+ | | | | |
| | | +-----------------+ |
v | | |
+-------------------+ | | |
| | | +-----------+ +-------------+ +-----------------+ +------------+
| Actor Started |-------- | | | | | | | |
| |----------->| Replay |------->| Replay |------->| Create writer | ---->| Process |
+-------------------+ | | | Completed | | | | |
+-----------+ +-------------+ +-----------------+ +------------+
| | |
| | |
| | |
v | |
+-------------+ | |
| Actor | | |
| Failed | <---------------------------------------------------------
| |
+-------------+
https://textik.com/#f8692d3c3e76c699
*/
public class StreamProcessor extends Actor implements HealthMonitorable, LogRecordAwaiter {

public static final long UNSET_POSITION = -1L;
@@ -62,7 +97,6 @@ public class StreamProcessor extends Actor implements HealthMonitorable, LogRecordAwaiter {
private final TypedRecordProcessorFactory typedRecordProcessorFactory;
private final String actorName;
private LogStreamReader logStreamReader;
private long snapshotPosition = -1L;
private ProcessingStateMachine processingStateMachine;
private ReplayStateMachine replayStateMachine;

@@ -73,12 +107,14 @@ public class StreamProcessor extends Actor implements HealthMonitorable, LogRecordAwaiter {
private volatile long lastTickTime;
private boolean shouldProcess = true;
private ActorFuture<LastProcessingPositions> replayCompletedFuture;
private final Function<LogStreamBatchWriter, TypedStreamWriter> typedStreamWriterFactory;

protected StreamProcessor(final StreamProcessorBuilder processorBuilder) {
actorSchedulingService = processorBuilder.getActorSchedulingService();
lifecycleAwareListeners = processorBuilder.getLifecycleListeners();

typedRecordProcessorFactory = processorBuilder.getTypedRecordProcessorFactory();
typedStreamWriterFactory = processorBuilder.getTypedStreamWriterFactory();
zeebeDb = processorBuilder.getZeebeDb();
eventApplierFactory = processorBuilder.getEventApplierFactory();

@@ -113,21 +149,18 @@ public String getName() {
@Override
protected void onActorStarting() {
actor.runOnCompletionBlockingCurrentPhase(
logStream.newLogStreamBatchWriter(), this::onRetrievingWriter);
logStream.newLogStreamReader(), this::onRetrievingReader);
}

@Override
protected void onActorStarted() {
try {
LOG.debug("Recovering state of partition {} from snapshot", partitionId);
final var startRecoveryTimer = metrics.startRecoveryTimer();
snapshotPosition = recoverFromSnapshot();
final long snapshotPosition = recoverFromSnapshot();

initProcessors();

processingStateMachine =
new ProcessingStateMachine(processingContext, this::shouldProcessNext);

healthCheckTick();

replayStateMachine = new ReplayStateMachine(processingContext, this::shouldProcessNext);
@@ -227,19 +260,33 @@ private void healthCheckTick() {
}

private void onRetrievingWriter(
final LogStreamBatchWriter batchWriter, final Throwable errorOnReceivingWriter) {
final LogStreamBatchWriter batchWriter,
final Throwable errorOnReceivingWriter,
final LastProcessingPositions lastProcessingPositions) {

if (errorOnReceivingWriter == null) {
processingContext
.maxFragmentSize(batchWriter.getMaxFragmentLength())
.logStreamWriter(new TypedStreamWriterImpl(batchWriter));
.logStreamWriter(typedStreamWriterFactory.apply(batchWriter));

phase = Phase.PROCESSING;

// enable writing records to the stream
processingContext.enableLogStreamWriter();

processingStateMachine =
new ProcessingStateMachine(processingContext, this::shouldProcessNext);

actor.runOnCompletionBlockingCurrentPhase(
logStream.newLogStreamReader(), this::onRetrievingReader);
logStream.registerRecordAvailableListener(this);

// start reading
lifecycleAwareListeners.forEach(l -> l.onRecovered(processingContext));
processingStateMachine.startProcessing(lastProcessingPositions);
if (!shouldProcess) {
setStateToPausedAndNotifyListeners();
}
} else {
LOG.error(
"Unexpected error on retrieving batch writer from log stream.", errorOnReceivingWriter);
actor.close();
onFailure(errorOnReceivingWriter);
}
}

@@ -306,19 +353,11 @@ private ZeebeDbState recoverState() {
}

private void onRecovered(final LastProcessingPositions lastProcessingPositions) {
phase = Phase.PROCESSING;

// enable writing records to the stream
processingContext.enableLogStreamWriter();

logStream.registerRecordAvailableListener(this);

// start reading
lifecycleAwareListeners.forEach(l -> l.onRecovered(processingContext));
processingStateMachine.startProcessing(lastProcessingPositions);
if (!shouldProcess) {
setStateToPausedAndNotifyListeners();
}
logStream
.newLogStreamBatchWriter()
.onComplete(
(batchWriter, errorOnReceivingWriter) ->
onRetrievingWriter(batchWriter, errorOnReceivingWriter, lastProcessingPositions));
}

private void onFailure(final Throwable throwable) {
@@ -330,10 +369,10 @@ private void onFailure(final Throwable throwable) {

if (throwable instanceof UnrecoverableException) {
final var report = HealthReport.dead(this).withIssue(throwable);
failureListeners.forEach((l) -> l.onUnrecoverableFailure(report));
failureListeners.forEach(l -> l.onUnrecoverableFailure(report));
} else {
final var report = HealthReport.unhealthy(this).withIssue(throwable);
failureListeners.forEach((l) -> l.onFailure(report));
failureListeners.forEach(l -> l.onFailure(report));
}
}

@@ -354,6 +393,9 @@ public ActorFuture<Long> getLastProcessedPositionAsync() {
() -> {
if (isInReplayOnlyMode()) {
return replayStateMachine.getLastSourceEventPosition();
} else if (processingStateMachine == null) {
// StreamProcessor is still in replay mode
return StreamProcessor.UNSET_POSITION;
} else {
return processingStateMachine.getLastSuccessfulProcessedRecordPosition();
}
@@ -369,6 +411,9 @@ public ActorFuture<Long> getLastWrittenPositionAsync() {
() -> {
if (isInReplayOnlyMode()) {
return replayStateMachine.getLastReplayedEventPosition();
} else if (processingStateMachine == null) {
// StreamProcessor is still in replay mode
return StreamProcessor.UNSET_POSITION;
} else {
return processingStateMachine.getLastWrittenPosition();
}
@@ -381,7 +426,7 @@ public HealthReport getHealthReport() {
return HealthReport.unhealthy(this).withMessage("actor is closed");
}

if (processingStateMachine == null || !processingStateMachine.isMakingProgress()) {
if (processingStateMachine != null && !processingStateMachine.isMakingProgress()) {
return HealthReport.unhealthy(this).withMessage("not making progress");
}

@@ -453,7 +498,9 @@ public void resumeProcessing() {
// since the listeners are not recovered yet
lifecycleAwareListeners.forEach(StreamProcessorLifecycleAware::onResumed);
phase = Phase.PROCESSING;
actor.submit(processingStateMachine::readNextRecord);
if (processingStateMachine != null) {
actor.submit(processingStateMachine::readNextRecord);
}
LOG.debug("Resumed processing for partition {}", partitionId);
}
}
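
Because the ProcessingStateMachine is now created only after replay completes, the public position queries return StreamProcessor.UNSET_POSITION while the node is still replaying, instead of assuming the state machine exists. A hypothetical caller-side fragment (reusing the ActorFuture.onComplete callback style visible elsewhere in this diff; not code from this commit):

```java
// Hypothetical caller: interpret UNSET_POSITION as "still replaying, nothing
// processed on this node yet" rather than as an error.
streamProcessor
    .getLastProcessedPositionAsync()
    .onComplete(
        (position, error) -> {
          if (error != null) {
            // the query itself failed
          } else if (position == StreamProcessor.UNSET_POSITION) {
            // replay still in progress; no ProcessingStateMachine exists yet
          } else {
            // position of the last successfully processed record
          }
        });
```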
Changed file: StreamProcessorBuilder.java
@@ -9,9 +9,12 @@

import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriterImpl;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import io.camunda.zeebe.util.sched.ActorSchedulingService;
import java.util.ArrayList;
import java.util.List;
@@ -27,6 +30,8 @@ public final class StreamProcessorBuilder {
private ZeebeDb zeebeDb;
private Function<MutableZeebeState, EventApplier> eventApplierFactory;
private int nodeId;
private Function<LogStreamBatchWriter, TypedStreamWriter> typedStreamWriterFactory =
TypedStreamWriterImpl::new;

public StreamProcessorBuilder() {
processingContext = new ProcessingContext();
@@ -124,4 +129,14 @@ private void validate() {
Objects.requireNonNull(zeebeDb, "No database provided.");
Objects.requireNonNull(eventApplierFactory, "No factory for the event supplier provided.");
}

public Function<LogStreamBatchWriter, TypedStreamWriter> getTypedStreamWriterFactory() {
return typedStreamWriterFactory;
}

public StreamProcessorBuilder typedStreamWriterFactory(
final Function<LogStreamBatchWriter, TypedStreamWriter> typedStreamWriterFactory) {
this.typedStreamWriterFactory = typedStreamWriterFactory;
return this;
}
}
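
The new builder option can presumably be used like this (a hypothetical fragment: the builder entry point and the remaining required configuration are assumed, and WrappedStreamWriter is the test helper used in the test diff further below):

```java
// Hypothetical usage of the new option; other required builder calls omitted.
final StreamProcessor streamProcessor =
    StreamProcessor.builder()
        // inject a custom TypedStreamWriter instead of the default TypedStreamWriterImpl
        .typedStreamWriterFactory(batchWriter -> new WrappedStreamWriter())
        // ... logStream, zeebeDb, actor scheduling service, processor factory, etc. ...
        .build();
```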
Changed file (engine test class; name not shown in this view)
@@ -156,8 +156,6 @@ private StreamProcessor getErrorProneStreamProcessor() {
streamProcessorRule.startTypedStreamProcessorNotAwaitOpening(
processingContext -> {
final MutableZeebeState zeebeState = processingContext.getZeebeState();
mockedLogStreamWriter = new WrappedStreamWriter();
processingContext.logStreamWriter(mockedLogStreamWriter);
return processors(zeebeState.getKeyGenerator(), processingContext.getWriters())
.onCommand(
ValueType.PROCESS_INSTANCE,
@@ -177,7 +175,8 @@ public void processRecord(
}
}
});
});
},
batchWriter -> new WrappedStreamWriter());

return streamProcessor;
}
Changed file: StreamProcessingComposite.java
@@ -14,13 +14,16 @@
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorFactory;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.camunda.zeebe.engine.state.immutable.LastProcessedPositionState;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import io.camunda.zeebe.logstreams.log.LogStreamRecordWriter;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import java.util.function.Function;

public class StreamProcessingComposite {

@@ -100,6 +103,21 @@ public StreamProcessor startTypedStreamProcessorNotAwaitOpening(
}));
}

public StreamProcessor startTypedStreamProcessorNotAwaitOpening(
final int partitionId,
final TypedRecordProcessorFactory factory,
final Function<LogStreamBatchWriter, TypedStreamWriter> streamWriterFactory) {
return streams.startStreamProcessorNotAwaitOpening(
getLogName(partitionId),
zeebeDbFactory,
(processingContext -> {
zeebeState = processingContext.getZeebeState();
lastProcessedPositionState = processingContext.getLastProcessedPositionState();
return factory.createProcessors(processingContext);
}),
streamWriterFactory);
}

public void pauseProcessing(final int partitionId) {
streams.pauseProcessing(getLogName(partitionId));
}
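
The sixth changed file is not shown on this page. Judging from the call above, it is presumably TestStreams, whose startStreamProcessorNotAwaitOpening gains a matching streamWriterFactory parameter that it forwards to the new builder option, roughly as follows (an assumption, not code from this commit):

```java
// Assumed shape of the missing TestStreams change (that file is not shown in
// this view): the extra parameter is simply forwarded to the new builder
// option; the rest of the existing start-up wiring is unchanged.
final StreamProcessorBuilder builder =
    StreamProcessor.builder()
        // ... existing configuration: log stream, ZeebeDb, scheduling service ...
        .typedStreamWriterFactory(streamWriterFactory);
```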
Changed file (test rule class; name not shown in this view)
@@ -15,11 +15,13 @@
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorMode;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorFactory;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.camunda.zeebe.engine.state.DefaultZeebeDbFactory;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.engine.util.StreamProcessingComposite.StreamProcessorTestFactory;
import io.camunda.zeebe.engine.util.TestStreams.FluentLogWriter;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import io.camunda.zeebe.logstreams.log.LogStreamRecordWriter;
import io.camunda.zeebe.logstreams.util.SynchronousLogStream;
import io.camunda.zeebe.msgpack.UnpackedObject;
@@ -151,6 +153,13 @@ public StreamProcessor startTypedStreamProcessorNotAwaitOpening(
startPartitionId, factory);
}

public StreamProcessor startTypedStreamProcessorNotAwaitOpening(
final TypedRecordProcessorFactory processorFactory,
final Function<LogStreamBatchWriter, TypedStreamWriter> streamWriterFactory) {
return streamProcessingComposite.startTypedStreamProcessorNotAwaitOpening(
startPartitionId, processorFactory, streamWriterFactory);
}

public StreamProcessor startTypedStreamProcessor(
final int partitionId, final TypedRecordProcessorFactory factory) {
return streamProcessingComposite.startTypedStreamProcessor(partitionId, factory);
