Skip to content

Commit

Permalink
merge: #9219
Browse files Browse the repository at this point in the history
9219: Cancel job with incident when canceling the process instance r=korthout a=korthout

## Description

This fixes a bug where canceling a process instance could lead to a job becoming activatable even though the process instance is terminated. See #8588 for more details.

The bug is fixed by making sure the job is immediately canceled (i.e. deleted from the state) when we terminate the related element instance. We achieve this by writing a Job Canceled event when the TERMINATE_ELEMENT command is being processed. 

This PR fixes the bug in a way that can be backported because it doesn't introduce too many changes. Alternatively, the bug could be fixed by tackling the main issue: applying the Incident Resolved event should not affect the state of a job. Instead, the job should be made activatable by writing a separate Job event in the Resolve Incident Processor. That would be a larger change, with more impact on the event appliers than what this PR proposes. Changes to event appliers should generally be avoided.

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #8588 



Co-authored-by: Nico Korthout <nico.korthout@camunda.com>
  • Loading branch information
zeebe-bors-camunda[bot] and korthout committed Apr 28, 2022
2 parents 2f4d8c8 + 25fae05 commit 053ff2c
Show file tree
Hide file tree
Showing 9 changed files with 285 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,9 @@ public void jobCanceled(final String type) {
/**
 * Records a job event with the action {@code "error thrown"} for the given job type.
 *
 * @param type the job type that is passed through to {@code jobEvent}
 */
public void jobErrorThrown(final String type) {
  final String action = "error thrown";
  jobEvent(action, type);
}

/**
 * Clears the metrics counter by calling {@code clear()} on {@code JOB_EVENTS}.
 *
 * <p>You probably only want to use this during testing, e.g. to reset the counter between test
 * cases so that each test can assert on the counts it caused itself.
 */
static void clear() {
JOB_EVENTS.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.camunda.zeebe.engine.processing.deployment.model.element.JobWorkerProperties;
import io.camunda.zeebe.engine.processing.deployment.model.transformer.ExpressionTransformer;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedCommandWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.Writers;
import io.camunda.zeebe.engine.state.KeyGenerator;
import io.camunda.zeebe.engine.state.immutable.JobState;
Expand All @@ -29,9 +28,11 @@
import io.camunda.zeebe.protocol.impl.record.value.job.JobRecord;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.util.Either;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableArrayBuffer;
Expand All @@ -44,13 +45,14 @@ public final class BpmnJobBehavior {

private static final Logger LOGGER =
LoggerFactory.getLogger(BpmnJobBehavior.class.getPackageName());
private static final Set<State> CANCELABLE_STATES =
EnumSet.of(State.ACTIVATABLE, State.ACTIVATED, State.FAILED, State.ERROR_THROWN);

private final JobRecord jobRecord = new JobRecord().setVariables(DocumentValue.EMPTY_DOCUMENT);
private final HeaderEncoder headerEncoder = new HeaderEncoder();

private final KeyGenerator keyGenerator;
private final StateWriter stateWriter;
private final TypedCommandWriter commandWriter;
private final JobState jobState;
private final ExpressionProcessor expressionBehavior;
private final BpmnStateBehavior stateBehavior;
Expand All @@ -69,7 +71,6 @@ public BpmnJobBehavior(
this.jobState = jobState;
this.expressionBehavior = expressionBehavior;
stateWriter = writers.state();
commandWriter = writers.command();
this.stateBehavior = stateBehavior;
this.incidentBehavior = incidentBehavior;
this.jobMetrics = jobMetrics;
Expand Down Expand Up @@ -171,17 +172,20 @@ public void cancelJob(final BpmnElementContext context) {
final var elementInstance = stateBehavior.getElementInstance(context);
final long jobKey = elementInstance.getJobKey();
if (jobKey > 0) {
writeJobCancelCommand(jobKey);
writeJobCanceled(jobKey);
incidentBehavior.resolveJobIncident(jobKey);
}
}

private void writeJobCancelCommand(final long jobKey) {
private void writeJobCanceled(final long jobKey) {
final State state = jobState.getState(jobKey);

if (state == State.ACTIVATABLE || state == State.ACTIVATED || state == State.FAILED) {
if (CANCELABLE_STATES.contains(state)) {
final JobRecord job = jobState.getJob(jobKey);
commandWriter.appendFollowUpCommand(jobKey, JobIntent.CANCEL, job);
// Note that this logic is duplicated in JobCancelProcessor, if you change this please change
// it there as well.
stateWriter.appendFollowUpEvent(jobKey, JobIntent.CANCELED, job);
jobMetrics.jobCanceled(job.getType());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public boolean onCommand(
final long jobKey = command.getKey();
final JobRecord job = jobState.getJob(jobKey);
if (job != null) {
// Note that this logic is duplicated in BpmnJobBehavior, if you change this please change
// it there as well.
commandControl.accept(JobIntent.CANCELED, job);
jobMetrics.jobCanceled(job.getType());
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.metrics;

import static java.util.Map.entry;
import static org.assertj.core.api.Assertions.assertThat;

import io.camunda.zeebe.engine.util.EngineRule;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.camunda.zeebe.test.util.record.RecordingExporterTestWatcher;
import java.time.Duration;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;

/**
 * Verifies that the {@code zeebe_job_events_total} metric is counted once per job lifecycle event
 * (created, activated, timed out, completed, failed, canceled, error thrown).
 *
 * <p>All tests share one engine (class rule) and one deployed process. The metrics counter is
 * reset before each test, but jobs created by earlier tests stay in the engine. Tests that act on
 * a batch of jobs by type must therefore use a job type that is unique to that test.
 */
public class JobMetricsTest {

  @ClassRule public static final EngineRule ENGINE = EngineRule.singlePartition();

  private static final String PROCESS_ID = "process";
  private static final String TASK_ID = "task";
  private static final String JOB_TYPE = "job";

  @Rule public final TestWatcher watcher = new RecordingExporterTestWatcher();

  /** Deploys a process with a single service task whose job type is read from a variable. */
  @BeforeClass
  public static void deployProcess() {
    ENGINE
        .deployment()
        .withXmlResource(
            Bpmn.createExecutableProcess(PROCESS_ID)
                .startEvent()
                .serviceTask(TASK_ID, t -> t.zeebeJobTypeExpression("jobType"))
                .endEvent()
                .done())
        .deploy();
  }

  /** Resets the job metrics so every test starts counting from scratch. */
  @Before
  public void resetMetrics() {
    JobMetrics.clear();
  }

  @Test
  public void allCountsStartAtNull() {
    assertThat(jobMetric("created", JOB_TYPE)).isNull();
    assertThat(jobMetric("activated", JOB_TYPE)).isNull();
    assertThat(jobMetric("timed out", JOB_TYPE)).isNull();
    assertThat(jobMetric("completed", JOB_TYPE)).isNull();
    assertThat(jobMetric("failed", JOB_TYPE)).isNull();
    assertThat(jobMetric("canceled", JOB_TYPE)).isNull();
    assertThat(jobMetric("error thrown", JOB_TYPE)).isNull();
  }

  @Test
  public void shouldCountCreated() {
    // when
    createProcessInstanceWithJob(JOB_TYPE);

    // then
    assertThat(jobMetric("created", JOB_TYPE)).isNotNull().isEqualTo(1);
  }

  @Test
  public void shouldCountActivated() {
    // given

    // the job type must be unique, because other tests may also have created jobs that can be
    // activated. We can't depend on the unique process instance when activating a batch of jobs.
    final String jobType = JOB_TYPE + "_activated";
    createProcessInstanceWithJob(jobType);

    // when
    ENGINE.jobs().withType(jobType).activate();

    // then
    assertThat(jobMetric("activated", jobType)).isNotNull().isEqualTo(1);
  }

  @Test
  public void shouldCountTimedOut() {
    // given

    // the job type must be unique, because the jobs are activated as a batch by type. Jobs of the
    // shared JOB_TYPE that were left behind by other tests would be activated with the same
    // timeout and would time out as well, which would inflate the counter for that type.
    final String jobType = JOB_TYPE + "_timed_out";
    final long processInstanceKey = createProcessInstanceWithJob(jobType);

    final var timeout = Duration.ofMinutes(10);
    ENGINE.jobs().withType(jobType).withTimeout(timeout.toMillis()).activate();

    // when
    ENGINE.getClock().addTime(timeout);
    RecordingExporter.jobRecords(JobIntent.TIMED_OUT)
        .withProcessInstanceKey(processInstanceKey)
        .await();

    // then
    assertThat(jobMetric("timed out", jobType)).isNotNull().isEqualTo(1);
  }

  @Test
  public void shouldCountCompleted() {
    // given
    final long processInstanceKey = createProcessInstanceWithJob(JOB_TYPE);

    // when
    ENGINE.job().ofInstance(processInstanceKey).withType(JOB_TYPE).complete();

    // then
    assertThat(jobMetric("completed", JOB_TYPE)).isNotNull().isEqualTo(1);
  }

  @Test
  public void shouldCountFailed() {
    // given
    final long processInstanceKey = createProcessInstanceWithJob(JOB_TYPE);

    // when
    ENGINE.job().ofInstance(processInstanceKey).withType(JOB_TYPE).fail();

    // then
    assertThat(jobMetric("failed", JOB_TYPE)).isNotNull().isEqualTo(1);
  }

  @Test
  public void shouldCountCanceled() {
    // given
    final long processInstanceKey = createProcessInstanceWithJob(JOB_TYPE);

    // when
    ENGINE.processInstance().withInstanceKey(processInstanceKey).cancel();
    RecordingExporter.jobRecords(JobIntent.CANCELED)
        .withProcessInstanceKey(processInstanceKey)
        .await();

    // then
    assertThat(jobMetric("canceled", JOB_TYPE)).isNotNull().isEqualTo(1);
  }

  @Test
  public void shouldCountErrorThrown() {
    // given
    final long processInstanceKey = createProcessInstanceWithJob(JOB_TYPE);

    // when
    ENGINE.job().ofInstance(processInstanceKey).withType(JOB_TYPE).throwError();

    // then
    assertThat(jobMetric("error thrown", JOB_TYPE)).isNotNull().isEqualTo(1);
  }

  /**
   * Creates a process instance with a job, and waits until the job is created
   *
   * @param jobType the job type for the service task
   * @return the key of the created process instance
   */
  private static long createProcessInstanceWithJob(final String jobType) {
    final long processInstanceKey =
        ENGINE
            .processInstance()
            .ofBpmnProcessId(PROCESS_ID)
            .withVariable("jobType", jobType)
            .create();

    RecordingExporter.jobRecords(JobIntent.CREATED)
        .withProcessInstanceKey(processInstanceKey)
        .await();

    return processInstanceKey;
  }

  /**
   * Reads the current value of the {@code zeebe_job_events_total} metric on partition 1.
   *
   * @param action the value of the {@code action} label, e.g. {@code "created"}
   * @param type the value of the {@code type} label, i.e. the job type
   * @return the metric value, or null if no value exists for these labels
   */
  private static Double jobMetric(final String action, final String type) {
    return MetricsTestHelper.readMetricValue(
        "zeebe_job_events_total",
        entry("action", action),
        entry("partition", "1"),
        entry("type", type));
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.engine.metrics;

import io.prometheus.client.CollectorRegistry;
import java.util.Arrays;
import java.util.List;
import java.util.Map.Entry;

/** Static helpers for reading metric values in tests. */
final class MetricsTestHelper {

  private MetricsTestHelper() {
    // utility class with static methods only, not meant to be instantiated
  }

  /**
   * Reads the value of a metric based on its name and labels.
   *
   * <p>This uses the defaultRegistry which is inefficient, so this should only be used in testing.
   *
   * @param name the name of the metric
   * @param labels names and values for labels. This is useful for filtering the right value within
   *     the metric. The order of the entries determines the order in which label names and label
   *     values are passed to the registry.
   * @return the value of the metric for the given labels, or null if it doesn't exist
   */
  @SafeVarargs
  static Double readMetricValue(final String name, final Entry<String, String>... labels) {
    // view the varargs as a list once, then split it into the two parallel arrays the registry
    // expects: label names and label values at matching indexes
    final List<Entry<String, String>> labelEntries = Arrays.asList(labels);
    final String[] labelNames = labelEntries.stream().map(Entry::getKey).toArray(String[]::new);
    final String[] labelValues =
        labelEntries.stream().map(Entry::getValue).toArray(String[]::new);

    return CollectorRegistry.defaultRegistry.getSampleValue(name, labelNames, labelValues);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
import io.camunda.zeebe.test.util.record.RecordingExporter;
import io.prometheus.client.CollectorRegistry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
Expand Down Expand Up @@ -161,7 +157,7 @@ private Double terminatedProcessInstanceMetric() {
}

private Double executedProcessInstanceMetric(final String action) {
return metricValue(
return MetricsTestHelper.readMetricValue(
"zeebe_executed_instances_total",
entry("organizationId", "null"),
entry("type", "ROOT_PROCESS_INSTANCE"),
Expand All @@ -178,18 +174,10 @@ private Double failedEvaluatedDmnElementsMetric() {
}

private Double evaluatedDmnElementsMetric(final String action) {
return metricValue(
return MetricsTestHelper.readMetricValue(
"zeebe_evaluated_dmn_elements_total",
entry("organizationId", "null"),
entry("action", action),
entry("partition", "1"));
}

@SafeVarargs
private Double metricValue(final String name, final Entry<String, String>... labels) {
final List<String> labelNames = Arrays.stream(labels).map(Entry::getKey).toList();
final List<String> labelValues = Arrays.stream(labels).map(Entry::getValue).toList();
return CollectorRegistry.defaultRegistry.getSampleValue(
name, labelNames.toArray(new String[] {}), labelValues.toArray(new String[] {}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public void shouldActivateBoundaryEventWhenEventTriggered() {
.containsSubsequence(
tuple(ValueType.TIMER, TimerIntent.TRIGGERED),
tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_TERMINATING),
tuple(ValueType.JOB, JobIntent.CANCEL),
tuple(ValueType.JOB, JobIntent.CANCELED),
tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_TERMINATED),
tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_ACTIVATING));
}
Expand Down Expand Up @@ -258,14 +258,13 @@ public void shouldTerminateSubProcessBeforeTriggeringBoundaryEvent() {
tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_TERMINATING),
tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.TERMINATE_ELEMENT),
tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_TERMINATING),
tuple(ValueType.JOB, JobIntent.CANCEL),
tuple(ValueType.JOB, JobIntent.CANCELED),
tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_TERMINATED),
tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_TERMINATED),
tuple(ValueType.PROCESS_EVENT, ProcessEventIntent.TRIGGERED),
tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_ACTIVATING),
tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_ACTIVATED),
tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.COMPLETE_ELEMENT),
tuple(ValueType.JOB, JobIntent.CANCELED),
tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_COMPLETING),
tuple(ValueType.PROCESS_INSTANCE, ProcessInstanceIntent.ELEMENT_COMPLETED));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,10 +342,10 @@ public void shouldDeleteIncidentIfJobIsCanceled() {
.withIntent(ProcessInstanceIntent.TERMINATE_ELEMENT)
.getFirst();

final Record<JobRecordValue> jobCancelCommand =
final Record<JobRecordValue> jobCancelled =
RecordingExporter.jobRecords()
.withProcessInstanceKey(processInstanceKey)
.withIntent(JobIntent.CANCEL)
.withIntent(JobIntent.CANCELED)
.getFirst();

final Record<IncidentRecordValue> resolvedIncidentEvent =
Expand All @@ -357,7 +357,7 @@ public void shouldDeleteIncidentIfJobIsCanceled() {
assertThat(resolvedIncidentEvent.getKey()).isEqualTo(incidentCreatedEvent.getKey());
assertThat(resolvedIncidentEvent.getSourceRecordPosition())
.isEqualTo(terminateTaskCommand.getPosition());
assertThat(jobCancelCommand.getSourceRecordPosition())
assertThat(jobCancelled.getSourceRecordPosition())
.isEqualTo(terminateTaskCommand.getPosition());

assertThat(resolvedIncidentEvent.getValue())
Expand Down

0 comments on commit 053ff2c

Please sign in to comment.