Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prototype exposing the exemplar reservoir for upcoming spec changes #5960

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,96 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.exemplar;

import io.opentelemetry.sdk.common.Clock;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import io.opentelemetry.sdk.metrics.data.LongExemplarData;
import java.util.List;
import java.util.Random;
import java.util.function.Supplier;

/**
* An interface for constructing an appropriate ExemplarReservoir for a given metric "memory cell".
*/
public interface ExemplarReservoirFactory {
ExemplarReservoir<LongExemplarData> createLongExemplarReservoir();

ExemplarReservoir<DoubleExemplarData> createDoubleExemplarReservoir();

/** An exemplar reservoir that stores no exemplars. */
static ExemplarReservoirFactory noSamples() {
return new ExemplarReservoirFactory() {
@Override
public ExemplarReservoir<LongExemplarData> createLongExemplarReservoir() {
return NoopExemplarReservoir.LONG_INSTANCE;
}

@Override
public ExemplarReservoir<DoubleExemplarData> createDoubleExemplarReservoir() {
return NoopExemplarReservoir.DOUBLE_INSTANCE;
}

@Override
public String toString() {
return "noSamples";

Check warning on line 38 in sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/ExemplarReservoirFactory.java

View check run for this annotation

Codecov / codecov/patch

sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/ExemplarReservoirFactory.java#L38

Added line #L38 was not covered by tests
}
};
}

/**
* A reservoir with fixed size that stores the given number of exemplars.
*
* @param clock The clock to use when annotating measurements with time.
* @param size The maximum number of exemplars to preserve.
* @param randomSupplier The random number generator to use for sampling.
*/
static ExemplarReservoirFactory fixedSize(
Clock clock, int size, Supplier<Random> randomSupplier) {
return new ExemplarReservoirFactory() {
@Override
public ExemplarReservoir<LongExemplarData> createLongExemplarReservoir() {
return RandomFixedSizeExemplarReservoir.createLong(clock, size, randomSupplier);
}

@Override
public ExemplarReservoir<DoubleExemplarData> createDoubleExemplarReservoir() {
return RandomFixedSizeExemplarReservoir.createDouble(clock, size, randomSupplier);
}

@Override
public String toString() {
return "fixedSize(" + size + ")";

Check warning on line 65 in sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/ExemplarReservoirFactory.java

View check run for this annotation

Codecov / codecov/patch

sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/ExemplarReservoirFactory.java#L65

Added line #L65 was not covered by tests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to see this split out into a class so we can provide a more conventional toString implementation:

Suggested change
return "fixedSize(" + size + ")";
return "FixedSizeReservoirFactory(" + size + ")";

}
};
}

/**
* A Reservoir sampler that preserves the latest seen measurement per-histogram bucket.
*
* @param clock The clock to use when annotating measurements with time.
* @param boundaries A list of (inclusive) upper bounds for the histogram. Should be in order from
* lowest to highest.
*/
static ExemplarReservoirFactory histogramBucket(Clock clock, List<Double> boundaries) {
return new ExemplarReservoirFactory() {
@Override
public ExemplarReservoir<LongExemplarData> createLongExemplarReservoir() {
throw new UnsupportedOperationException(

Check warning on line 81 in sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/ExemplarReservoirFactory.java

View check run for this annotation

Codecov / codecov/patch

sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/ExemplarReservoirFactory.java#L81

Added line #L81 was not covered by tests
"Cannot create long exemplars for histogram buckets");
}

@Override
public ExemplarReservoir<DoubleExemplarData> createDoubleExemplarReservoir() {
return new HistogramExemplarReservoir(clock, boundaries);
}

@Override
public String toString() {
return "histogramBucket(" + boundaries + ")";

Check warning on line 92 in sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/ExemplarReservoirFactory.java

View check run for this annotation

Codecov / codecov/patch

sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/exemplar/ExemplarReservoirFactory.java#L92

Added line #L92 was not covered by tests
}
};
}
}
@@ -0,0 +1,21 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.metrics.internal.view;

import io.opentelemetry.sdk.metrics.Aggregation;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoirFactory;

