Skip to content

Commit

Permalink
merge: #9389
Browse files Browse the repository at this point in the history
9389: Implement a job backoff timeout on Zeebe Java client side r=saig0 a=aivinog1

## Description

<!-- Please explain the changes you made here. -->
I implemented the usage of the backoff parameter in the Zeebe Java Client and `@saig0` fixed the problem with flushing responses.

## Related issues

<!-- Which issues are closed by this PR or are related -->

#5629 



Co-authored-by: Alexey Vinogradov <vinogradov.a.i.93@gmail.com>
Co-authored-by: Philipp Ossler <philipp.ossler@gmail.com>
  • Loading branch information
3 people committed May 20, 2022
2 parents 751140f + 5770017 commit c04b75a
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 1 deletion.
Expand Up @@ -16,6 +16,7 @@
package io.camunda.zeebe.client.api.command;

import io.camunda.zeebe.client.api.response.FailJobResponse;
import java.time.Duration;

public interface FailJobCommandStep1 {

Expand All @@ -34,6 +35,18 @@ public interface FailJobCommandStep1 {
interface FailJobCommandStep2 extends FinalCommandStep<FailJobResponse> {
// the place for new optional parameters

/**
* Set the backoff timeout for failing this job.
*
* <p>If the backoff timeout is greater than zero and retries are greater than zero then this
* job will be picked up again after the given backoff timeout is expired.
*
* @param backoffTimeout the backoff timeout of this job
* @return the builder for this command. Call {@link #send()} to complete the command and send *
* it to the broker.
*/
FailJobCommandStep2 retryBackoff(final Duration backoffTimeout);

/**
* Provide an error message describing the reason for the job failure. If failing the job
* creates an incident, this error message will be used as incident message.
Expand Down
Expand Up @@ -56,6 +56,12 @@ public FailJobCommandStep2 retries(final int retries) {
return this;
}

@Override
public FailJobCommandStep2 retryBackoff(final Duration backoffTimeout) {
builder.setRetryBackOff(backoffTimeout.toMillis());
return this;
}

@Override
public FailJobCommandStep2 errorMessage(final String errorMsg) {
builder.setErrorMessage(errorMsg);
Expand Down
Expand Up @@ -77,6 +77,52 @@ public void shouldFailJobWithMessage() {
rule.verifyDefaultRequestTimeout();
}

@Test
public void shouldFailJobWithBackoff() {
// given
final long jobKey = 12;
final int newRetries = 23;

// when
final Duration backoffTimeout = Duration.ofSeconds(1);
client.newFailCommand(jobKey).retries(newRetries).retryBackoff(backoffTimeout).send().join();

// then
final FailJobRequest request = gatewayService.getLastRequest();
assertThat(request.getJobKey()).isEqualTo(jobKey);
assertThat(request.getRetries()).isEqualTo(newRetries);
assertThat(request.getRetryBackOff()).isEqualTo(backoffTimeout.toMillis());

rule.verifyDefaultRequestTimeout();
}

@Test
public void shouldFailJobWithBackoffAndMessage() {
// given
final long jobKey = 12;
final int newRetries = 23;
final String message = "failed message";

// when
final Duration backoffTimeout = Duration.ofSeconds(1);
client
.newFailCommand(jobKey)
.retries(newRetries)
.retryBackoff(backoffTimeout)
.errorMessage(message)
.send()
.join();

// then
final FailJobRequest request = gatewayService.getLastRequest();
assertThat(request.getJobKey()).isEqualTo(jobKey);
assertThat(request.getRetries()).isEqualTo(newRetries);
assertThat(request.getRetryBackOff()).isEqualTo(backoffTimeout.toMillis());
assertThat(request.getErrorMessage()).isEqualTo(message);

rule.verifyDefaultRequestTimeout();
}

@Test
public void shouldSetRequestTimeout() {
// given
Expand Down
Expand Up @@ -9,6 +9,7 @@

import io.camunda.zeebe.engine.processing.streamprocessor.CommandProcessor.CommandControl;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectProducer;
import io.camunda.zeebe.engine.processing.streamprocessor.sideeffect.SideEffectQueue;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.StateWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedCommandWriter;
import io.camunda.zeebe.engine.processing.streamprocessor.writers.TypedRejectionWriter;
Expand All @@ -35,6 +36,8 @@
public final class CommandProcessorImpl<T extends UnifiedRecordValue>
implements TypedRecordProcessor<T>, CommandControl<T> {

private final SideEffectQueue sideEffectQueue = new SideEffectQueue();

private final CommandProcessor<T> wrappedProcessor;

private final KeyGenerator keyGenerator;
Expand Down Expand Up @@ -70,7 +73,12 @@ public void processRecord(
final Consumer<SideEffectProducer> sideEffect) {

entityKey = command.getKey();
final boolean shouldRespond = wrappedProcessor.onCommand(command, this, sideEffect);

sideEffect.accept(sideEffectQueue);
sideEffectQueue.clear();
sideEffectQueue.add(responseWriter::flush);

final boolean shouldRespond = wrappedProcessor.onCommand(command, this, sideEffectQueue::add);

final boolean respond = shouldRespond && command.hasRequestMetadata();

Expand Down
Expand Up @@ -20,6 +20,7 @@
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.value.JobRecordValue;
import io.camunda.zeebe.test.util.BrokerClassRuleHelper;
import java.time.Duration;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
Expand Down Expand Up @@ -73,6 +74,26 @@ public void shouldFailJobWithErrorMessage() {
Assertions.assertThat(record.getValue()).hasRetries(0).hasErrorMessage("test");
}

@Test
public void shouldFailJobWithRetryBackOff() {
// when
final Duration backoffTimeout = Duration.ofSeconds(30);
CLIENT_RULE
.getClient()
.newFailCommand(jobKey)
.retries(1)
.retryBackoff(backoffTimeout)
.send()
.join();

// then
final Record<JobRecordValue> beforeRecurRecord =
jobRecords(JobIntent.FAILED).withRecordKey(jobKey).getFirst();
Assertions.assertThat(beforeRecurRecord.getValue())
.hasRetries(1)
.hasRetryBackoff(backoffTimeout.toMillis());
}

@Test
public void shouldRejectIfJobIsAlreadyCompleted() {
// given
Expand Down

0 comments on commit c04b75a

Please sign in to comment.