Stabilize exemplars #5249

Open
1 of 6 tasks
MrAlias opened this issue Apr 22, 2024 · 18 comments

@MrAlias
Contributor

MrAlias commented Apr 22, 2024

Exemplars are now stable in the specification.

  • Audit our implementation for compliance
  • Stabilize

Compliance TODOs

  • Export an ExemplarReservoir type and accept it as stream configuration
  • Enable exemplars by default
  • Allow the ExemplarFilter to be configured for a MeterProvider with a default value of TraceBased (continue to support environment variable configuration of the ExemplarFilter)

Stabilization procedure and plan

TODO:

  • Handle the existing experimental envar
@MrAlias MrAlias added the area:metrics Part of OpenTelemetry Metrics label Apr 22, 2024
@MrAlias MrAlias added this to the v1.27.0 milestone Apr 22, 2024
@MrAlias MrAlias self-assigned this May 1, 2024
@MrAlias
Contributor Author

MrAlias commented May 1, 2024

Stream configuration

[...]

The SDK MUST accept the following stream configuration parameters:

[...]

  • exemplar_reservoir: A functional type that generates an exemplar
    reservoir a MeterProvider will use when storing exemplars. This
    functional type needs to be a factory or callback similar to aggregation
    selection functionality which allows different reservoirs to be chosen by
    the aggregation.

    Users can provide an exemplar_reservoir, but it is up to their discretion.
    Therefore, the stream configuration parameter needs to be structured to
    accept an exemplar_reservoir, but MUST NOT obligate a user to provide one.
    If the user does not provide an exemplar_reservoir value, the
    MeterProvider MUST apply a default exemplar reservoir.

We need to export an ExemplarReservoir type and accept it as stream configuration.

@MrAlias
Contributor Author

MrAlias commented May 1, 2024

A Metric SDK MUST provide a mechanism to sample Exemplars from measurements via the ExemplarFilter and ExemplarReservoir hooks.

We do not comply with this.

@MrAlias
Contributor Author

MrAlias commented May 1, 2024

Exemplar sampling SHOULD be turned on by default.

We do not comply with this.

@MrAlias
Contributor Author

MrAlias commented May 1, 2024

If Exemplar sampling is off, the SDK MUST NOT have overhead related to exemplar sampling.

Our current implementation complies with this.

@MrAlias
Contributor Author

MrAlias commented May 1, 2024

A Metric SDK MUST allow exemplar sampling to leverage the configuration of
metric aggregation. For example, Exemplar sampling of histograms should be able
to leverage bucket boundaries.

A Metric SDK SHOULD provide configuration for Exemplar sampling, specifically:

  • ExemplarFilter: filter which measurements can become exemplars.
  • ExemplarReservoir: storage and sampling of exemplars.

We do not comply with this.

@MrAlias
Contributor Author

MrAlias commented May 1, 2024

The ExemplarFilter configuration MUST allow users to select between one of the
built-in ExemplarFilters. While ExemplarFilter determines which measurements
are eligible for becoming an Exemplar, the ExemplarReservoir makes the
final decision if a measurement becomes an exemplar and is stored.

The ExemplarFilter SHOULD be a configuration parameter of a MeterProvider for
an SDK. The default value SHOULD be TraceBased. The filter configuration
SHOULD follow the environment variable specification.

We do not comply with this.

This is a clarification of #5249 (comment).

@MrAlias
Contributor Author

MrAlias commented May 1, 2024

An OpenTelemetry SDK MUST support the following filters:

  • AlwaysOn
  • AlwaysOff
  • TraceBased

Our implementation supports these filters.
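
For illustration, the three built-in filters can be modeled as predicates over the context of a measurement. This is a minimal sketch, not the SDK's API: `Filter`, `sampledKey`, `withSampled`, and `bg` are hypothetical names, and the sampled flag is simulated with a context value, whereas a real implementation would inspect the span context carried by ctx.

```go
package main

import (
	"context"
	"fmt"
)

// Filter reports whether a measurement made with ctx is eligible to become
// an exemplar. Hypothetical type, for illustration only.
type Filter func(ctx context.Context) bool

// sampledKey is a stand-in for the sampled flag of the active span.
type sampledKey struct{}

// bg models the context of a measurement made outside of any span.
var bg = context.Background()

// withSampled returns a context that simulates an active, sampled span.
func withSampled(ctx context.Context) context.Context {
	return context.WithValue(ctx, sampledKey{}, true)
}

// AlwaysOn makes every measurement eligible.
func AlwaysOn(context.Context) bool { return true }

// AlwaysOff makes no measurement eligible.
func AlwaysOff(context.Context) bool { return false }

// TraceBased makes a measurement eligible only when ctx carries a
// (simulated) sampled span.
func TraceBased(ctx context.Context) bool {
	sampled, _ := ctx.Value(sampledKey{}).(bool)
	return sampled
}

func main() {
	filters := []struct {
		name string
		f    Filter
	}{
		{"AlwaysOn", AlwaysOn},
		{"AlwaysOff", AlwaysOff},
		{"TraceBased", TraceBased},
	}
	for _, e := range filters {
		fmt.Println(e.name, "no span:", e.f(bg), "sampled span:", e.f(withSampled(bg)))
	}
}
```

Only TraceBased depends on the context; the reservoir still makes the final decision on whether an eligible measurement is stored.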

@MrAlias
Contributor Author

MrAlias commented May 1, 2024

The ExemplarReservoir interface MUST provide a method to offer measurements to the reservoir and another to collect accumulated Exemplars.

// Reservoir holds the sampled exemplar of measurements made.
type Reservoir[N int64 | float64] interface {
	// Offer accepts the parameters associated with a measurement. The
	// parameters will be stored as an exemplar if the Reservoir decides to
	// sample the measurement.
	//
	// The passed ctx needs to contain any baggage or span that were active
	// when the measurement was made. This information may be used by the
	// Reservoir in making a sampling decision.
	//
	// The time t is the time when the measurement was made. The val and attr
	// parameters are the value and dropped (filtered) attributes of the
	// measurement respectively.
	Offer(ctx context.Context, t time.Time, val N, attr []attribute.KeyValue)

	// Collect returns all the held exemplars.
	//
	// The Reservoir state is preserved after this call.
	Collect(dest *[]metricdata.Exemplar[N])
}

We comply with this.

@MrAlias
Contributor Author

MrAlias commented May 1, 2024

A new ExemplarReservoir MUST be created for every known timeseries data point, as determined by aggregation and view configuration. This data point, and its set of defining attributes, are referred to as the associated timeseries point.

We comply with this.

@MrAlias
Contributor Author

MrAlias commented May 1, 2024

The "offer" method SHOULD have the ability to pull associated trace and span
information without needing to record full context. In other words, current
span context and baggage can be inspected at this point.

The "offer" method does not need to store all measurements it is given and
MAY further sample beyond the ExemplarFilter.

The "offer" method MAY accept a filtered subset of Attributes which diverge
from the timeseries the reservoir is associated with. This MUST be clearly
documented in the API interface and the reservoir MUST be given the Attributes
associated with its timeseries point either at construction so that additional
sampling performed by the reservoir has access to all attributes from a
measurement in the "offer" method. SDK authors are encouraged to benchmark
whether this option works best for their implementation.

The "collect" method MUST return accumulated Exemplars. Exemplars are expected
to abide by the AggregationTemporality of any metric point they are recorded
with. In other words, Exemplars reported against a metric data point SHOULD have
occurred within the start/stop timestamps of that point. SDKs are free to
decide whether "collect" should also reset internal storage for delta temporal
aggregation collection, or use a more optimal implementation.

Exemplars MUST retain any attributes available in the measurement that
are not preserved by aggregation or view configuration for the associated
timeseries. Joining together attributes on an Exemplar with
those available on its associated metric data point should result in the
full set of attributes from the original sample measurement.

The ExemplarReservoir SHOULD avoid allocations when sampling exemplars.

We comply with these.
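
As a sketch of the attribute requirement quoted above: a view keeps some attribute keys on the timeseries, and the exemplar retains the rest, so joining the two sets recovers the original measurement's attributes. `KeyValue` and `split` are hypothetical, simplified stand-ins and not the SDK's types.

```go
package main

import "fmt"

// KeyValue is a simplified stand-in for attribute.KeyValue.
type KeyValue struct {
	Key, Value string
}

// split partitions attrs into those kept on the timeseries (keys present in
// keep) and those dropped by the view, which an exemplar has to retain.
func split(attrs []KeyValue, keep map[string]bool) (kept, dropped []KeyValue) {
	for _, kv := range attrs {
		if keep[kv.Key] {
			kept = append(kept, kv)
		} else {
			dropped = append(dropped, kv)
		}
	}
	return kept, dropped
}

func main() {
	measured := []KeyValue{
		{"http.method", "GET"},
		{"user.id", "42"},
		{"http.route", "/items"},
	}
	keep := map[string]bool{"http.method": true, "http.route": true}

	kept, dropped := split(measured, keep)
	fmt.Println("data point attributes:", kept)
	fmt.Println("exemplar filtered attributes:", dropped)
	// Together the two sets cover every attribute of the measurement.
	fmt.Println("joined size matches:", len(kept)+len(dropped) == len(measured))
}
```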

@MrAlias
Contributor Author

MrAlias commented May 1, 2024

The SDK MUST include two types of built-in exemplar reservoirs:

  1. SimpleFixedSizeExemplarReservoir
  2. AlignedHistogramBucketExemplarReservoir

By default:

  • Explicit bucket histogram aggregation with more than 1 bucket SHOULD
    use AlignedHistogramBucketExemplarReservoir.
  • Base2 Exponential Histogram Aggregation SHOULD use a
    SimpleFixedSizeExemplarReservoir with a reservoir equal to the
    smaller of the maximum number of buckets configured on the aggregation or
    twenty (e.g. min(20, max_buckets)).
  • All other aggregations SHOULD use SimpleFixedSizeExemplarReservoir.

resF := func() func() exemplar.Reservoir[N] {
	// Explicit bucket histogram aggregation with more than 1 bucket will
	// use AlignedHistogramBucketExemplarReservoir.
	a, ok := agg.(AggregationExplicitBucketHistogram)
	if ok && len(a.Boundaries) > 0 {
		cp := slices.Clone(a.Boundaries)
		return func() exemplar.Reservoir[N] {
			bounds := cp
			return exemplar.Histogram[N](bounds)
		}
	}

	var n int
	if a, ok := agg.(AggregationBase2ExponentialHistogram); ok {
		// Base2 Exponential Histogram Aggregation SHOULD use a
		// SimpleFixedSizeExemplarReservoir with a reservoir equal to the
		// smaller of the maximum number of buckets configured on the
		// aggregation or twenty (e.g. min(20, max_buckets)).
		n = int(a.MaxSize)
		if n > 20 {
			n = 20
		}
	} else {
		// https://github.com/open-telemetry/opentelemetry-specification/blob/e94af89e3d0c01de30127a0f423e912f6cda7bed/specification/metrics/sdk.md#simplefixedsizeexemplarreservoir
		// This Exemplar reservoir MAY take a configuration parameter for
		// the size of the reservoir. If no size configuration is
		// provided, the default size MAY be the number of possible
		// concurrent threads (e.g. number of CPUs) to help reduce
		// contention. Otherwise, a default size of 1 SHOULD be used.
		n = runtime.NumCPU()
		if n < 1 {
			// Should never be the case, but be defensive.
			n = 1
		}
	}

	return func() exemplar.Reservoir[N] {
		return exemplar.FixedSize[N](n)
	}
}

We comply with this.

@MrAlias
Contributor Author

MrAlias commented May 1, 2024

SimpleFixedSizeExemplarReservoir

This reservoir MUST use a uniformly-weighted sampling algorithm based on the
number of samples the reservoir has seen so far to determine if the offered
measurements should be sampled. For example, the simple reservoir sampling
algorithm can be used:

if num_measurements_seen < num_buckets then
  bucket = num_measurements_seen
else
  bucket = random_integer(0, num_measurements_seen)
end
if bucket < num_buckets then
  reservoir[bucket] = measurement
end

Any stateful portion of sampling computation SHOULD be reset every collection
cycle. For the above example, that would mean that the num_measurements_seen
count is reset every time the reservoir is collected.

This Exemplar reservoir MAY take a configuration parameter for the size of the
reservoir. If no size configuration is provided, the default size MAY be
the number of possible concurrent threads (e.g. number of CPUs) to help reduce
contention. Otherwise, a default size of 1 SHOULD be used.

We comply with this:

func (r *randRes[N]) Offer(ctx context.Context, t time.Time, n N, a []attribute.KeyValue) {
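
For reference, the pseudocode quoted from the specification translates to Go roughly as follows. This is a simplified sketch and not the SDK's randRes: `fixedSize` and `newFixedSize` are hypothetical names, it stores bare float64 values rather than exemplars, it assumes the upper bound of random_integer is inclusive, and Collect resets all state, which is one of the behaviors the specification leaves to the SDK.

```go
package main

import (
	"fmt"
	"math/rand"
)

// fixedSize is a simplified sketch of a SimpleFixedSizeExemplarReservoir
// that holds plain float64 measurements instead of full exemplars.
type fixedSize struct {
	store []float64 // sampled measurements; capacity is the reservoir size
	seen  int       // measurements offered since the last collection
}

// newFixedSize returns a reservoir that holds at most k measurements.
func newFixedSize(k int) *fixedSize {
	return &fixedSize{store: make([]float64, 0, k)}
}

// Offer applies one step of simple reservoir sampling: the first k
// measurements fill the reservoir, and each later one replaces a random
// slot with probability k/(seen+1).
func (r *fixedSize) Offer(v float64) {
	k := cap(r.store)
	if r.seen < k {
		r.store = append(r.store, v)
	} else if j := rand.Intn(r.seen + 1); j < k {
		r.store[j] = v
	}
	r.seen++
}

// Collect returns the sampled measurements and resets the reservoir,
// including the count of measurements seen.
func (r *fixedSize) Collect() []float64 {
	out := append([]float64(nil), r.store...)
	r.store = r.store[:0]
	r.seen = 0
	return out
}

func main() {
	r := newFixedSize(4)
	for i := 0; i < 1000; i++ {
		r.Offer(float64(i))
	}
	// The sample is random, so the printed values differ between runs.
	fmt.Println("sampled:", r.Collect())
}
```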

@MrAlias
Contributor Author

MrAlias commented May 1, 2024

AlignedHistogramBucketExemplarReservoir

This Exemplar reservoir MUST take a configuration parameter that is the
configuration of a Histogram. This implementation MUST keep the last seen
measurement that falls within a histogram bucket. The reservoir will accept
measurements using the equivalent of the following naive algorithm:

bucket = find_histogram_bucket(measurement)
if bucket < num_buckets then
  reservoir[bucket] = measurement
end

def find_histogram_bucket(measurement):
  for boundary, idx in bucket_boundaries do
    if value <= boundary then
      return idx
    end
  end
  return boundaries.length

This Exemplar reservoir MAY take a configuration parameter for the bucket
boundaries used by the reservoir. The size of the reservoir is always the
number of bucket boundaries plus one. This configuration parameter SHOULD have
the same format as specifying bucket boundaries to
Explicit Bucket Histogram Aggregation.

We comply with this: https://github.com/open-telemetry/opentelemetry-go/blob/7ee6ff19b51eb4bffdd48639ac5698c9ee8932d6/sdk/metric/internal/exemplar/hist.go
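
A minimal Go sketch of the naive algorithm quoted above, for illustration only. `histReservoir` and its methods are hypothetical names and not the contents of hist.go; the sketch stores bare float64 values and uses a binary search from the standard library in place of the linear scan.

```go
package main

import (
	"fmt"
	"sort"
)

// histReservoir keeps the last measurement seen in each histogram bucket.
type histReservoir struct {
	bounds []float64 // sorted upper bounds of the buckets
	store  []float64 // last measurement per bucket
	filled []bool    // whether a bucket has received a measurement
}

// newHistReservoir returns a reservoir with len(bounds)+1 buckets.
func newHistReservoir(bounds []float64) *histReservoir {
	b := append([]float64(nil), bounds...)
	sort.Float64s(b)
	return &histReservoir{
		bounds: b,
		store:  make([]float64, len(b)+1),
		filled: make([]bool, len(b)+1),
	}
}

// bucket returns the index of the first boundary b with v <= b, or
// len(bounds) when v is greater than every boundary.
func (r *histReservoir) bucket(v float64) int {
	return sort.SearchFloat64s(r.bounds, v)
}

// Offer stores v as the latest measurement of its bucket.
func (r *histReservoir) Offer(v float64) {
	i := r.bucket(v)
	r.store[i] = v
	r.filled[i] = true
}

// Collect returns the stored measurements keyed by bucket index.
func (r *histReservoir) Collect() map[int]float64 {
	out := make(map[int]float64)
	for i, ok := range r.filled {
		if ok {
			out[i] = r.store[i]
		}
	}
	return out
}

func main() {
	h := newHistReservoir([]float64{0, 5, 10})
	for _, v := range []float64{1, 3, 7, 42} {
		h.Offer(v)
	}
	fmt.Println(h.Collect())
}
```

With three boundaries the reservoir has four slots, matching the "number of bucket boundaries plus one" size stated in the specification.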

@MrAlias
Contributor Author

MrAlias commented May 1, 2024

Custom ExemplarReservoir

The SDK MUST provide a mechanism for SDK users to provide their own
ExemplarReservoir implementation. This extension MUST be configurable on
a metric View, although individual reservoirs MUST still be
instantiated per metric-timeseries (see
Exemplar Reservoir - Paragraph 2).

We do not comply with this.

@MrAlias
Contributor Author

MrAlias commented May 3, 2024

Stream configuration

[...]
The SDK MUST accept the following stream configuration parameters:
[...]

  • exemplar_reservoir: A functional type that generates an exemplar
    reservoir a MeterProvider will use when storing exemplars. This
    functional type needs to be a factory or callback similar to aggregation
    selection functionality which allows different reservoirs to be chosen by
    the aggregation.
    Users can provide an exemplar_reservoir, but it is up to their discretion.
    Therefore, the stream configuration parameter needs to be structured to
    accept an exemplar_reservoir, but MUST NOT obligate a user to provide one.
    If the user does not provide an exemplar_reservoir value, the
    MeterProvider MUST apply a default exemplar reservoir.

We need to export an ExemplarReservoir type and accept it as stream configuration.

This is going to be a challenge. The Stream type is not defined generically, but our current ExemplarReservoir is defined over [N int64 | float64] to accommodate both value types.

I'm not yet sure how to restructure the ExemplarReservoir or update the Stream type so that it can be included as a field.

@MrAlias
Contributor Author

MrAlias commented May 3, 2024

To address the generics, we can create a Value type:

type ValueType uint8

const (
	UnknownValueType ValueType = 0
	Int64ValueType   ValueType = 1
	Float64ValueType ValueType = 2
)

type Value struct {
	t   ValueType
	val uint64
}

func NewValue[N int64 | float64](val N) Value {
	switch v := any(val).(type) {
	case int64:
		return newInt64Value(v)
	case float64:
		return newFloat64Value(v)
	}
	return Value{}
}

func newInt64Value(val int64) Value {
	return Value{t: Int64ValueType, val: uint64(val)}
}

func newFloat64Value(val float64) Value {
	return Value{t: Float64ValueType, val: math.Float64bits(val)}
}

func (v Value) Type() ValueType { return v.t }

func (v Value) Int64() int64 {
	if v.t == Int64ValueType {
		return v.int64()
	}
	return 0
}

func (v Value) int64() int64 { return int64(v.val) }

func (v Value) Float64() float64 {
	if v.t == Float64ValueType {
		return math.Float64frombits(v.val)
	}
	return 0
}

func (v Value) float64() float64 { return math.Float64frombits(v.val) }

func (v Value) Any() any {
	switch v.t {
	case Int64ValueType:
		return v.int64()
	case Float64ValueType:
		return v.float64()
	}
	return nil
}

From there we can define an exemplar:

// Exemplar is a measurement sampled from a timeseries providing a typical
// example.
type Exemplar struct {
	// FilteredAttributes are the attributes recorded with the measurement but
	// filtered out of the timeseries' aggregated data.
	FilteredAttributes []attribute.KeyValue
	// Time is the time when the measurement was recorded.
	Time time.Time
	// Value is the measured value.
	Value Value
	// SpanID is the ID of the span that was active during the measurement. If
	// no span was active or the span was not sampled this will be empty.
	SpanID []byte `json:",omitempty"`
	// TraceID is the ID of the trace the active span belonged to during the
	// measurement. If no span was active or the span was not sampled this will
	// be empty.
	TraceID []byte `json:",omitempty"`
}

and a Reservoir:

// Reservoir holds the sampled exemplar of measurements made.
type Reservoir interface {
	// Offer accepts the parameters associated with a measurement. The
	// parameters will be stored as an exemplar if the Reservoir decides to
	// sample the measurement.
	//
	// The passed ctx needs to contain any baggage or span that were active
	// when the measurement was made. This information may be used by the
	// Reservoir in making a sampling decision.
	//
	// The time t is the time when the measurement was made. The val and attr
	// parameters are the value and dropped (filtered) attributes of the
	// measurement respectively.
	Offer(ctx context.Context, t time.Time, val Value, attr []attribute.KeyValue)

	// Collect returns all the held exemplars.
	//
	// The Reservoir state is preserved after this call.
	Collect(dest *[]Exemplar)
}
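
To illustrate why the non-generic form helps, here is a sketch of how such a reservoir could become an optional field of a non-generic stream configuration. Everything in it is an assumption made for the example: `Stream`, its `ExemplarReservoir` field, `reservoirFor`, `defaultReservoir`, and `lastValue` are hypothetical, and `Value` and `Reservoir` are reduced versions of the types above so that the block stands alone.

```go
package main

import (
	"fmt"
	"math"
)

// Value is a reduced copy of the proposed Value type: the numeric payload
// is stored as raw bits next to a type tag.
type Value struct {
	isFloat bool
	bits    uint64
}

// NewValue wraps an int64 or float64 measurement in a Value.
func NewValue[N int64 | float64](v N) Value {
	switch n := any(v).(type) {
	case int64:
		return Value{bits: uint64(n)}
	case float64:
		return Value{isFloat: true, bits: math.Float64bits(n)}
	}
	return Value{}
}

// Any returns the wrapped measurement as an int64 or a float64.
func (v Value) Any() any {
	if v.isFloat {
		return math.Float64frombits(v.bits)
	}
	return int64(v.bits)
}

// Reservoir is a reduced form of the non-generic interface proposed above.
type Reservoir interface {
	Offer(val Value)
	Collect() []Value
}

// lastValue is a hypothetical reservoir that keeps only the latest value.
type lastValue struct{ vals []Value }

func (r *lastValue) Offer(val Value)  { r.vals = []Value{val} }
func (r *lastValue) Collect() []Value { return r.vals }

// Stream is a hypothetical, non-generic stream configuration.
type Stream struct {
	Name string
	// ExemplarReservoir is optional. A nil value selects the default.
	ExemplarReservoir func() Reservoir
}

// defaultReservoir stands in for the default the MeterProvider applies.
func defaultReservoir() Reservoir { return &lastValue{} }

// reservoirFor uses the factory from s when set and the default otherwise.
func reservoirFor(s Stream) Reservoir {
	if s.ExemplarReservoir != nil {
		return s.ExemplarReservoir()
	}
	return defaultReservoir()
}

func main() {
	// One reservoir type accepts both value kinds, so Stream needs no
	// type parameter.
	r := reservoirFor(Stream{Name: "latency"})
	r.Offer(NewValue(int64(7)))
	r.Offer(NewValue[float64](2.5))
	for _, v := range r.Collect() {
		fmt.Printf("%T %v\n", v.Any(), v.Any())
	}
}
```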

@MrAlias
Contributor Author

MrAlias commented May 3, 2024

I'm working to refactor sdk/metric/internal/exemplar to match this design. That way we can validate it before releasing.

@MrAlias
Contributor Author

MrAlias commented May 6, 2024

Moving to the v1.28.0 milestone as #5285 needs to be released and used before that code is exported to resolve this.

@MrAlias MrAlias modified the milestones: v1.27.0, v1.28.0 May 6, 2024