Skip to content

Commit

Permalink
merge: #9371
Browse files Browse the repository at this point in the history
9371: Configure record filter for metrics exporter r=korthout a=korthout

## Description

<!-- Please explain the changes you made here. -->

Configures a record filter for the Metrics exporter.

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #9240 
relates to #6442 



Co-authored-by: Nico Korthout <nico.korthout@camunda.com>
  • Loading branch information
zeebe-bors-camunda[bot] and korthout committed May 13, 2022
2 parents 5457da6 + db64a95 commit f0d5916
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 1 deletion.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ faces-config.NavData
*.lock.db
.cache-main
.cache-tests
.jqwik-database

*.DS_Store
.java-version
Expand Down
6 changes: 6 additions & 0 deletions broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-exporter-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-workflow-engine</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ public void observeJobActivationTime(
.observe(latencyInSeconds(creationTimeMs, activationTimeMs));
}

public Histogram getJobLifeTime() {
return JOB_LIFE_TIME;
}

/**
* Takes start and end time in milliseconds and calculates the difference (latency) in seconds.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

import io.camunda.zeebe.broker.system.configuration.ExporterCfg;
import io.camunda.zeebe.exporter.api.Exporter;
import io.camunda.zeebe.exporter.api.context.Context;
import io.camunda.zeebe.exporter.api.context.Context.RecordFilter;
import io.camunda.zeebe.exporter.api.context.Controller;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
Expand All @@ -21,6 +23,7 @@
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRecordValue;
import java.time.Duration;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import org.agrona.collections.Long2LongHashMap;

Expand All @@ -39,13 +42,36 @@ public class MetricsExporter implements Exporter {
private Controller controller;

public MetricsExporter() {
executionLatencyMetrics = new ExecutionLatencyMetrics();
this(new ExecutionLatencyMetrics());
}

public MetricsExporter(final ExecutionLatencyMetrics executionLatencyMetrics) {
this.executionLatencyMetrics = executionLatencyMetrics;
jobKeyToCreationTimeMap = new Long2LongHashMap(-1);
processInstanceKeyToCreationTimeMap = new Long2LongHashMap(-1);
creationTimeToJobKeyNavigableMap = new TreeMap<>();
creationTimeToProcessInstanceKeyNavigableMap = new TreeMap<>();
}

@Override
public void configure(final Context context) throws Exception {
context.setFilter(
new RecordFilter() {
private static final Set<ValueType> ACCEPTED_VALUE_TYPES =
Set.of(ValueType.JOB, ValueType.JOB_BATCH, ValueType.PROCESS_INSTANCE);

@Override
public boolean acceptType(final RecordType recordType) {
return recordType == RecordType.EVENT;
}

@Override
public boolean acceptValue(final ValueType valueType) {
return ACCEPTED_VALUE_TYPES.contains(valueType);
}
});
}

@Override
public void open(final Controller controller) {
this.controller = controller;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.broker.exporter.metrics;

import static org.assertj.core.api.Assertions.assertThat;

import io.camunda.zeebe.exporter.test.ExporterTestContext;
import io.camunda.zeebe.exporter.test.ExporterTestController;
import io.camunda.zeebe.protocol.Protocol;
import io.camunda.zeebe.protocol.record.ImmutableRecord;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import java.util.Arrays;
import java.util.stream.Stream;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

class MetricsExporterTest {

@Test
void shouldObserveJobLifetime() {
// given
final var metrics = new ExecutionLatencyMetrics();
final var exporter = new MetricsExporter(metrics);
exporter.open(new ExporterTestController());
assertThat(metrics.getJobLifeTime().collect())
.flatMap(x -> x.samples)
.describedAs("Expected no metrics to be recorded at start of test")
.isEmpty();

// when
exporter.export(
ImmutableRecord.builder()
.withRecordType(RecordType.EVENT)
.withValueType(ValueType.JOB)
.withIntent(JobIntent.CREATED)
.withTimestamp(1651505728460L)
.withKey(Protocol.encodePartitionId(1, 1))
.build());
exporter.export(
ImmutableRecord.builder()
.withRecordType(RecordType.EVENT)
.withValueType(ValueType.JOB)
.withIntent(JobIntent.COMPLETED)
.withTimestamp(1651505729571L)
.withKey(Protocol.encodePartitionId(1, 1))
.build());

// then
assertThat(metrics.getJobLifeTime().collect())
.flatMap(x -> x.samples)
.filteredOn(s -> s.name.equals("zeebe_job_life_time_count"))
.map(s -> s.value)
.describedAs("Expected exactly 1 observed job_life_time sample counted")
.containsExactly(1d);
}

@Nested
@DisplayName("MetricsExporter should configure a Filter")
class FilterTest {

static Stream<TypeCombination> acceptedCombinations() {
return Stream.of(
new TypeCombination(RecordType.EVENT, ValueType.JOB),
new TypeCombination(RecordType.EVENT, ValueType.JOB_BATCH),
new TypeCombination(RecordType.EVENT, ValueType.PROCESS_INSTANCE));
}

/** Returns the inverse of {@link #acceptedCombinations()}. */
static Stream<TypeCombination> rejectedCombinations() {
return allCombinations().filter(any -> acceptedCombinations().noneMatch(any::equals));
}

static Stream<TypeCombination> allCombinations() {
return Arrays.stream(RecordType.values())
.flatMap(
recordType ->
Arrays.stream(ValueType.values())
.map(valueType -> new TypeCombination(recordType, valueType)));
}

@ParameterizedTest
@DisplayName("accepting records of specific RecordType and ValueType")
@MethodSource("acceptedCombinations")
void shouldConfigureFilterAccepting(final TypeCombination combination) throws Exception {
// given
final var recordType = combination.recordType();
final var valueType = combination.valueType();
final var context = new ExporterTestContext();

// when
new MetricsExporter().configure(context);

// then
final var recordFilter = context.getRecordFilter();
assertThat(recordFilter.acceptType(recordType) && recordFilter.acceptValue(valueType))
.describedAs(
"Expect RecordFilter to accept record of RecordType %s and ValueType %s",
recordType, valueType)
.isTrue();
}

@ParameterizedTest
@DisplayName("rejecting records of specific RecordType and ValueType")
@MethodSource("rejectedCombinations")
void shouldConfigureFilterRejecting(final TypeCombination combination) throws Exception {
// given
final var recordType = combination.recordType();
final var valueType = combination.valueType();
final var context = new ExporterTestContext();

// when
new MetricsExporter().configure(context);

// then
final var recordFilter = context.getRecordFilter();
assertThat(recordFilter.acceptType(recordType) && recordFilter.acceptValue(valueType))
.describedAs(
"Expect RecordFilter to reject record of RecordType %s and ValueType %s",
recordType, valueType)
.isFalse();
}

/** Defines a combination of a RecordType and a ValueType. */
record TypeCombination(RecordType recordType, ValueType valueType) {}
}
}

0 comments on commit f0d5916

Please sign in to comment.