merge: #9249
9249: Yield control if too many timers due r=pihme a=pihme

## Description

Adds a mechanism that lets the `DueDateTimerChecker` yield control once a time budget is used up (in this change, half of the 100 ms timer resolution). This stops it from iterating over an unbounded number of due timer events and blocking execution while doing so.

Overall, this change should work well in cases where there is a huge backlog of timers. This backlog would then be reduced bit by bit.
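
The yielding idea can be sketched in isolation. The snippet below is a minimal, self-contained illustration and not the Zeebe implementation: `Visitor`, `FakeClock`, `yielding`, `drainOnce`, and `passesToDrain` are hypothetical names, and the clock is advanced by hand so the behavior is deterministic. It shows a visitor wrapper that stops the iteration once a deadline has passed, so a large backlog is worked off over several short passes instead of one long one.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class YieldingSketch {

  /** Hypothetical visitor: returning false stops the iteration early. */
  interface Visitor {
    boolean visit(long timerKey);
  }

  /** Hypothetical hand-driven clock, used to keep the example deterministic. */
  static final class FakeClock {
    long nowMillis;
  }

  /** Wraps a visitor so that it refuses further work once the deadline is reached. */
  static Visitor yielding(final FakeClock clock, final long deadlineMillis, final Visitor delegate) {
    return key -> clock.nowMillis < deadlineMillis && delegate.visit(key);
  }

  /** Runs one pass over the backlog; a timer is removed only if the visitor accepted it. */
  static int drainOnce(final Deque<Long> backlog, final Visitor visitor) {
    int processed = 0;
    while (!backlog.isEmpty() && visitor.visit(backlog.peekFirst())) {
      backlog.removeFirst();
      processed++;
    }
    return processed;
  }

  /** Counts the passes needed to drain the given number of timers with a fixed budget per pass. */
  static int passesToDrain(final int timers, final long budgetMillis, final long costPerTimerMillis) {
    if (budgetMillis <= 0 || costPerTimerMillis < 0) {
      throw new IllegalArgumentException("budget must be positive and cost must not be negative");
    }
    final Deque<Long> backlog = new ArrayDeque<>();
    for (long timerKey = 0; timerKey < timers; timerKey++) {
      backlog.addLast(timerKey);
    }
    final FakeClock clock = new FakeClock();
    int passes = 0;
    while (!backlog.isEmpty()) {
      final long deadline = clock.nowMillis + budgetMillis;
      // Assumption for the sketch: every processed timer costs a fixed amount of time.
      final Visitor work =
          key -> {
            clock.nowMillis += costPerTimerMillis;
            return true;
          };
      drainOnce(backlog, yielding(clock, deadline, work));
      passes++;
    }
    return passes;
  }

  public static void main(final String[] args) {
    // 1,000 timers at 1 ms each with a 50 ms budget per pass: prints 20
    System.out.println(passesToDrain(1_000, 50, 1));
  }
}
```

Between two passes the caller regains control, which is the point of the change: other work on the partition can run before the next batch of timers is triggered.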

The change is potentially harmful when there is a constant, high load with many timers being created all the time. In that case, the backlog of due timers can keep growing, and triggered timers will fall further and further behind real time.
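
The two cases can be made concrete with a little arithmetic. The following sketch assumes a constant number of timers becoming due per checker run and a constant number of timers that fit into one run; both figures, and the name `backlogAfter`, are made up for illustration and were not measured on a broker.

```java
final class BacklogSketch {

  /**
   * Returns the number of due but not yet triggered timers after the given number of runs,
   * assuming constant arrivals per run and a constant capacity per run (both hypothetical).
   */
  static long backlogAfter(
      final int runs,
      final long initialBacklog,
      final long arrivalsPerRun,
      final long capacityPerRun) {
    long backlog = initialBacklog;
    for (int run = 0; run < runs; run++) {
      backlog += arrivalsPerRun;
      // A run can never trigger more timers than are currently due.
      backlog -= Math.min(backlog, capacityPerRun);
    }
    return backlog;
  }

  public static void main(final String[] args) {
    // One-off backlog with light ongoing load: shrinks by 49,000 per run, prints 510000
    System.out.println(backlogAfter(10, 1_000_000, 1_000, 50_000));
    // Sustained overload: grows by 10,000 per run, prints 100000
    System.out.println(backlogAfter(10, 0, 60_000, 50_000));
  }
}
```

As long as arrivals stay below the per-run capacity the backlog eventually reaches zero; once they exceed it, the backlog grows without bound.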

Overall, this tradeoff was deemed advantageous. At the very least, it removes the danger that the iteration blocks execution for so long that the node is marked as unhealthy. Once that situation is reached, there is currently no practical way to recover.

Even before this point is reached, execution will be blocked for long stretches of time, and no progress can be made on that partition. So one faulty process can block all others from executing.

Both issues are addressed by this PR. With this PR it should always be possible to make some progress, however small. This allows users to cancel or change a faulty process, or to reduce the load if needed.

Further work will be needed to figure out how to trigger timers without potentially falling further and further behind real time.

## Review Hints
This PR contains duplicate commits from #9237.

## Related issues

closes #9238



Co-authored-by: pihme <pihme@users.noreply.github.com>
zeebe-bors-camunda[bot] and pihme committed May 2, 2022
2 parents 60d944e + 81c1b3e commit 66a9fe8
Showing 13 changed files with 445 additions and 36 deletions.
@@ -55,6 +55,7 @@
import io.camunda.zeebe.protocol.impl.encoding.BrokerInfo;
import io.camunda.zeebe.snapshots.ConstructableSnapshotStore;
import io.camunda.zeebe.snapshots.impl.FileBasedSnapshotStoreFactory;
+import io.camunda.zeebe.util.FeatureFlags;
import io.camunda.zeebe.util.sched.ActorSchedulingService;
import io.camunda.zeebe.util.sched.ConcurrencyControl;
import io.camunda.zeebe.util.startup.StartupStep;
@@ -112,7 +113,8 @@ final class PartitionFactory {
List<ZeebePartition> constructPartitions(
final RaftPartitionGroup partitionGroup,
final List<PartitionListener> partitionListeners,
-final TopologyManager topologyManager) {
+final TopologyManager topologyManager,
+final FeatureFlags featureFlags) {
final var partitions = new ArrayList<ZeebePartition>();
final var communicationService = clusterServices.getCommunicationService();
final var eventService = clusterServices.getEventService();
@@ -131,7 +133,8 @@ List<ZeebePartition> constructPartitions(
localBroker,
communicationService,
eventService,
-deploymentRequestHandler);
+deploymentRequestHandler,
+featureFlags);

for (final RaftPartition owningPartition : owningPartitions) {
final var partitionId = owningPartition.id().id();
@@ -200,7 +203,8 @@ private TypedRecordProcessorsFactory createFactory(
final BrokerInfo localBroker,
final ClusterCommunicationService communicationService,
final ClusterEventService eventService,
-final PushDeploymentRequestHandler deploymentRequestHandler) {
+final PushDeploymentRequestHandler deploymentRequestHandler,
+final FeatureFlags featureFlags) {
return (ProcessingContext processingContext) -> {
final var actor = processingContext.getActor();

@@ -229,7 +233,8 @@ private TypedRecordProcessorsFactory createFactory(
subscriptionCommandSender,
deploymentDistributor,
deploymentRequestHandler,
-jobsAvailableNotification::onJobsAvailable);
+jobsAvailableNotification::onJobsAvailable,
+featureFlags);

return processor.withListener(
new StreamProcessorLifecycleAware() {
@@ -149,7 +149,10 @@ public CompletableFuture<Void> start() {

partitions.addAll(
partitionFactory.constructPartitions(
-partitionGroup, partitionListeners, topologyManager));
+partitionGroup,
+partitionListeners,
+topologyManager,
+brokerCfg.getExperimental().getFeatures().toFeatureFlags()));

final var futures =
partitions.stream()
@@ -30,8 +30,18 @@ public final class FeatureFlagsCfg {
// this.enableFoo = enableFoo;
// }

-FeatureFlags toFeatureFlags() {
-return new FeatureFlags(/*enableFoo*/ );
+private boolean enableYieldingDueDateChecker = DEFAULT_SETTINGS.yieldingDueDateChecker();
+
+public boolean isEnableYieldingDueDateChecker() {
+return enableYieldingDueDateChecker;
+}
+
+public void setEnableYieldingDueDateChecker(final boolean enableYieldingDueDateChecker) {
+this.enableYieldingDueDateChecker = enableYieldingDueDateChecker;
+}
+
+public FeatureFlags toFeatureFlags() {
+return new FeatureFlags(enableYieldingDueDateChecker /*, enableFoo*/);
}

@Override
@@ -0,0 +1,42 @@
+/*
+ * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
+ * one or more contributor license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding copyright ownership.
+ * Licensed under the Zeebe Community License 1.1. You may not use this file
+ * except in compliance with the Zeebe Community License 1.1.
+ */
+package io.camunda.zeebe.broker.system.configuration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Test;
+
+public class FeatureFlagsCfgTest {
+
+public final Map<String, String> environment = new HashMap<>();
+
+@Test
+public void shouldSetEnableYieldingDueDateCheckerFromConfig() {
+// when
+final BrokerCfg cfg = TestConfigReader.readConfig("feature-flags-cfg", environment);
+final var featureFlagsCfg = cfg.getExperimental().getFeatures();
+
+// then
+assertThat(featureFlagsCfg.isEnableYieldingDueDateChecker()).isTrue();
+}
+
+@Test
+public void shouldSetEnableYieldingDueDateCheckerFromEnv() {
+// given
+environment.put("zeebe.broker.experimental.features.enableYieldingDueDateChecker", "false");
+
+// when
+final BrokerCfg cfg = TestConfigReader.readConfig("feature-flags-cfg", environment);
+final var featureFlagsCfg = cfg.getExperimental().getFeatures();
+
+// then
+assertThat(featureFlagsCfg.isEnableYieldingDueDateChecker()).isFalse();
+}
+}
5 changes: 5 additions & 0 deletions broker/src/test/resources/system/feature-flags-cfg.yaml
@@ -0,0 +1,5 @@
+zeebe:
+  broker:
+    experimental:
+      features:
+        enableYieldingDueDateChecker: true
14 changes: 14 additions & 0 deletions dist/src/main/config/broker.standalone.yaml.template
@@ -714,3 +714,17 @@
# Enables the query api in the broker.
# This setting can also be set using the environment variable ZEEBE_BROKER_EXPERIMENTAL_QUERYAPI_ENABLED
# enabled: false
+
+# Allows configuring feature flags. These are used to test new features in dev and int environments prior
+# to rolling them out to production.
+# features:
+# Makes the DueDateTimerChecker yield to other processing steps in situations where
+# it has many (i.e. millions of) timers to process. If set to false (default), the DueDateTimerChecker will activate all
+# due timers in a single pass. In the worst case, this can lead to the node being blocked for an indefinite amount of time
+# and subsequently being flagged as unhealthy. Currently, there is no known way to recover from this situation.
+# If set to true, the DueDateTimerChecker will yield to other processing steps. This avoids the worst case
+# described above. However, under consistently high load the activated timers may fall behind real time
+# if more timers become due than can be activated during a certain time period.
+#
+# This setting can also be set using the environment variable ZEEBE_BROKER_EXPERIMENTAL_FEATURES_ENABLEYIELDINGDUEDATECHECKER
+# enableYieldingDueDateChecker: false
14 changes: 14 additions & 0 deletions dist/src/main/config/broker.yaml.template
@@ -651,3 +651,17 @@
# Enables the query api in the broker.
# This setting can also be set using the environment variable ZEEBE_BROKER_EXPERIMENTAL_QUERYAPI_ENABLED
# enabled: false
+
+# Allows configuring feature flags. These are used to test new features in dev and int environments prior
+# to rolling them out to production.
+# features:
+# Makes the DueDateTimerChecker yield to other processing steps in situations where
+# it has many (i.e. millions of) timers to process. If set to false (default), the DueDateTimerChecker will activate all
+# due timers in a single pass. In the worst case, this can lead to the node being blocked for an indefinite amount of time
+# and subsequently being flagged as unhealthy. Currently, there is no known way to recover from this situation.
+# If set to true, the DueDateTimerChecker will yield to other processing steps. This avoids the worst case
+# described above. However, under consistently high load the activated timers may fall behind real time
+# if more timers become due than can be activated during a certain time period.
+#
+# This setting can also be set using the environment variable ZEEBE_BROKER_EXPERIMENTAL_FEATURES_ENABLEYIELDINGDUEDATECHECKER
+# enableYieldingDueDateChecker: false
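
The property path used in the test (`zeebe.broker.experimental.features.enableYieldingDueDateChecker`) and the environment variable named in the template comment appear to differ only in case and separator. The helper below is a hypothetical sketch of that relationship, written for this note; it is not the broker's configuration binding and `toEnvironmentVariable` is an invented name.

```java
import java.util.Locale;

final class EnvNameSketch {

  /** Hypothetical helper: replaces dots with underscores and upper-cases the result. */
  static String toEnvironmentVariable(final String propertyPath) {
    return propertyPath.replace('.', '_').toUpperCase(Locale.ROOT);
  }

  public static void main(final String[] args) {
    // prints ZEEBE_BROKER_EXPERIMENTAL_FEATURES_ENABLEYIELDINGDUEDATECHECKER
    System.out.println(
        toEnvironmentVariable("zeebe.broker.experimental.features.enableYieldingDueDateChecker"));
  }
}
```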
@@ -40,6 +40,7 @@
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.DeploymentDistributionIntent;
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
+import io.camunda.zeebe.util.FeatureFlags;
import io.camunda.zeebe.util.sched.ActorControl;
import java.util.function.Consumer;

@@ -51,7 +52,8 @@ public static TypedRecordProcessors createEngineProcessors(
final SubscriptionCommandSender subscriptionCommandSender,
final DeploymentDistributor deploymentDistributor,
final DeploymentResponder deploymentResponder,
-final Consumer<String> onJobsAvailableCallback) {
+final Consumer<String> onJobsAvailableCallback,
+final FeatureFlags featureFlags) {

final var actor = processingContext.getActor();
final MutableZeebeState zeebeState = processingContext.getZeebeState();
@@ -73,7 +75,8 @@ public static TypedRecordProcessors createEngineProcessors(
ExpressionLanguageFactory.createExpressionLanguage(),
new VariableStateEvaluationContextLookup(variablesState));

-final DueDateTimerChecker timerChecker = new DueDateTimerChecker(zeebeState.getTimerState());
+final DueDateTimerChecker timerChecker =
+new DueDateTimerChecker(zeebeState.getTimerState(), featureFlags);
final CatchEventBehavior catchEventBehavior =
new CatchEventBehavior(
zeebeState,
@@ -10,42 +10,30 @@
import io.camunda.zeebe.engine.processing.scheduled.DueDateChecker;
import io.camunda.zeebe.engine.processing.streamprocessor.ReadonlyProcessingContext;
import io.camunda.zeebe.engine.processing.streamprocessor.StreamProcessorLifecycleAware;
+import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedCommandWriter;
import io.camunda.zeebe.engine.state.immutable.TimerInstanceState;
+import io.camunda.zeebe.engine.state.immutable.TimerInstanceState.TimerVisitor;
+import io.camunda.zeebe.engine.state.instance.TimerInstance;
import io.camunda.zeebe.protocol.impl.record.value.timer.TimerRecord;
import io.camunda.zeebe.protocol.record.intent.TimerIntent;
+import io.camunda.zeebe.util.FeatureFlags;
import io.camunda.zeebe.util.sched.clock.ActorClock;
import java.time.Duration;
+import java.util.function.Function;

public class DueDateTimerChecker implements StreamProcessorLifecycleAware {

private static final long TIMER_RESOLUTION = Duration.ofMillis(100).toMillis();
+private static final double GIVE_YIELD_FACTOR = 0.5;
private final DueDateChecker dueDateChecker;

-private final TimerRecord timerRecord = new TimerRecord();
-
-public DueDateTimerChecker(final TimerInstanceState timerInstanceState) {
+public DueDateTimerChecker(
+final TimerInstanceState timerInstanceState, final FeatureFlags featureFlags) {
dueDateChecker =
new DueDateChecker(
TIMER_RESOLUTION,
-typedCommandWriter ->
-timerInstanceState.processTimersWithDueDateBefore(
-ActorClock.currentTimeMillis(),
-timer -> {
-timerRecord.reset();
-timerRecord
-.setElementInstanceKey(timer.getElementInstanceKey())
-.setProcessInstanceKey(timer.getProcessInstanceKey())
-.setDueDate(timer.getDueDate())
-.setTargetElementId(timer.getHandlerNodeId())
-.setRepetitions(timer.getRepetitions())
-.setProcessDefinitionKey(timer.getProcessDefinitionKey());
-
-typedCommandWriter.reset();
-typedCommandWriter.appendFollowUpCommand(
-timer.getKey(), TimerIntent.TRIGGER, timerRecord);
-
-return typedCommandWriter.flush() > 0; // means the write was successful
-}));
+new TriggerTimersSideEffect(
+timerInstanceState, ActorClock.current(), featureFlags.yieldingDueDateChecker()));
}

public void scheduleTimer(final long dueDate) {
@@ -76,4 +64,90 @@ public void onPaused() {
public void onResumed() {
dueDateChecker.onResumed();
}
+
+protected static final class TriggerTimersSideEffect
+implements Function<TypedCommandWriter, Long> {
+
+private final ActorClock actorClock;
+
+private final TimerInstanceState timerInstanceState;
+private final boolean yieldControl;
+
+public TriggerTimersSideEffect(
+final TimerInstanceState timerInstanceState,
+final ActorClock actorClock,
+final boolean yieldControl) {
+this.timerInstanceState = timerInstanceState;
+this.actorClock = actorClock;
+this.yieldControl = yieldControl;
+}
+
+@Override
+public Long apply(final TypedCommandWriter typedCommandWriter) {
+final var now = actorClock.getTimeMillis();
+
+final var yieldAfter = now + Math.round(TIMER_RESOLUTION * GIVE_YIELD_FACTOR);
+
+final TimerVisitor timerVisitor;
+if (yieldControl) {
+timerVisitor =
+new YieldingDecorator(
+actorClock, yieldAfter, new WriteTriggerTimerCommandVisitor(typedCommandWriter));
+} else {
+timerVisitor = new WriteTriggerTimerCommandVisitor(typedCommandWriter);
+}
+
+return timerInstanceState.processTimersWithDueDateBefore(now, timerVisitor);
+}
+}
+
+protected static final class WriteTriggerTimerCommandVisitor implements TimerVisitor {
+
+private final TimerRecord timerRecord = new TimerRecord();
+
+private final TypedCommandWriter typedCommandWriter;
+
+public WriteTriggerTimerCommandVisitor(final TypedCommandWriter typedCommandWriter) {
+this.typedCommandWriter = typedCommandWriter;
+}
+
+@Override
+public boolean visit(final TimerInstance timer) {
+timerRecord.reset();
+timerRecord
+.setElementInstanceKey(timer.getElementInstanceKey())
+.setProcessInstanceKey(timer.getProcessInstanceKey())
+.setDueDate(timer.getDueDate())
+.setTargetElementId(timer.getHandlerNodeId())
+.setRepetitions(timer.getRepetitions())
+.setProcessDefinitionKey(timer.getProcessDefinitionKey());
+
+typedCommandWriter.reset();
+typedCommandWriter.appendFollowUpCommand(timer.getKey(), TimerIntent.TRIGGER, timerRecord);
+
+return typedCommandWriter.flush() > 0; // means the write was successful
+}
+}
+
+protected static final class YieldingDecorator implements TimerVisitor {
+
+private final TimerVisitor delegate;
+private final ActorClock actorClock;
+private final long giveYieldAfter;
+
+public YieldingDecorator(
+final ActorClock actorClock, final long giveYieldAfter, final TimerVisitor delegate) {
+this.delegate = delegate;
+this.actorClock = actorClock;
+this.giveYieldAfter = giveYieldAfter;
+}
+
+@Override
+public boolean visit(final TimerInstance timer) {
+if (actorClock.getTimeMillis() >= giveYieldAfter) {
+return false;
+}
+return delegate.visit(timer);
+}
+}
}
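
For reference, the deadline computed in `TriggerTimersSideEffect.apply` works out to 50 ms after the start of a run. The standalone sketch below repeats only that arithmetic with the two constants from the diff; the class name `YieldBudgetSketch` and the method `yieldAfter` are introduced here for illustration.

```java
import java.time.Duration;

final class YieldBudgetSketch {

  // Same values as in the diff above.
  static final long TIMER_RESOLUTION = Duration.ofMillis(100).toMillis();
  static final double GIVE_YIELD_FACTOR = 0.5;

  /** Returns the timestamp from which on no further timers are visited in this run. */
  static long yieldAfter(final long nowMillis) {
    return nowMillis + Math.round(TIMER_RESOLUTION * GIVE_YIELD_FACTOR);
  }

  public static void main(final String[] args) {
    // 100 ms * 0.5 = 50 ms budget: prints 1050
    System.out.println(yieldAfter(1_000));
  }
}
```

Because `YieldingDecorator` compares the clock against the deadline before delegating each visit, a single slow visit can still overrun the budget; the deadline bounds when the iteration stops accepting new timers, not the total duration of a run.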
