Skip to content

Commit

Permalink
merge: #9425
Browse files Browse the repository at this point in the history
9425: [Backport stable/1.3] Configure record filter for metrics exporter r=npepinpe a=korthout

# Description
Backport of #9371 to `stable/1.3`.

relates to #9240 #6442

Co-authored-by: Nico Korthout <nico.korthout@camunda.com>
Co-authored-by: Nicolas Pepin-Perreault <nicolas.pepin-perreault@camunda.com>
  • Loading branch information
3 people committed May 25, 2022
2 parents a753fb8 + ab1bf63 commit b142fd3
Show file tree
Hide file tree
Showing 16 changed files with 764 additions and 1 deletion.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ faces-config.NavData
*.lock.db
.cache-main
.cache-tests
.jqwik-database

*.DS_Store
.java-version
Expand Down
6 changes: 6 additions & 0 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-exporter-test</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-elasticsearch-exporter</artifactId>
Expand Down
6 changes: 6 additions & 0 deletions broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@
<groupId>io.camunda</groupId>
<artifactId>zeebe-workflow-engine</artifactId>
</dependency>
<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-exporter-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-workflow-engine</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ public void observeJobActivationTime(
.observe(latencyInSeconds(creationTimeMs, activationTimeMs));
}

public Histogram getJobLifeTime() {
return JOB_LIFE_TIME;
}

/**
* Takes start and end time in milliseconds and calculates the difference (latency) in seconds.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

import io.camunda.zeebe.broker.system.configuration.ExporterCfg;
import io.camunda.zeebe.exporter.api.Exporter;
import io.camunda.zeebe.exporter.api.context.Context;
import io.camunda.zeebe.exporter.api.context.Context.RecordFilter;
import io.camunda.zeebe.exporter.api.context.Controller;
import io.camunda.zeebe.protocol.record.Record;
import io.camunda.zeebe.protocol.record.RecordType;
Expand All @@ -21,12 +23,17 @@
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRecordValue;
import java.time.Duration;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import org.agrona.collections.Long2LongHashMap;

public class MetricsExporter implements Exporter {

public static final Duration TIME_TO_LIVE = Duration.ofSeconds(10);

private static final Set<ValueType> ACCEPTED_VALUE_TYPES =
Set.of(ValueType.JOB, ValueType.JOB_BATCH, ValueType.PROCESS_INSTANCE);

private final ExecutionLatencyMetrics executionLatencyMetrics;
private final Long2LongHashMap jobKeyToCreationTimeMap;
private final Long2LongHashMap processInstanceKeyToCreationTimeMap;
Expand All @@ -39,13 +46,33 @@ public class MetricsExporter implements Exporter {
private Controller controller;

public MetricsExporter() {
executionLatencyMetrics = new ExecutionLatencyMetrics();
this(new ExecutionLatencyMetrics());
}

public MetricsExporter(final ExecutionLatencyMetrics executionLatencyMetrics) {
this.executionLatencyMetrics = executionLatencyMetrics;
jobKeyToCreationTimeMap = new Long2LongHashMap(-1);
processInstanceKeyToCreationTimeMap = new Long2LongHashMap(-1);
creationTimeToJobKeyNavigableMap = new TreeMap<>();
creationTimeToProcessInstanceKeyNavigableMap = new TreeMap<>();
}

@Override
public void configure(final Context context) throws Exception {
context.setFilter(
new RecordFilter() {
@Override
public boolean acceptType(final RecordType recordType) {
return recordType == RecordType.EVENT;
}

@Override
public boolean acceptValue(final ValueType valueType) {
return ACCEPTED_VALUE_TYPES.contains(valueType);
}
});
}

@Override
public void open(final Controller controller) {
this.controller = controller;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.broker.exporter.metrics;

import static org.assertj.core.api.Assertions.assertThat;

import io.camunda.zeebe.exporter.test.ExporterTestContext;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.ValueType;
import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Stream;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

class MetricsExporterTest {

/** Defines a combination of a RecordType and a ValueType. */
static class TypeCombination {
RecordType recordType;
ValueType valueType;

public TypeCombination(final RecordType recordType, final ValueType valueType) {
this.recordType = recordType;
this.valueType = valueType;
}

public RecordType recordType() {
return recordType;
}

public ValueType valueType() {
return valueType;
}

@Override
public int hashCode() {
return Objects.hash(recordType, valueType);
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final TypeCombination that = (TypeCombination) o;
return recordType == that.recordType && valueType == that.valueType;
}

@Override
public String toString() {
return "TypeCombination{" + "recordType=" + recordType + ", valueType=" + valueType + '}';
}
}

