Skip to content

Commit

Permalink
refactor: address and resolve TODOs (#18425)
Browse files Browse the repository at this point in the history
## Description

- made job maxRetries configurable via @Jobworker annotation
- moved metrics to command callback
- changed the way how the SDK handle error code, throwing an error every
also when an endpoint is not found
- removed old todos

## Related issues

closes #17342
  • Loading branch information
nicpuppa committed May 14, 2024
2 parents caa615d + 73ef4e8 commit cf2186e
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 54 deletions.
1 change: 0 additions & 1 deletion spring-boot-starter-camunda-sdk/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
<properties>
<version.java>17</version.java>
<commons-beanutils.version>1.9.4</commons-beanutils.version>
<zeebe-process-test.version>8.5.0</zeebe-process-test.version>
<license.header>com/mycila/maven/plugin/license/templates/APACHE-2.txt</license.header>
<identity.version>8.5.1</identity.version>
<java-jwt.version>4.4.0</java-jwt.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,7 @@

/** Stream timeout in ms */
long streamTimeout() default -1L;

/** Set the max number of retries for a job */
int maxRetries() default -1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ public Optional<ZeebeWorkerValue> readJobWorkerAnnotationForMethod(final MethodI
Arrays.asList(annotation.tenantIds()),
annotation.fetchAllVariables(),
annotation.streamEnabled(),
Duration.of(annotation.streamTimeout(), ChronoUnit.MILLIS)));
Duration.of(annotation.streamTimeout(), ChronoUnit.MILLIS),
annotation.maxRetries()));
}
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class ZeebeWorkerValue implements ZeebeAnnotationValue<MethodInfo> {
private Boolean forceFetchAllVariables;
private Boolean streamEnabled;
private Duration streamTimeout;
private int maxRetries;

public ZeebeWorkerValue() {}

Expand All @@ -62,7 +63,8 @@ public ZeebeWorkerValue(
final List<String> tenantIds,
final Boolean forceFetchAllVariables,
final Boolean streamEnabled,
final Duration streamTimeout) {
final Duration streamTimeout,
final int maxRetries) {
this.type = type;
this.name = name;
this.timeout = timeout;
Expand All @@ -77,6 +79,7 @@ public ZeebeWorkerValue(
this.forceFetchAllVariables = forceFetchAllVariables;
this.streamEnabled = streamEnabled;
this.streamTimeout = streamTimeout;
this.maxRetries = maxRetries;
}

public String getType() {
Expand Down Expand Up @@ -191,6 +194,14 @@ public void setStreamTimeout(final Duration streamTimeout) {
this.streamTimeout = streamTimeout;
}

public int getMaxRetries() {
return maxRetries;
}

public void setMaxRetries(final int maxRetries) {
this.maxRetries = maxRetries;
}

@Override
public MethodInfo getBeanInfo() {
return methodInfo;
Expand All @@ -212,7 +223,8 @@ public int hashCode() {
tenantIds,
forceFetchAllVariables,
streamEnabled,
streamTimeout);
streamTimeout,
maxRetries);
}

@Override
Expand All @@ -237,7 +249,8 @@ public boolean equals(final Object o) {
&& Objects.equals(tenantIds, that.tenantIds)
&& Objects.equals(forceFetchAllVariables, that.forceFetchAllVariables)
&& Objects.equals(streamEnabled, that.streamEnabled)
&& Objects.equals(streamTimeout, that.streamTimeout);
&& Objects.equals(streamTimeout, that.streamTimeout)
&& Objects.equals(maxRetries, that.maxRetries);
}

@Override
Expand Down Expand Up @@ -273,6 +286,8 @@ public String toString() {
+ streamEnabled
+ ", streamTimeout="
+ streamTimeout
+ ", maxRetries="
+ maxRetries
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,33 @@
import io.camunda.zeebe.client.api.command.FinalCommandStep;
import io.camunda.zeebe.client.api.response.ActivatedJob;
import io.camunda.zeebe.client.api.worker.BackoffSupplier;
import io.camunda.zeebe.spring.client.metrics.MetricsRecorder;
import java.time.Instant;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class CommandWrapper {

private final FinalCommandStep<Void> command;

private final ActivatedJob job;
private final CommandExceptionHandlingStrategy commandExceptionHandlingStrategy;
private final MetricsRecorder metricsRecorder;

private long currentRetryDelay = 50L;
private int invocationCounter = 0;
private final int maxRetries = 20; // TODO: Make configurable
private final int maxRetries;

public CommandWrapper(
final FinalCommandStep<Void> command,
final ActivatedJob job,
final CommandExceptionHandlingStrategy commandExceptionHandlingStrategy) {
final CommandExceptionHandlingStrategy commandExceptionHandlingStrategy,
final MetricsRecorder metricsRecorder,
final int maxRetries) {
this.command = command;
this.job = job;
this.commandExceptionHandlingStrategy = commandExceptionHandlingStrategy;
this.metricsRecorder = metricsRecorder;
this.maxRetries = maxRetries;
}

public void executeAsync() {
Expand All @@ -53,6 +58,23 @@ public void executeAsync() {
});
}

public void executeAsyncWithMetrics(
final String metricName, final String action, final String type) {
invocationCounter++;
command
.send()
.thenApply(
result -> {
metricsRecorder.increase(metricName, action, type);
return result;
})
.exceptionally(
t -> {
commandExceptionHandlingStrategy.handleCommandError(this, t);
return null;
});
}

public void increaseBackoffUsing(final BackoffSupplier backoffSupplier) {
currentRetryDelay = backoffSupplier.supplyRetryDelay(currentRetryDelay);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,24 +62,16 @@ public void handleCommandError(final CommandWrapper command, final Throwable thr
final StatusRuntimeException exception = (StatusRuntimeException) throwable;
final Status.Code code = exception.getStatus().getCode();

// Success codes should not lead to an exception!
if (IGNORABLE_FAILURE_CODES.contains(code)) {
LOG.warn(
"Ignoring the error of type '"
+ code
+ "' during "
+ command
+ ". Job might have been canceled or already completed.");
// TODO: Is Ignorance really a good idea? Think of some local transaction that might need to
// TODO: be marked for rollback! But for sure, retry does not help at all
return;
} else if (RETRIABLE_CODES.contains(code)) {
if (command.hasMoreRetries()) {
command.increaseBackoffUsing(backoffSupplier);
LOG.warn("Retrying " + command + " after error of type '" + code + "' with backoff");
command.scheduleExecutionUsing(scheduledExecutorService);
return;
} else {
if (!RETRIABLE_CODES.contains(code)
|| !IGNORABLE_FAILURE_CODES.contains(code)
|| FAILURE_CODES.contains(code)) {
throw new RuntimeException(
"Could not execute " + command + " due to exception: " + throwable.getMessage(),
throwable);
}

if (RETRIABLE_CODES.contains(code)) {
if (!command.hasMoreRetries()) {
throw new RuntimeException(
"Could not execute "
+ command
Expand All @@ -88,14 +80,14 @@ public void handleCommandError(final CommandWrapper command, final Throwable thr
+ "' and no retries are left",
throwable);
}
} else if (FAILURE_CODES.contains(code)) {
throw new RuntimeException(
"Could not execute " + command + " due to error of type '" + code + "'", throwable);
command.increaseBackoffUsing(backoffSupplier);
LOG.warn("Retrying {} after error of type '{}' with backoff", command, code);
command.scheduleExecutionUsing(scheduledExecutorService);
return;
}
}

// if it wasn't handled yet, throw an exception
throw new RuntimeException(
"Could not execute " + command + " due to exception: " + throwable.getMessage(), throwable);
throw new RuntimeException(
"Could not execute " + command + " due to error of type '" + code + "'", throwable);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,50 +61,43 @@ private List<ParameterResolver> createParameterResolvers(

@Override
public void handle(final JobClient jobClient, final ActivatedJob job) throws Exception {
// TODO: Figuring out parameters and assignments could probably also done only once in the
// beginning to save some computing time on each invocation
final List<Object> args = createParameters(jobClient, job);
LOG.trace("Handle {} and invoke worker {}", job, workerValue);
try {
metricsRecorder.increase(
MetricsRecorder.METRIC_NAME_JOB, MetricsRecorder.ACTION_ACTIVATED, job.getType());
Object result = null;
final Object result;
try {
result = workerValue.getMethodInfo().invoke(args.toArray());
} catch (final Throwable t) {
metricsRecorder.increase(
MetricsRecorder.METRIC_NAME_JOB, MetricsRecorder.ACTION_FAILED, job.getType());
// normal exceptions are handled by JobRunnableFactory
// (https://github.com/camunda-cloud/zeebe/blob/develop/clients/java/src/main/java/io/camunda/zeebe/client/impl/worker/JobRunnableFactory.java#L45)
// which leads to retrying
throw t;
}

if (workerValue.getAutoComplete()) {
LOG.trace("Auto completing {}", job);
// TODO: We should probably move the metrics recording to the callback of a successful
// command execution to avoid wrong counts
metricsRecorder.increase(
MetricsRecorder.METRIC_NAME_JOB, MetricsRecorder.ACTION_COMPLETED, job.getType());
final CommandWrapper command =
new CommandWrapper(
createCompleteCommand(jobClient, job, result),
job,
commandExceptionHandlingStrategy);
command.executeAsync();
commandExceptionHandlingStrategy,
metricsRecorder,
workerValue.getMaxRetries());
command.executeAsyncWithMetrics(
MetricsRecorder.METRIC_NAME_JOB, MetricsRecorder.ACTION_COMPLETED, job.getType());
}
} catch (final ZeebeBpmnError bpmnError) {
LOG.trace("Catched BPMN error on {}", job);
// TODO: We should probably move the metrics recording to the callback of a successful command
// execution to avoid wrong counts
metricsRecorder.increase(
MetricsRecorder.METRIC_NAME_JOB, MetricsRecorder.ACTION_BPMN_ERROR, job.getType());
final CommandWrapper command =
new CommandWrapper(
createThrowErrorCommand(jobClient, job, bpmnError),
job,
commandExceptionHandlingStrategy);
command.executeAsync();
commandExceptionHandlingStrategy,
metricsRecorder,
workerValue.getMaxRetries());
command.executeAsyncWithMetrics(
MetricsRecorder.METRIC_NAME_JOB, MetricsRecorder.ACTION_BPMN_ERROR, job.getType());
}
}

Expand Down Expand Up @@ -133,7 +126,7 @@ private FinalCommandStep<Void> createThrowErrorCommand(
final JobClient jobClient, final ActivatedJob job, final ZeebeBpmnError bpmnError) {
final ThrowErrorCommandStep2 command =
jobClient
.newThrowErrorCommand(job.getKey()) // TODO: PR for taking a job only in command chain
.newThrowErrorCommand(job.getKey())
.errorCode(bpmnError.getErrorCode())
.errorMessage(bpmnError.getErrorMessage());
if (bpmnError.getVariables() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ public JobWorker openWorker(final ZeebeClient client, final ZeebeWorkerValue zee
public JobWorker openWorker(
final ZeebeClient client, final ZeebeWorkerValue zeebeWorkerValue, final JobHandler handler) {

// TODO: Trigger initialization of worker values and defaults here

final JobWorkerBuilderStep1.JobWorkerBuilderStep3 builder =
client
.newWorker()
Expand Down

0 comments on commit cf2186e

Please sign in to comment.