Skip to content

Commit

Permalink
merge: #9378
Browse files Browse the repository at this point in the history
9378: [Backport stable/8.0] Configure record filter for metrics exporter r=npepinpe a=github-actions[bot]

# Description
Backport of #9371 to `stable/8.0`.

relates to #9240 #6442

Co-authored-by: Nico Korthout <nico.korthout@camunda.com>
Co-authored-by: Nicolas Pepin-Perreault <nicolas.pepin-perreault@camunda.com>
  • Loading branch information
3 people committed May 25, 2022
2 parents 9c28a36 + ef96afa commit b360777
Show file tree
Hide file tree
Showing 15 changed files with 764 additions and 1 deletion.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ faces-config.NavData
*.lock.db
.cache-main
.cache-tests
.jqwik-database

*.DS_Store
.java-version
Expand Down
6 changes: 6 additions & 0 deletions broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@
<groupId>io.camunda</groupId>
<artifactId>zeebe-workflow-engine</artifactId>
</dependency>
<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-exporter-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-workflow-engine</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ public void observeJobActivationTime(
.observe(latencyInSeconds(creationTimeMs, activationTimeMs));
}

public Histogram getJobLifeTime() {
return JOB_LIFE_TIME;
}

/**
* Takes start and end time in milliseconds and calculates the difference (latency) in seconds.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

import io.camunda.zeebe.broker.system.configuration.ExporterCfg;
import io.camunda.zeebe.exporter.api.Exporter;
import io.camunda.zeebe.exporter.api.context.Context;
import io.camunda.zeebe.exporter.api.context.Context.RecordFilter;
import io.camunda.zeebe.exporter.api.context.Controller;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
Expand All @@ -21,6 +23,7 @@
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRecordValue;
import java.time.Duration;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import org.agrona.collections.Long2LongHashMap;

Expand All @@ -39,13 +42,36 @@ public class MetricsExporter implements Exporter {
private Controller controller;

public MetricsExporter() {
executionLatencyMetrics = new ExecutionLatencyMetrics();
this(new ExecutionLatencyMetrics());
}

public MetricsExporter(final ExecutionLatencyMetrics executionLatencyMetrics) {
this.executionLatencyMetrics = executionLatencyMetrics;
jobKeyToCreationTimeMap = new Long2LongHashMap(-1);
processInstanceKeyToCreationTimeMap = new Long2LongHashMap(-1);
creationTimeToJobKeyNavigableMap = new TreeMap<>();
creationTimeToProcessInstanceKeyNavigableMap = new TreeMap<>();
}

@Override
public void configure(final Context context) throws Exception {
context.setFilter(
new RecordFilter() {
private static final Set<ValueType> ACCEPTED_VALUE_TYPES =
Set.of(ValueType.JOB, ValueType.JOB_BATCH, ValueType.PROCESS_INSTANCE);

@Override
public boolean acceptType(final RecordType recordType) {
return recordType == RecordType.EVENT;
}

@Override
public boolean acceptValue(final ValueType valueType) {
return ACCEPTED_VALUE_TYPES.contains(valueType);
}
});
}

@Override
public void open(final Controller controller) {
this.controller = controller;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.broker.exporter.metrics;

import static org.assertj.core.api.Assertions.assertThat;

import io.camunda.zeebe.exporter.test.ExporterTestContext;
import io.camunda.zeebe.exporter.test.ExporterTestController;
import io.camunda.zeebe.protocol.Protocol;
import io.camunda.zeebe.protocol.record.ImmutableRecord;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.ValueType;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import java.util.Arrays;
import java.util.stream.Stream;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

class MetricsExporterTest {

@Test
void shouldObserveJobLifetime() {
// given
final var metrics = new ExecutionLatencyMetrics();
final var exporter = new MetricsExporter(metrics);
exporter.open(new ExporterTestController());
assertThat(metrics.getJobLifeTime().collect())
.flatMap(x -> x.samples)
.describedAs("Expected no metrics to be recorded at start of test")
.isEmpty();

// when
exporter.export(
ImmutableRecord.builder()
.withRecordType(RecordType.EVENT)
.withValueType(ValueType.JOB)
.withIntent(JobIntent.CREATED)
.withTimestamp(1651505728460L)
.withKey(Protocol.encodePartitionId(1, 1))
.build());
exporter.export(
ImmutableRecord.builder()
.withRecordType(RecordType.EVENT)
.withValueType(ValueType.JOB)
.withIntent(JobIntent.COMPLETED)
.withTimestamp(1651505729571L)
.withKey(Protocol.encodePartitionId(1, 1))
.build());

// then
assertThat(metrics.getJobLifeTime().collect())
.flatMap(x -> x.samples)
.filteredOn(s -> s.name.equals("zeebe_job_life_time_count"))
.map(s -> s.value)
.describedAs("Expected exactly 1 observed job_life_time sample counted")
.containsExactly(1d);
}

@Nested
@DisplayName("MetricsExporter should configure a Filter")
class FilterTest {

static Stream<TypeCombination> acceptedCombinations() {
return Stream.of(
new TypeCombination(RecordType.EVENT, ValueType.JOB),
new TypeCombination(RecordType.EVENT, ValueType.JOB_BATCH),
new TypeCombination(RecordType.EVENT, ValueType.PROCESS_INSTANCE));
}

/** Returns the inverse of {@link #acceptedCombinations()}. */
static Stream<TypeCombination> rejectedCombinations() {
return allCombinations().filter(any -> acceptedCombinations().noneMatch(any::equals));
}

