Skip to content

Commit

Permalink
merge: #9756
Browse files Browse the repository at this point in the history
9756: Hide scheduling behind interface r=pihme a=pihme

## Description

In terms of the scope defined in #9730 this implements the following:

- [x] Create a new interface for the ProcessingScheduleService (with narrowed scope)
  - [x] Possibily only two methods, runDelayed and runComplete take a look at the POC #9602 
- [x] Implement the interface
- [x] Before migrating to the new abstraction, migrate the ActorCOntrol#runAtFixedRate consumers to the #runDelayed usage, this means after each run the job needs to be scheduled again
- [x] Migrate step by step the actorControl usage
- [x] Remove the actor control from the ProcessingContext

## Related issues

related to #9730

## Review Hints
* This is not the final form of the scheduling interface, instead the focus of this PR is to hide all the dependencies behind an interface first
* The change is not a pure refactoring. So there is a residual risk that the behavior is different in subtle ways (which is why I would like to have a review by both of you)
* The new code sometimes (indirectly) calls different methods on the `ActorControl`. Therefore there might be differences in the way tasks are scheduled (top of queue/bottom of queue; fast lane or not). The intention of the change was to simplify the interface that is available to the engine. In this regard some subtle changes are unavoidable
* Part of the simplification is also that the engine does not have access to something like a `ScheduledTimer`. Therefore, the engine is unable to cancel tasks which have been scheduled
* `RunAtFixedRate` has been replaced by tasks that reschedule themselves after they are called
* There is a difference in the way exceptions are propagated. See commit message a406d3f for one such example
* Other than that, the tests pass and I just started a QA run, so let's see 🤞 



Co-authored-by: pihme <pihme@users.noreply.github.com>
  • Loading branch information
