merge: #9367
9367: Do not open dispatchers in follower role r=deepthidevaki a=deepthidevaki

## Description

In the follower role, the StreamProcessor runs only in replay mode. When the writer is created, the dispatcher is opened as well, which allocates a direct buffer. This is unnecessary because a follower never uses the writer, and the allocated buffer consumes memory and can create memory pressure on the system. To fix this, the writer is now created only after replay has completed.
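
The gist of the change, as a minimal self-contained sketch rather than the actual Zeebe code (the class and method names below are hypothetical, and a plain `CompletableFuture` stands in for the actor-based futures used in the diff): the writer, whose construction opens the dispatcher and allocates the direct buffer, is chained onto replay completion instead of being created at startup, so a follower that never leaves replay never allocates it.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the deferral pattern; not the Zeebe API.
public final class DeferredWriterSketch {

  // Stand-in for the batch writer whose creation opens the dispatcher.
  static final class Writer {
    Writer() {
      System.out.println("writer created (dispatcher opened, direct buffer allocated)");
    }
  }

  // Stand-in for the replay state machine reporting the last processed position.
  private static CompletableFuture<Long> replay() {
    return CompletableFuture.completedFuture(42L);
  }

  public static void main(final String[] args) {
    // Before the fix: the writer was created eagerly at startup, even on followers.
    // After the fix: the writer is only created once replay has completed.
    replay()
        .thenApply(lastProcessedPosition -> new Writer())
        .thenAccept(writer -> System.out.println("processing can start"))
        .join();
  }
}
```

A leader continues past replay, creates the writer, and starts processing; a follower stays in replay and therefore never reaches the allocation.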

## Related issues

related #8509 



Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@gmail.com>
zeebe-bors-camunda[bot] and deepthidevaki committed May 13, 2022
2 parents 836cf12 + 4b93b00 commit 3618c4a
Showing 6 changed files with 144 additions and 40 deletions.
@@ -10,7 +10,7 @@
import io.camunda.zeebe.db.TransactionContext;
import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.metrics.StreamProcessorMetrics;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriterImpl;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.ZeebeDbState;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
@@ -37,6 +37,41 @@
import java.util.function.Function;
import org.slf4j.Logger;

/*
+-------------------+
| |
| ActorStarting |
| |
+-------------------+
|
v
+-------------------+
| | +-----------------+
| Create Reader | | |
| | ---------------------------> | Actor close | <-------------------------------
+-------------------+ | | | | |
| | | +-----------------+ |
v | | |
+-------------------+ | | |
| | | +-----------+ +-------------+ +-----------------+ +------------+
| Actor Started |-------- | | | | | | | |
| |----------->| Replay |------->| Replay |------->| Create writer | ---->| Process |
+-------------------+ | | | Completed | | | | |
+-----------+ +-------------+ +-----------------+ +------------+
| | |
| | |
| | |
v | |
+-------------+ | |
| Actor | | |
| Failed | <---------------------------------------------------------
| |
+-------------+
https://textik.com/#f8692d3c3e76c699
*/
public class StreamProcessor extends Actor implements HealthMonitorable, LogRecordAwaiter {

public static final long UNSET_POSITION = -1L;
@@ -62,7 +97,6 @@ public class StreamProcessor extends Actor implements HealthMonitorable, LogReco
private final TypedRecordProcessorFactory typedRecordProcessorFactory;
private final String actorName;
private LogStreamReader logStreamReader;
private long snapshotPosition = -1L;
private ProcessingStateMachine processingStateMachine;
private ReplayStateMachine replayStateMachine;

@@ -73,12 +107,14 @@ public class StreamProcessor extends Actor implements HealthMonitorable, LogReco
private volatile long lastTickTime;
private boolean shouldProcess = true;
private ActorFuture<LastProcessingPositions> replayCompletedFuture;
private final Function<LogStreamBatchWriter, TypedStreamWriter> typedStreamWriterFactory;

protected StreamProcessor(final StreamProcessorBuilder processorBuilder) {
actorSchedulingService = processorBuilder.getActorSchedulingService();
lifecycleAwareListeners = processorBuilder.getLifecycleListeners();

typedRecordProcessorFactory = processorBuilder.getTypedRecordProcessorFactory();
typedStreamWriterFactory = processorBuilder.getTypedStreamWriterFactory();
zeebeDb = processorBuilder.getZeebeDb();
eventApplierFactory = processorBuilder.getEventApplierFactory();

@@ -113,21 +149,18 @@ public String getName() {
@Override
protected void onActorStarting() {
actor.runOnCompletionBlockingCurrentPhase(
logStream.newLogStreamBatchWriter(), this::onRetrievingWriter);
logStream.newLogStreamReader(), this::onRetrievingReader);
}

@Override
protected void onActorStarted() {
try {
LOG.debug("Recovering state of partition {} from snapshot", partitionId);
final var startRecoveryTimer = metrics.startRecoveryTimer();
snapshotPosition = recoverFromSnapshot();
final long snapshotPosition = recoverFromSnapshot();

initProcessors();

processingStateMachine =
new ProcessingStateMachine(processingContext, this::shouldProcessNext);

healthCheckTick();

replayStateMachine = new ReplayStateMachine(processingContext, this::shouldProcessNext);
@@ -225,19 +258,33 @@ private void healthCheckTick() {
}

private void onRetrievingWriter(
final LogStreamBatchWriter batchWriter, final Throwable errorOnReceivingWriter) {
final LogStreamBatchWriter batchWriter,
final Throwable errorOnReceivingWriter,
final LastProcessingPositions lastProcessingPositions) {

if (errorOnReceivingWriter == null) {
processingContext
.maxFragmentSize(batchWriter.getMaxFragmentLength())
.logStreamWriter(new TypedStreamWriterImpl(batchWriter));
.logStreamWriter(typedStreamWriterFactory.apply(batchWriter));

phase = Phase.PROCESSING;

// enable writing records to the stream
processingContext.enableLogStreamWriter();

processingStateMachine =
new ProcessingStateMachine(processingContext, this::shouldProcessNext);

actor.runOnCompletionBlockingCurrentPhase(
logStream.newLogStreamReader(), this::onRetrievingReader);
logStream.registerRecordAvailableListener(this);

// start reading
lifecycleAwareListeners.forEach(l -> l.onRecovered(processingContext));
processingStateMachine.startProcessing(lastProcessingPositions);
if (!shouldProcess) {
setStateToPausedAndNotifyListeners();
}
} else {
LOG.error(
"Unexpected error on retrieving batch writer from log stream.", errorOnReceivingWriter);
actor.close();
onFailure(errorOnReceivingWriter);
}
}

@@ -304,19 +351,11 @@ private ZeebeDbState recoverState() {
}

private void onRecovered(final LastProcessingPositions lastProcessingPositions) {
phase = Phase.PROCESSING;

// enable writing records to the stream
processingContext.enableLogStreamWriter();

logStream.registerRecordAvailableListener(this);

// start reading
lifecycleAwareListeners.forEach(l -> l.onRecovered(processingContext));
processingStateMachine.startProcessing(lastProcessingPositions);
if (!shouldProcess) {
setStateToPausedAndNotifyListeners();
}
logStream
.newLogStreamBatchWriter()
.onComplete(
(batchWriter, errorOnReceivingWriter) ->
onRetrievingWriter(batchWriter, errorOnReceivingWriter, lastProcessingPositions));
}

private void onFailure(final Throwable throwable) {
@@ -328,10 +367,10 @@ private void onFailure(final Throwable throwable) {

if (throwable instanceof UnrecoverableException) {
final var report = HealthReport.dead(this).withIssue(throwable);
failureListeners.forEach((l) -> l.onUnrecoverableFailure(report));
failureListeners.forEach(l -> l.onUnrecoverableFailure(report));
} else {
final var report = HealthReport.unhealthy(this).withIssue(throwable);
failureListeners.forEach((l) -> l.onFailure(report));
failureListeners.forEach(l -> l.onFailure(report));
}
}

@@ -352,6 +391,9 @@ public ActorFuture<Long> getLastProcessedPositionAsync() {
() -> {
if (isInReplayOnlyMode()) {
return replayStateMachine.getLastSourceEventPosition();
} else if (processingStateMachine == null) {
// StreamProcessor is still in replay mode
return StreamProcessor.UNSET_POSITION;
} else {
return processingStateMachine.getLastSuccessfulProcessedRecordPosition();
}
@@ -367,6 +409,9 @@ public ActorFuture<Long> getLastWrittenPositionAsync() {
() -> {
if (isInReplayOnlyMode()) {
return replayStateMachine.getLastReplayedEventPosition();
} else if (processingStateMachine == null) {
// StreamProcessor is still in replay mode
return StreamProcessor.UNSET_POSITION;
} else {
return processingStateMachine.getLastWrittenPosition();
}
@@ -379,7 +424,7 @@ public HealthReport getHealthReport() {
return HealthReport.unhealthy(this).withMessage("actor is closed");
}

if (processingStateMachine == null || !processingStateMachine.isMakingProgress()) {
if (processingStateMachine != null && !processingStateMachine.isMakingProgress()) {
return HealthReport.unhealthy(this).withMessage("not making progress");
}

@@ -451,7 +496,9 @@ public void resumeProcessing() {
// since the listeners are not recovered yet
lifecycleAwareListeners.forEach(StreamProcessorLifecycleAware::onResumed);
phase = Phase.PROCESSING;
actor.submit(processingStateMachine::readNextRecord);
if (processingStateMachine != null) {
actor.submit(processingStateMachine::readNextRecord);
}
LOG.debug("Resumed processing for partition {}", partitionId);
}
}
@@ -9,9 +9,12 @@

import io.camunda.zeebe.db.ZeebeDb;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriterImpl;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import io.camunda.zeebe.util.sched.ActorSchedulingService;
import java.util.ArrayList;
import java.util.List;
@@ -27,6 +30,8 @@ public final class StreamProcessorBuilder {
private ZeebeDb zeebeDb;
private Function<MutableZeebeState, EventApplier> eventApplierFactory;
private int nodeId;
private Function<LogStreamBatchWriter, TypedStreamWriter> typedStreamWriterFactory =
TypedStreamWriterImpl::new;

public StreamProcessorBuilder() {
processingContext = new ProcessingContext();
@@ -124,4 +129,14 @@ private void validate() {
Objects.requireNonNull(zeebeDb, "No database provided.");
Objects.requireNonNull(eventApplierFactory, "No factory for the event supplier provided.");
}

public Function<LogStreamBatchWriter, TypedStreamWriter> getTypedStreamWriterFactory() {
return typedStreamWriterFactory;
}

public StreamProcessorBuilder typedStreamWriterFactory(
final Function<LogStreamBatchWriter, TypedStreamWriter> typedStreamWriterFactory) {
this.typedStreamWriterFactory = typedStreamWriterFactory;
return this;
}
}
@@ -156,8 +156,6 @@ private StreamProcessor getErrorProneStreamProcessor() {
streamProcessorRule.startTypedStreamProcessorNotAwaitOpening(
processingContext -> {
final MutableZeebeState zeebeState = processingContext.getZeebeState();
mockedLogStreamWriter = new WrappedStreamWriter();
processingContext.logStreamWriter(mockedLogStreamWriter);
return processors(zeebeState.getKeyGenerator(), processingContext.getWriters())
.onCommand(
ValueType.PROCESS_INSTANCE,
@@ -177,7 +175,8 @@ public void processRecord(
}
}
});
});
},
batchWriter -> new WrappedStreamWriter());

return streamProcessor;
}
@@ -14,8 +14,10 @@
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorFactory;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.camunda.zeebe.engine.state.immutable.LastProcessedPositionState;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import io.camunda.zeebe.logstreams.log.LogStreamRecordWriter;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.record.RecordType;
@@ -25,6 +27,7 @@
import io.camunda.zeebe.util.sched.ActorScheduler;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import java.util.concurrent.Callable;
import java.util.function.Function;

public class StreamProcessingComposite {

@@ -110,6 +113,21 @@ public StreamProcessor startTypedStreamProcessorNotAwaitOpening(
}));
}

public StreamProcessor startTypedStreamProcessorNotAwaitOpening(
final int partitionId,
final TypedRecordProcessorFactory factory,
final Function<LogStreamBatchWriter, TypedStreamWriter> streamWriterFactory) {
return streams.startStreamProcessorNotAwaitOpening(
getLogName(partitionId),
zeebeDbFactory,
(processingContext -> {
zeebeState = processingContext.getZeebeState();
lastProcessedPositionState = processingContext.getLastProcessedPositionState();
return factory.createProcessors(processingContext);
}),
streamWriterFactory);
}

public void pauseProcessing(final int partitionId) {
streams.pauseProcessing(getLogName(partitionId));
}
@@ -15,11 +15,13 @@
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorMode;
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessorFactory;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.CommandResponseWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedStreamWriter;
import io.camunda.zeebe.engine.state.DefaultZeebeDbFactory;
import io.camunda.zeebe.engine.state.EventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.engine.util.StreamProcessingComposite.StreamProcessorTestFactory;
import io.camunda.zeebe.engine.util.TestStreams.FluentLogWriter;
import io.camunda.zeebe.logstreams.log.LogStreamBatchWriter;
import io.camunda.zeebe.logstreams.log.LogStreamRecordWriter;
import io.camunda.zeebe.logstreams.util.ListLogStorage;
import io.camunda.zeebe.logstreams.util.SynchronousLogStream;
@@ -152,6 +154,13 @@ public StreamProcessor startTypedStreamProcessorNotAwaitOpening(
startPartitionId, factory);
}

public StreamProcessor startTypedStreamProcessorNotAwaitOpening(
final TypedRecordProcessorFactory processorFactory,
final Function<LogStreamBatchWriter, TypedStreamWriter> streamWriterFactory) {
return streamProcessingComposite.startTypedStreamProcessorNotAwaitOpening(
startPartitionId, processorFactory, streamWriterFactory);
}

public StreamProcessor startTypedStreamProcessor(
final int partitionId, final TypedRecordProcessorFactory factory) {
return streamProcessingComposite.startTypedStreamProcessor(partitionId, factory);