Skip to content

Commit

Permalink
fix(broker): configure metrics exporter filter
Browse files Browse the repository at this point in the history
The metrics exporter shouldn't accept all records, because then the
exporter director unnecessarily reads record values that it this
exporter won't export anyways. In the past, this has led to the exporter
getting stuck when it ran into reading problems like #6442.

The metrics exporter is only interested in 3 events (job, job_batch and
process instance).

(cherry picked from commit db64a95)
  • Loading branch information
korthout authored and github-actions[bot] committed May 13, 2022
1 parent 7709e39 commit 421fe42
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

import io.camunda.zeebe.broker.system.configuration.ExporterCfg;
import io.camunda.zeebe.exporter.api.Exporter;
import io.camunda.zeebe.exporter.api.context.Context;
import io.camunda.zeebe.exporter.api.context.Context.RecordFilter;
import io.camunda.zeebe.exporter.api.context.Controller;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
Expand All @@ -21,6 +23,7 @@
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRecordValue;
import java.time.Duration;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import org.agrona.collections.Long2LongHashMap;

Expand Down Expand Up @@ -50,6 +53,25 @@ public MetricsExporter(final ExecutionLatencyMetrics executionLatencyMetrics) {
creationTimeToProcessInstanceKeyNavigableMap = new TreeMap<>();
}

@Override
public void configure(final Context context) throws Exception {
context.setFilter(
new RecordFilter() {
private static final Set<ValueType> ACCEPTED_VALUE_TYPES =
Set.of(ValueType.JOB, ValueType.JOB_BATCH, ValueType.PROCESS_INSTANCE);

@Override
public boolean acceptType(final RecordType recordType) {
return recordType == RecordType.EVENT;
}

@Override
public boolean acceptValue(final ValueType valueType) {
return ACCEPTED_VALUE_TYPES.contains(valueType);
}
});
}

@Override
public void open(final Controller controller) {
this.controller = controller;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ void shouldObserveJobLifetime() {
class FilterTest {

static Stream<TypeCombination> acceptedCombinations() {
return Stream.of();
return Stream.of(
new TypeCombination(RecordType.EVENT, ValueType.JOB),
new TypeCombination(RecordType.EVENT, ValueType.JOB_BATCH),
new TypeCombination(RecordType.EVENT, ValueType.PROCESS_INSTANCE));
}

/** Returns the inverse of {@link #acceptedCombinations()}. */
Expand Down

0 comments on commit 421fe42

Please sign in to comment.