zeebe-bors-camunda[bot] and pihme committed Jul 13, 2022
2 parents 4125681 + 6872540 commit 8b5c14a
Show file tree
Hide file tree
Showing 20 changed files with 289 additions and 167 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
import io.camunda.zeebe.broker.partitioning.topology.TopologyPartitionListenerImpl;
import io.camunda.zeebe.broker.system.management.deployment.PushDeploymentRequest;
import io.camunda.zeebe.broker.system.management.deployment.PushDeploymentResponse;
import io.camunda.zeebe.engine.api.ProcessingScheduleService;
import io.camunda.zeebe.engine.processing.deployment.distribute.DeploymentDistributor;
import io.camunda.zeebe.protocol.impl.encoding.ErrorResponse;
import io.camunda.zeebe.protocol.record.ErrorCode;
import io.camunda.zeebe.scheduler.ActorControl;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.util.buffer.BufferUtil;
Expand All @@ -40,7 +40,7 @@ public final class DeploymentDistributorImpl implements DeploymentDistributor {
private final ErrorResponse errorResponse = new ErrorResponse();

private final TopologyPartitionListenerImpl partitionListener;
private final ActorControl actor;
private final ProcessingScheduleService scheduleService;

private final ClusterCommunicationService communicationService;
private final ClusterEventService eventService;
Expand All @@ -49,13 +49,13 @@ public DeploymentDistributorImpl(
final ClusterCommunicationService communicationService,
final ClusterEventService eventService,
final TopologyPartitionListenerImpl partitionListener,
final ActorControl actor) {
final ProcessingScheduleService scheduleService) {

this.communicationService = communicationService;
this.eventService = eventService;

this.partitionListener = partitionListener;
this.actor = actor;
this.scheduleService = scheduleService;
}

@Override
Expand All @@ -77,7 +77,7 @@ private void scheduleRetryPushDeploymentAfterADelay(
final int partitionId,
final CompletableActorFuture<Void> pushedFuture,
final PushDeploymentRequest pushRequest) {
actor.runDelayed(
scheduleService.runDelayed(
PUSH_REQUEST_TIMEOUT,
() -> {
final String topic = getDeploymentResponseTopic(pushRequest.deploymentKey(), partitionId);
Expand Down Expand Up @@ -201,7 +201,7 @@ private void handleRetry(
final int partitionLeaderId, final int partition, final PushDeploymentRequest pushRequest) {
LOG.trace("Retry deployment push to partition {} after {}", partition, RETRY_DELAY);

actor.runDelayed(
scheduleService.runDelayed(
RETRY_DELAY,
() -> {
final Int2IntHashMap partitionLeaders = partitionListener.getPartitionLeaders();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,15 +208,15 @@ private TypedRecordProcessorsFactory createFactory(
final PushDeploymentRequestHandler deploymentRequestHandler,
final FeatureFlags featureFlags) {
return (recordProcessorContext) -> {
final var actor = recordProcessorContext.getActor();
final var scheduleService = recordProcessorContext.getScheduleService();

final TopologyPartitionListenerImpl partitionListener =
new TopologyPartitionListenerImpl(actor);
new TopologyPartitionListenerImpl(scheduleService);
topologyManager.addTopologyPartitionListener(partitionListener);

final DeploymentDistributorImpl deploymentDistributor =
new DeploymentDistributorImpl(
communicationService, eventService, partitionListener, actor);
communicationService, eventService, partitionListener, scheduleService);

final PartitionCommandSenderImpl partitionCommandSender =
new PartitionCommandSenderImpl(communicationService, partitionListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,25 @@
*/
package io.camunda.zeebe.broker.partitioning.topology;

import io.camunda.zeebe.engine.api.ProcessingScheduleService;
import io.camunda.zeebe.protocol.impl.encoding.BrokerInfo;
import io.camunda.zeebe.protocol.record.PartitionRole;
import io.camunda.zeebe.scheduler.ActorControl;
import java.time.Duration;
import org.agrona.collections.Int2IntHashMap;

public final class TopologyPartitionListenerImpl implements TopologyPartitionListener {

private final Int2IntHashMap partitionLeaders = new Int2IntHashMap(-1);
private final ActorControl actor;
private final ProcessingScheduleService scheduleService;

public TopologyPartitionListenerImpl(final ActorControl actor) {
this.actor = actor;
public TopologyPartitionListenerImpl(final ProcessingScheduleService scheduleService) {
this.scheduleService = scheduleService;
}

@Override
public void onPartitionLeaderUpdated(final int partitionId, final BrokerInfo member) {
if (member.getPartitionRoles().get(partitionId) == PartitionRole.LEADER) {
actor.submit(() -> updatePartitionLeader(partitionId, member));
scheduleService.runDelayed(Duration.ZERO, () -> updatePartitionLeader(partitionId, member));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.api;

import io.camunda.zeebe.scheduler.future.ActorFuture;
import java.time.Duration;
import java.util.function.BiConsumer;

public interface ProcessingScheduleService {

void runDelayed(Duration delay, Runnable followUpTask);

<T> void runOnCompletion(ActorFuture<T> precedingTask, BiConsumer<T, Throwable> followUpTask);

default void runAtFixedRate(final Duration delay, final Runnable task) {
runDelayed(
delay,
() -> {
try {
task.run();
} finally {
runAtFixedRate(delay, task);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,10 @@
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.scheduler.ActorControl;

public interface ReadonlyStreamProcessorContext {

/**
* @return the actor on which the processing runs
*/
ActorControl getActor();
ProcessingScheduleService getScheduleService();

/**
* @return the logstream, on which the processor runs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.immutable.LastProcessedPositionState;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.scheduler.ActorControl;

public interface RecordProcessorContext {

int getPartitionId();

ActorControl getActor();
ProcessingScheduleService getScheduleService();

MutableZeebeState getZeebeState();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static io.camunda.zeebe.protocol.record.intent.DeploymentIntent.CREATE;

import io.camunda.zeebe.el.ExpressionLanguageFactory;
import io.camunda.zeebe.engine.api.ProcessingScheduleService;
import io.camunda.zeebe.engine.api.RecordProcessorContext;
import io.camunda.zeebe.engine.metrics.JobMetrics;
import io.camunda.zeebe.engine.processing.bpmn.behavior.BpmnEventPublicationBehavior;
Expand Down Expand Up @@ -39,7 +40,6 @@
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.DeploymentDistributionIntent;
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
import io.camunda.zeebe.scheduler.ActorControl;
import io.camunda.zeebe.util.FeatureFlags;
import java.util.function.Consumer;

Expand All @@ -54,7 +54,7 @@ public static TypedRecordProcessors createEngineProcessors(
final Consumer<String> onJobsAvailableCallback,
final FeatureFlags featureFlags) {

final var actor = recordProcessorContext.getActor();
final var scheduleService = recordProcessorContext.getScheduleService();
final MutableZeebeState zeebeState = recordProcessorContext.getZeebeState();
final var writers = recordProcessorContext.getWriters();
final TypedRecordProcessors typedRecordProcessors =
Expand Down Expand Up @@ -102,7 +102,7 @@ public static TypedRecordProcessors createEngineProcessors(
expressionProcessor,
writers,
partitionsCount,
actor,
scheduleService,
deploymentDistributor,
zeebeState.getKeyGenerator());
addMessageProcessors(
Expand Down Expand Up @@ -176,7 +176,7 @@ private static void addDeploymentRelatedProcessorAndServices(
final ExpressionProcessor expressionProcessor,
final Writers writers,
final int partitionsCount,
final ActorControl actor,
final ProcessingScheduleService scheduleService,
final DeploymentDistributor deploymentDistributor,
final KeyGenerator keyGenerator) {

Expand All @@ -189,7 +189,7 @@ private static void addDeploymentRelatedProcessorAndServices(
expressionProcessor,
partitionsCount,
writers,
actor,
scheduleService,
deploymentDistributor,
keyGenerator);
typedRecordProcessors.onCommand(ValueType.DEPLOYMENT, CREATE, processor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import static io.camunda.zeebe.engine.state.instance.TimerInstance.NO_ELEMENT_INSTANCE;

import io.camunda.zeebe.engine.api.ProcessingScheduleService;
import io.camunda.zeebe.engine.api.TypedRecord;
import io.camunda.zeebe.engine.processing.common.CatchEventBehavior;
import io.camunda.zeebe.engine.processing.common.ExpressionProcessor;
Expand Down Expand Up @@ -37,7 +38,6 @@
import io.camunda.zeebe.protocol.impl.record.value.deployment.ProcessMetadata;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
import io.camunda.zeebe.scheduler.ActorControl;
import io.camunda.zeebe.util.Either;
import java.util.List;
import java.util.function.Consumer;
Expand Down Expand Up @@ -66,7 +66,7 @@ public DeploymentCreateProcessor(
final ExpressionProcessor expressionProcessor,
final int partitionsCount,
final Writers writers,
final ActorControl actor,
final ProcessingScheduleService scheduleService,
final DeploymentDistributor deploymentDistributor,
final KeyGenerator keyGenerator) {
processState = zeebeState.getProcessState();
Expand All @@ -81,7 +81,8 @@ public DeploymentCreateProcessor(
new MessageStartEventSubscriptionManager(
processState, zeebeState.getMessageStartEventSubscriptionState(), keyGenerator);
deploymentDistributionBehavior =
new DeploymentDistributionBehavior(writers, partitionsCount, deploymentDistributor, actor);
new DeploymentDistributionBehavior(
writers, partitionsCount, deploymentDistributor, scheduleService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/
package io.camunda.zeebe.engine.processing.deployment.distribute;

import io.camunda.zeebe.engine.api.ProcessingScheduleService;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedCommandWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
Expand All @@ -15,9 +16,10 @@
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.camunda.zeebe.protocol.record.intent.DeploymentDistributionIntent;
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
import io.camunda.zeebe.scheduler.ActorControl;
import io.camunda.zeebe.util.buffer.BufferUtil;
import java.time.Duration;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.agrona.DirectBuffer;
Expand All @@ -30,22 +32,22 @@ public final class DeploymentDistributionBehavior {

private final List<Integer> otherPartitions;
private final DeploymentDistributor deploymentDistributor;
private final ActorControl processingActor;
private final ProcessingScheduleService scheduleService;
private final StateWriter stateWriter;
private final TypedCommandWriter commandWriter;

public DeploymentDistributionBehavior(
final Writers writers,
final int partitionsCount,
final DeploymentDistributor deploymentDistributor,
final ActorControl processingActor) {
final ProcessingScheduleService scheduleService) {
otherPartitions =
IntStream.range(Protocol.START_PARTITION_ID, Protocol.START_PARTITION_ID + partitionsCount)
.filter(partition -> partition != Protocol.DEPLOYMENT_PARTITION)
.boxed()
.collect(Collectors.toList());
this.deploymentDistributor = deploymentDistributor;
this.processingActor = processingActor;
this.scheduleService = scheduleService;

stateWriter = writers.state();
commandWriter = writers.command();
Expand Down Expand Up @@ -78,21 +80,37 @@ public void distributeDeploymentToPartition(
final var deploymentPushedFuture =
deploymentDistributor.pushDeploymentToPartition(key, partitionId, copiedDeploymentBuffer);

deploymentPushedFuture.onComplete(
(v, t) ->
processingActor.runUntilDone(
() -> {
deploymentDistributionRecord.setPartition(partitionId);
commandWriter.reset();
commandWriter.appendFollowUpCommand(
key, DeploymentDistributionIntent.COMPLETE, deploymentDistributionRecord);

final long pos = commandWriter.flush();
if (pos < 0) {
processingActor.yieldThread();
} else {
processingActor.done();
}
}));
scheduleService.runOnCompletion(
deploymentPushedFuture, new WriteDeploymentDistributionCompleteTask(partitionId, key));
}

private final class WriteDeploymentDistributionCompleteTask
implements Runnable, BiConsumer<Void, Throwable> {

private final int partitionId;
private final long key;

private WriteDeploymentDistributionCompleteTask(final int partitionId, final long key) {
this.partitionId = partitionId;
this.key = key;
}

@Override
public void run() {
deploymentDistributionRecord.setPartition(partitionId);
commandWriter.reset();
commandWriter.appendFollowUpCommand(
key, DeploymentDistributionIntent.COMPLETE, deploymentDistributionRecord);

final long pos = commandWriter.flush();
if (pos < 0) {
scheduleService.runDelayed(Duration.ofMillis(100), this);
}
}

@Override
public void accept(final Void unused, final Throwable throwable) {
run();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ public DeploymentRedistributor(

@Override
public void onRecovered(final ReadonlyStreamProcessorContext context) {
final var actor = context.getActor();
final var writers = context.getWriters();

final var deploymentDistributionBehavior =
new DeploymentDistributionBehavior(writers, partitionsCount, deploymentDistributor, actor);
new DeploymentDistributionBehavior(
writers, partitionsCount, deploymentDistributor, context.getScheduleService());

deploymentState.foreachPendingDeploymentDistribution(
(key, partitionId, deployment) ->
Expand Down

0 comments on commit 8b5c14a

Please sign in to comment.