static Stream<TypeCombination> allCombinations() {
return Arrays.stream(RecordType.values())
.flatMap(
recordType ->
Arrays.stream(ValueType.values())
.map(valueType -> new TypeCombination(recordType, valueType)));
}

@ParameterizedTest
@DisplayName("accepting records of specific RecordType and ValueType")
@MethodSource("acceptedCombinations")
void shouldConfigureFilterAccepting(final TypeCombination combination) throws Exception {
// given
final var recordType = combination.recordType();
final var valueType = combination.valueType();
final var context = new ExporterTestContext();

// when
new MetricsExporter().configure(context);

// then
final var recordFilter = context.getRecordFilter();
assertThat(recordFilter.acceptType(recordType) && recordFilter.acceptValue(valueType))
.describedAs(
"Expect RecordFilter to accept record of RecordType %s and ValueType %s",
recordType, valueType)
.isTrue();
}

@ParameterizedTest
@DisplayName("rejecting records of specific RecordType and ValueType")
@MethodSource("rejectedCombinations")
void shouldConfigureFilterRejecting(final TypeCombination combination) throws Exception {
// given
final var recordType = combination.recordType();
final var valueType = combination.valueType();
final var context = new ExporterTestContext();

// when
new MetricsExporter().configure(context);

// then
final var recordFilter = context.getRecordFilter();
assertThat(recordFilter.acceptType(recordType) && recordFilter.acceptValue(valueType))
.describedAs(
"Expect RecordFilter to reject record of RecordType %s and ValueType %s",
recordType, valueType)
.isFalse();
}

/** Defines a combination of a RecordType and a ValueType. */
record TypeCombination(RecordType recordType, ValueType valueType) {}
}
}
56 changes: 56 additions & 0 deletions exporter-test/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.camunda</groupId>
<artifactId>zeebe-parent</artifactId>
<version>8.0.3-SNAPSHOT</version>
<relativePath>../parent/pom.xml</relativePath>
</parent>

<artifactId>zeebe-exporter-test</artifactId>
<packaging>jar</packaging>

<name>Zeebe Exporter Test Harness</name>

<dependencies>
<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-exporter-api</artifactId>
</dependency>

<!-- utilities -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>

<!-- SpotBugs annotations -->
<dependency>
<groupId>net.jcip</groupId>
<artifactId>jcip-annotations</artifactId>
<scope>provided</scope>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.agrona</groupId>
<artifactId>agrona</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.exporter.test;

import io.camunda.zeebe.exporter.api.context.Configuration;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import net.jcip.annotations.Immutable;

/**
* An immutable implementation of {@link Configuration}. Accepts configuration suppliers, passing
* the arguments map to the supplier. This allows for flexible injection of configuration when
* testing the exporter.
*
* @param <T> the actual configuration type
*/
@Immutable
public final class ExporterTestConfiguration<T> implements Configuration {
private final String id;
private final Map<String, Object> arguments;
private final Function<Map<String, Object>, T> configurationSupplier;

public ExporterTestConfiguration(final String id, final T configuration) {
this(id, ignored -> configuration);
}

public ExporterTestConfiguration(
final String id, final Function<Map<String, Object>, T> configurationSupplier) {
this(id, Collections.emptyMap(), configurationSupplier);
}

public ExporterTestConfiguration(
final String id,
final Map<String, Object> arguments,
final Function<Map<String, Object>, T> configurationSupplier) {
this.id = Objects.requireNonNull(id, "must specify an ID");
this.arguments = Objects.requireNonNull(arguments, "must specify arguments");
this.configurationSupplier =
Objects.requireNonNull(configurationSupplier, "must specific a configurationSupplier");
}

@Override
public String getId() {
return id;
}

@Override
public Map<String, Object> getArguments() {
return arguments;
}

@Override
public <R> R instantiate(final Class<R> configClass) {
Objects.requireNonNull(configClass, "must pass a non null configClass");

final var configuration = configurationSupplier.apply(arguments);
return configClass.cast(configuration);
}
}

0 comments on commit b360777

Please sign in to comment.