Skip to content

Commit

Permalink
merge: #10388
Browse files Browse the repository at this point in the history
10388: Accept Process Instance cancelling and new deployment when backpressure is high r=saig0 a=remcowesterhoud

## Description

<!-- Please explain the changes you made here. -->
Allow cancelling of process instances and deployments of new processes when the back pressure is high.

It could occur that a cluster has a high back pressure because of a faulty process. This could be resolved by cancelling the instance, or by creating a new deployment. When these commands get rejected because of this high backpressure it becomes unrecoverable.

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #9283



Co-authored-by: Remco Westerhoud <remco@westerhoud.nl>
  • Loading branch information
zeebe-bors-camunda[bot] and remcowesterhoud committed Sep 19, 2022
2 parents 8cdb959 + 5b68e05 commit 1ead10a
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@

import com.netflix.concurrency.limits.limiter.AbstractLimiter;
import io.camunda.zeebe.broker.Loggers;
import io.camunda.zeebe.protocol.record.intent.DeploymentDistributionIntent;
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import java.util.EnumSet;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand All @@ -26,7 +28,13 @@ public final class CommandRateLimiter extends AbstractLimiter<Intent>
private static final Logger LOG =
LoggerFactory.getLogger("io.camunda.zeebe.broker.transport.backpressure");
private static final Set<? extends Intent> WHITE_LISTED_COMMANDS =
EnumSet.of(JobIntent.COMPLETE, JobIntent.FAIL);
Set.of(
JobIntent.COMPLETE,
JobIntent.FAIL,
ProcessInstanceIntent.CANCEL,
DeploymentIntent.CREATE,
DeploymentIntent.DISTRIBUTE,
DeploymentDistributionIntent.COMPLETE);
private final Map<ListenerId, Listener> responseListeners = new ConcurrentHashMap<>();
private final int partitionId;
private final BackpressureMetrics metrics = new BackpressureMetrics();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,20 @@
import static org.assertj.core.api.Assertions.assertThat;

import com.netflix.concurrency.limits.limit.SettableLimit;
import io.camunda.zeebe.protocol.record.intent.DeploymentDistributionIntent;
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceCreationIntent;
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
import java.util.stream.IntStream;
import org.junit.Test;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

public final class CommandRateLimiterTest {
class CommandRateLimiterTest {

private static final int INITIAL_LIMIT = 5;
private final SettableLimit limit = new SettableLimit(INITIAL_LIMIT);
Expand All @@ -25,12 +32,12 @@ public final class CommandRateLimiterTest {
private final Intent context = ProcessInstanceCreationIntent.CREATE;

@Test
public void shouldAcquire() {
void shouldAcquire() {
assertThat(rateLimiter.tryAcquire(0, 1, context)).isTrue();
}

@Test
public void shouldNotAcquireAfterLimit() {
void shouldNotAcquireAfterLimit() {
// given
IntStream.range(0, limit.getLimit())
.forEach(i -> assertThat(rateLimiter.tryAcquire(0, 1, context)).isTrue());
Expand All @@ -39,10 +46,9 @@ public void shouldNotAcquireAfterLimit() {
}

@Test
public void shouldCompleteRequestOnResponse() {
void shouldCompleteRequestOnResponse() {
// given
IntStream.range(0, limit.getLimit())
.forEach(i -> assertThat(rateLimiter.tryAcquire(0, i, context)));
IntStream.range(0, limit.getLimit()).forEach(i -> rateLimiter.tryAcquire(0, i, context));
assertThat(rateLimiter.tryAcquire(0, 100, context)).isFalse();

// when
Expand All @@ -53,10 +59,9 @@ public void shouldCompleteRequestOnResponse() {
}

@Test
public void shouldCompleteAllRequests() {
void shouldCompleteAllRequests() {
// given
IntStream.range(0, limit.getLimit())
.forEach(i -> assertThat(rateLimiter.tryAcquire(0, i, context)));
IntStream.range(0, limit.getLimit()).forEach(i -> rateLimiter.tryAcquire(0, i, context));
assertThat(rateLimiter.tryAcquire(0, 100, context)).isFalse();

// when
Expand All @@ -69,35 +74,37 @@ public void shouldCompleteAllRequests() {
}

@Test
public void shouldAcquireWhenJobCompleteCommandAfterLimit() {
void shouldReleaseRequestOnIgnore() {
// given
IntStream.range(0, limit.getLimit())
.forEach(i -> assertThat(rateLimiter.tryAcquire(0, 1, context)).isTrue());
rateLimiter.tryAcquire(0, 1, context);
assertThat(rateLimiter.getInflightCount()).isEqualTo(1);

// when
rateLimiter.onIgnore(0, 1);

// then
assertThat(rateLimiter.tryAcquire(0, 1, JobIntent.COMPLETE)).isTrue();
assertThat(rateLimiter.getInflightCount()).isEqualTo(0);
}

@Test
public void shouldAcquireWhenJobFailCommandAfterLimit() {
@ParameterizedTest
@MethodSource("provideWhitelistedIntents")
void shouldWhiteListedCommandAfterLimit(final Intent intent) {
// given
IntStream.range(0, limit.getLimit())
.forEach(i -> assertThat(rateLimiter.tryAcquire(0, 1, context)).isTrue());
assertThat(rateLimiter.tryAcquire(0, 1, context)).isFalse();

// then
assertThat(rateLimiter.tryAcquire(0, 1, JobIntent.FAIL)).isTrue();
assertThat(rateLimiter.tryAcquire(0, 1, intent)).isTrue();
}

@Test
public void shouldReleaseRequestOnIgnore() {
// given
rateLimiter.tryAcquire(0, 1, context);
assertThat(rateLimiter.getInflightCount()).isEqualTo(1);

// when
rateLimiter.onIgnore(0, 1);

// then
assertThat(rateLimiter.getInflightCount()).isEqualTo(0);
private static Stream<Arguments> provideWhitelistedIntents() {
return Stream.of(
Arguments.of(JobIntent.COMPLETE),
Arguments.of(JobIntent.FAIL),
Arguments.of(ProcessInstanceIntent.CANCEL),
Arguments.of(DeploymentIntent.CREATE),
Arguments.of(DeploymentIntent.DISTRIBUTE),
Arguments.of(DeploymentDistributionIntent.COMPLETE));
}
}

0 comments on commit 1ead10a

Please sign in to comment.