@TestInstance(Lifecycle.PER_CLASS)
@Nested
@DisplayName("MetricsExporter should configure a Filter")
class FilterTest {

Stream<TypeCombination> acceptedCombinations() {
return Stream.of(
new TypeCombination(RecordType.EVENT, ValueType.JOB),
new TypeCombination(RecordType.EVENT, ValueType.JOB_BATCH),
new TypeCombination(RecordType.EVENT, ValueType.PROCESS_INSTANCE));
}

/** Returns the inverse of {@link #acceptedCombinations()}. */
Stream<TypeCombination> rejectedCombinations() {
return allCombinations().filter(any -> acceptedCombinations().noneMatch(any::equals));
}

Stream<TypeCombination> allCombinations() {
return Arrays.stream(RecordType.values())
.flatMap(
recordType ->
Arrays.stream(ValueType.values())
.map(valueType -> new TypeCombination(recordType, valueType)));
}

@ParameterizedTest
@DisplayName("accepting records of specific RecordType and ValueType")
@MethodSource("acceptedCombinations")
void shouldConfigureFilterAccepting(final TypeCombination combination) throws Exception {
// given
final var recordType = combination.recordType();
final var valueType = combination.valueType();
final var context = new ExporterTestContext();

// when
new MetricsExporter().configure(context);

// then
final var recordFilter = context.getRecordFilter();
assertThat(recordFilter.acceptType(recordType) && recordFilter.acceptValue(valueType))
.describedAs(
"Expect RecordFilter to accept record of RecordType %s and ValueType %s",
recordType, valueType)
.isTrue();
}

@ParameterizedTest
@DisplayName("rejecting records of specific RecordType and ValueType")
@MethodSource("rejectedCombinations")
void shouldConfigureFilterRejecting(final TypeCombination combination) throws Exception {
// given
final var recordType = combination.recordType();
final var valueType = combination.valueType();
final var context = new ExporterTestContext();

// when
new MetricsExporter().configure(context);

// then
final var recordFilter = context.getRecordFilter();
assertThat(recordFilter.acceptType(recordType) && recordFilter.acceptValue(valueType))
.describedAs(
"Expect RecordFilter to reject record of RecordType %s and ValueType %s",
recordType, valueType)
.isFalse();
}
}
}
56 changes: 56 additions & 0 deletions exporter-test/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.camunda</groupId>
<artifactId>zeebe-parent</artifactId>
<version>1.3.9-SNAPSHOT</version>
<relativePath>../parent/pom.xml</relativePath>
</parent>

<artifactId>zeebe-exporter-test</artifactId>
<packaging>jar</packaging>

<name>Zeebe Exporter Test Harness</name>

<dependencies>
<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-exporter-api</artifactId>
</dependency>

<!-- utilities -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>

<!-- SpotBugs annotations -->
<dependency>
<groupId>net.jcip</groupId>
<artifactId>jcip-annotations</artifactId>
<scope>provided</scope>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.agrona</groupId>
<artifactId>agrona</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.exporter.test;

import io.camunda.zeebe.exporter.api.context.Configuration;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import net.jcip.annotations.Immutable;

/**
* An immutable implementation of {@link Configuration}. Accepts configuration suppliers, passing
* the arguments map to the supplier. This allows for flexible injection of configuration when
* testing the exporter.
*
* @param <T> the actual configuration type
*/
@Immutable
public final class ExporterTestConfiguration<T> implements Configuration {
private final String id;
private final Map<String, Object> arguments;
private final Function<Map<String, Object>, T> configurationSupplier;

public ExporterTestConfiguration(final String id, final T configuration) {
this(id, ignored -> configuration);
}

public ExporterTestConfiguration(
final String id, final Function<Map<String, Object>, T> configurationSupplier) {
this(id, Collections.emptyMap(), configurationSupplier);
}

public ExporterTestConfiguration(
final String id,
final Map<String, Object> arguments,
final Function<Map<String, Object>, T> configurationSupplier) {
this.id = Objects.requireNonNull(id, "must specify an ID");
this.arguments = Objects.requireNonNull(arguments, "must specify arguments");
this.configurationSupplier =
Objects.requireNonNull(configurationSupplier, "must specific a configurationSupplier");
}

@Override
public String getId() {
return id;
}

@Override
public Map<String, Object> getArguments() {
return arguments;
}

@Override
public <R> R instantiate(final Class<R> configClass) {
Objects.requireNonNull(configClass, "must pass a non null configClass");

final var configuration = configurationSupplier.apply(arguments);
return configClass.cast(configuration);
}
}

0 comments on commit b142fd3

Please sign in to comment.