/**
* An interface which allows customized configuration of aggregators.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public interface AggregationExtension extends Aggregation, AggregatorFactory {
/** Override the exemplar reservoir used for this aggregation. */
AggregationExtension setExemplarReservoirFactory(ExemplarReservoirFactory reservoirFactory);
}
Expand Up @@ -14,32 +14,40 @@
import io.opentelemetry.sdk.metrics.data.MetricDataType;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.internal.aggregator.DoubleBase2ExponentialHistogramAggregator;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoirFactory;

/**
* Exponential bucket histogram aggregation configuration.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class Base2ExponentialHistogramAggregation implements Aggregation, AggregatorFactory {
public final class Base2ExponentialHistogramAggregation implements AggregationExtension {

private static final int DEFAULT_MAX_BUCKETS = 160;
private static final int DEFAULT_MAX_SCALE = 20;

private static final ExemplarReservoirFactory DEFAULT_RESERVOIR =
ExemplarReservoirFactory.fixedSize(
Clock.getDefault(),
Runtime.getRuntime().availableProcessors(),
RandomSupplier.platformDefault());
private static final Aggregation DEFAULT =
new Base2ExponentialHistogramAggregation(DEFAULT_MAX_BUCKETS, DEFAULT_MAX_SCALE);
new Base2ExponentialHistogramAggregation(
DEFAULT_MAX_BUCKETS, DEFAULT_MAX_SCALE, DEFAULT_RESERVOIR);

private final int maxBuckets;
private final int maxScale;
private final ExemplarReservoirFactory reservoirFactory;

private Base2ExponentialHistogramAggregation(int maxBuckets, int maxScale) {
private Base2ExponentialHistogramAggregation(
int maxBuckets, int maxScale, ExemplarReservoirFactory reservoirFactory) {
this.maxBuckets = maxBuckets;
this.maxScale = maxScale;
this.reservoirFactory = reservoirFactory;
}

public static Aggregation getDefault() {
Expand All @@ -60,7 +68,7 @@
public static Aggregation create(int maxBuckets, int maxScale) {
checkArgument(maxBuckets >= 1, "maxBuckets must be > 0");
checkArgument(maxScale <= 20 && maxScale >= -10, "maxScale must be -10 <= x <= 20");
return new Base2ExponentialHistogramAggregation(maxBuckets, maxScale);
return new Base2ExponentialHistogramAggregation(maxBuckets, maxScale, DEFAULT_RESERVOIR);
}

@Override
Expand All @@ -71,11 +79,7 @@
new DoubleBase2ExponentialHistogramAggregator(
() ->
ExemplarReservoir.filtered(
exemplarFilter,
ExemplarReservoir.doubleFixedSizeReservoir(
Clock.getDefault(),
Runtime.getRuntime().availableProcessors(),
RandomSupplier.platformDefault())),
exemplarFilter, reservoirFactory.createDoubleExemplarReservoir()),
maxBuckets,
maxScale);
}
Expand All @@ -99,4 +103,11 @@
+ maxScale
+ "}";
}

@Override
public AggregationExtension setExemplarReservoirFactory(
ExemplarReservoirFactory reservoirFactory) {
return new Base2ExponentialHistogramAggregation(

Check warning on line 110 in sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/Base2ExponentialHistogramAggregation.java

View check run for this annotation

Codecov / codecov/patch

sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/Base2ExponentialHistogramAggregation.java#L110

Added line #L110 was not covered by tests
this.maxBuckets, this.maxScale, reservoirFactory);
}
}
Expand Up @@ -10,21 +10,22 @@
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoirFactory;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;

/**
* Aggregation that selects the specified default based on instrument.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class DefaultAggregation implements Aggregation, AggregatorFactory {
public final class DefaultAggregation implements AggregationExtension {

private static final Aggregation INSTANCE = new DefaultAggregation();
private static final Aggregation INSTANCE = new DefaultAggregation(null);

public static Aggregation getInstance() {
return INSTANCE;
Expand All @@ -33,44 +34,60 @@
private static final ThrottlingLogger logger =
new ThrottlingLogger(Logger.getLogger(DefaultAggregation.class.getName()));

private DefaultAggregation() {}
@Nullable private final ExemplarReservoirFactory reservoirFactory;

private static Aggregation resolve(InstrumentDescriptor instrument, boolean withAdvice) {
private DefaultAggregation(@Nullable ExemplarReservoirFactory reservoirFactory) {
this.reservoirFactory = reservoirFactory;
}

private static AggregationExtension resolve(InstrumentDescriptor instrument, boolean withAdvice) {
switch (instrument.getType()) {
case COUNTER:
case UP_DOWN_COUNTER:
case OBSERVABLE_COUNTER:
case OBSERVABLE_UP_DOWN_COUNTER:
return SumAggregation.getInstance();
return (AggregationExtension) SumAggregation.getInstance();
case HISTOGRAM:
if (withAdvice && instrument.getAdvice().getExplicitBucketBoundaries() != null) {
return ExplicitBucketHistogramAggregation.create(
instrument.getAdvice().getExplicitBucketBoundaries());
return (AggregationExtension)
ExplicitBucketHistogramAggregation.create(
instrument.getAdvice().getExplicitBucketBoundaries());
}
return ExplicitBucketHistogramAggregation.getDefault();
return (AggregationExtension) ExplicitBucketHistogramAggregation.getDefault();
case OBSERVABLE_GAUGE:
return LastValueAggregation.getInstance();
return (AggregationExtension) LastValueAggregation.getInstance();
}
logger.log(Level.WARNING, "Unable to find default aggregation for instrument: " + instrument);
return DropAggregation.getInstance();
return (AggregationExtension) DropAggregation.getInstance();

Check warning on line 61 in sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/DefaultAggregation.java

View check run for this annotation

Codecov / codecov/patch

sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/DefaultAggregation.java#L61

Added line #L61 was not covered by tests
}

@Override
public <T extends PointData, U extends ExemplarData> Aggregator<T, U> createAggregator(
InstrumentDescriptor instrumentDescriptor, ExemplarFilter exemplarFilter) {
return ((AggregatorFactory) resolve(instrumentDescriptor, /* withAdvice= */ true))
if (this.reservoirFactory != null) {
return resolve(instrumentDescriptor, /* withAdvice= */ true)
.setExemplarReservoirFactory(this.reservoirFactory)
.createAggregator(instrumentDescriptor, exemplarFilter);

Check warning on line 70 in sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/DefaultAggregation.java

View check run for this annotation

Codecov / codecov/patch

sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/DefaultAggregation.java#L68-L70

Added lines #L68 - L70 were not covered by tests
}
return resolve(instrumentDescriptor, /* withAdvice= */ true)
.createAggregator(instrumentDescriptor, exemplarFilter);
}

