Skip to content

Commit

Permalink
fix(broker): configure metrics exporter filter
Browse files Browse the repository at this point in the history
The metrics exporter shouldn't accept all records, because then the
exporter director unnecessarily reads record values that it this
exporter won't export anyways. In the past, this has led to the exporter
getting stuck when it ran into reading problems like #6442.

The metrics exporter is only interested in 3 events (job, job_batch and
process instance).

(cherry picked from commit db64a95)
  • Loading branch information
korthout committed May 20, 2022
1 parent 1f96926 commit 3845619
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

import io.camunda.zeebe.broker.system.configuration.ExporterCfg;
import io.camunda.zeebe.exporter.api.Exporter;
import io.camunda.zeebe.exporter.api.context.Context;
import io.camunda.zeebe.exporter.api.context.Context.RecordFilter;
import io.camunda.zeebe.exporter.api.context.Controller;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
Expand All @@ -21,6 +23,7 @@
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRecordValue;
import java.time.Duration;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import org.agrona.collections.Long2LongHashMap;

Expand Down Expand Up @@ -50,6 +53,25 @@ public MetricsExporter(final ExecutionLatencyMetrics executionLatencyMetrics) {
creationTimeToProcessInstanceKeyNavigableMap = new TreeMap<>();
}

@Override
public void configure(final Context context) throws Exception {
context.setFilter(
new RecordFilter() {
private static final Set<ValueType> ACCEPTED_VALUE_TYPES =
Set.of(ValueType.JOB, ValueType.JOB_BATCH, ValueType.PROCESS_INSTANCE);

@Override
public boolean acceptType(final RecordType recordType) {
return recordType == RecordType.EVENT;
}

@Override
public boolean acceptValue(final ValueType valueType) {
return ACCEPTED_VALUE_TYPES.contains(valueType);
}
});
}

@Override
public void open(final Controller controller) {
this.controller = controller;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ void shouldObserveJobLifetime() {
class FilterTest {

static Stream<TypeCombination> acceptedCombinations() {
return Stream.of();
return Stream.of(
new TypeCombination(RecordType.EVENT, ValueType.JOB),
new TypeCombination(RecordType.EVENT, ValueType.JOB_BATCH),
new TypeCombination(RecordType.EVENT, ValueType.PROCESS_INSTANCE));
}

/** Returns the inverse of {@link #acceptedCombinations()}. */
Expand Down

0 comments on commit 3845619

Please sign in to comment.