@Override
public boolean isCompatibleWithInstrument(InstrumentDescriptor instrumentDescriptor) {
// This should always return true
return ((AggregatorFactory) resolve(instrumentDescriptor, /* withAdvice= */ false))
return resolve(instrumentDescriptor, /* withAdvice= */ false)
.isCompatibleWithInstrument(instrumentDescriptor);
}

@Override
public String toString() {
return "DefaultAggregation";
}

@Override
public AggregationExtension setExemplarReservoirFactory(
ExemplarReservoirFactory reservoirFactory) {
return new DefaultAggregation(reservoirFactory);

Check warning on line 91 in sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/DefaultAggregation.java

View check run for this annotation

Codecov / codecov/patch

sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/DefaultAggregation.java#L91

Added line #L91 was not covered by tests
}
}
Expand Up @@ -9,17 +9,17 @@
import io.opentelemetry.sdk.metrics.data.ExemplarData;
import io.opentelemetry.sdk.metrics.data.PointData;
import io.opentelemetry.sdk.metrics.internal.aggregator.Aggregator;
import io.opentelemetry.sdk.metrics.internal.aggregator.AggregatorFactory;
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoirFactory;

/**
* Configuration representing no aggregation.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public final class DropAggregation implements Aggregation, AggregatorFactory {
public final class DropAggregation implements AggregationExtension {

private static final Aggregation INSTANCE = new DropAggregation();

Expand All @@ -45,4 +45,10 @@
public String toString() {
return "DropAggregation";
}

@Override
public AggregationExtension setExemplarReservoirFactory(
ExemplarReservoirFactory reservoirFactory) {
throw new UnsupportedOperationException("DropAggregation does not allow exemplars");

Check warning on line 52 in sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/DropAggregation.java

View check run for this annotation

Codecov / codecov/patch

sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/view/DropAggregation.java#L52

Added line #L52 was not covered by tests
}
}