Skip to content

Commit

Permalink
add option for resource attributes in metrics for prometheus exporter
Browse files Browse the repository at this point in the history
This PR adds the `WithResourceAsConstantLabels` option to the Prometheus exporter to allow users to configure resource attributes to be applied on every metric.

Fixes open-telemetry#4732

Signed-off-by: Alex Boten <aboten@lightstep.com>
  • Loading branch information
Alex Boten committed Nov 29, 2023
1 parent 47ba653 commit 8409717
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 34 deletions.
25 changes: 18 additions & 7 deletions exporters/prometheus/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,20 @@ import (

"github.com/prometheus/client_golang/prometheus"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric"
)

// config contains options for the exporter.
type config struct {
registerer prometheus.Registerer
disableTargetInfo bool
withoutUnits bool
withoutCounterSuffixes bool
readerOpts []metric.ManualReaderOption
disableScopeInfo bool
namespace string
registerer prometheus.Registerer
disableTargetInfo bool
withoutUnits bool
withoutCounterSuffixes bool
readerOpts []metric.ManualReaderOption
disableScopeInfo bool
namespace string
resourceAttributesFilter *attribute.Filter
}

// newConfig creates a validated config configured with options.
Expand Down Expand Up @@ -151,3 +153,12 @@ func WithNamespace(ns string) Option {
return cfg
})
}

// WithResourceAsConstantLabels adds resource attributes as metric attributes
// for metrics exported by the Prometheus Exporter.
func WithResourceAsConstantLabels(resourceFilter *attribute.Filter) Option {
return optionFunc(func(cfg config) config {
cfg.resourceAttributesFilter = resourceFilter
return cfg
})
}
13 changes: 13 additions & 0 deletions exporters/prometheus/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
Expand All @@ -31,6 +32,8 @@ func TestNewConfig(t *testing.T) {
aggregationSelector := func(metric.InstrumentKind) metric.Aggregation { return nil }
producer := &noopProducer{}

filter := attribute.NewAllowKeysFilter("K")

testCases := []struct {
name string
options []Option
Expand Down Expand Up @@ -147,6 +150,16 @@ func TestNewConfig(t *testing.T) {
namespace: "test_",
},
},
{
name: "with resource attributes filter",
options: []Option{
WithResourceAsConstantLabels(&filter),
},
wantConfig: config{
registerer: prometheus.DefaultRegisterer,
resourceAttributesFilter: &filter,
},
},
}
for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
Expand Down
72 changes: 45 additions & 27 deletions exporters/prometheus/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,11 @@ var _ metric.Reader = &Exporter{}
type collector struct {
reader metric.Reader

withoutUnits bool
withoutCounterSuffixes bool
disableScopeInfo bool
namespace string
withoutUnits bool
withoutCounterSuffixes bool
disableScopeInfo bool
namespace string
resourceAttributesFilter *attribute.Filter

mu sync.Mutex // mu protects all members below from the concurrent access.
disableTargetInfo bool
Expand All @@ -109,15 +110,16 @@ func New(opts ...Option) (*Exporter, error) {
reader := metric.NewManualReader(cfg.readerOpts...)

collector := &collector{
reader: reader,
disableTargetInfo: cfg.disableTargetInfo,
withoutUnits: cfg.withoutUnits,
withoutCounterSuffixes: cfg.withoutCounterSuffixes,
disableScopeInfo: cfg.disableScopeInfo,
scopeInfos: make(map[instrumentation.Scope]prometheus.Metric),
scopeInfosInvalid: make(map[instrumentation.Scope]struct{}),
metricFamilies: make(map[string]*dto.MetricFamily),
namespace: cfg.namespace,
reader: reader,
disableTargetInfo: cfg.disableTargetInfo,
withoutUnits: cfg.withoutUnits,
withoutCounterSuffixes: cfg.withoutCounterSuffixes,
disableScopeInfo: cfg.disableScopeInfo,
scopeInfos: make(map[instrumentation.Scope]prometheus.Metric),
scopeInfosInvalid: make(map[instrumentation.Scope]struct{}),
metricFamilies: make(map[string]*dto.MetricFamily),
namespace: cfg.namespace,
resourceAttributesFilter: cfg.resourceAttributesFilter,
}

if err := cfg.registerer.Register(collector); err != nil {
Expand Down Expand Up @@ -181,6 +183,16 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
ch <- c.targetInfo
}

var resourceAttrs []attribute.KeyValue
if c.resourceAttributesFilter != nil {
for _, kv := range metrics.Resource.Attributes() {
filter := *c.resourceAttributesFilter
if filter(kv) {
resourceAttrs = append(resourceAttrs, kv)
}
}
}

for _, scopeMetrics := range metrics.ScopeMetrics {
var keys, values [2]string

Expand Down Expand Up @@ -219,26 +231,26 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {

switch v := m.Data.(type) {
case metricdata.Histogram[int64]:
addHistogramMetric(ch, v, m, keys, values, name)
addHistogramMetric(ch, v, m, keys, values, name, resourceAttrs)
case metricdata.Histogram[float64]:
addHistogramMetric(ch, v, m, keys, values, name)
addHistogramMetric(ch, v, m, keys, values, name, resourceAttrs)
case metricdata.Sum[int64]:
addSumMetric(ch, v, m, keys, values, name)
addSumMetric(ch, v, m, keys, values, name, resourceAttrs)
case metricdata.Sum[float64]:
addSumMetric(ch, v, m, keys, values, name)
addSumMetric(ch, v, m, keys, values, name, resourceAttrs)
case metricdata.Gauge[int64]:
addGaugeMetric(ch, v, m, keys, values, name)
addGaugeMetric(ch, v, m, keys, values, name, resourceAttrs)
case metricdata.Gauge[float64]:
addGaugeMetric(ch, v, m, keys, values, name)
addGaugeMetric(ch, v, m, keys, values, name, resourceAttrs)
}
}
}
}

func addHistogramMetric[N int64 | float64](ch chan<- prometheus.Metric, histogram metricdata.Histogram[N], m metricdata.Metrics, ks, vs [2]string, name string) {
func addHistogramMetric[N int64 | float64](ch chan<- prometheus.Metric, histogram metricdata.Histogram[N], m metricdata.Metrics, ks, vs [2]string, name string, resourceAttrs []attribute.KeyValue) {
// TODO(https://github.com/open-telemetry/opentelemetry-go/issues/3163): support exemplars
for _, dp := range histogram.DataPoints {
keys, values := getAttrs(dp.Attributes, ks, vs)
keys, values := getAttrs(dp.Attributes, ks, vs, resourceAttrs)

desc := prometheus.NewDesc(name, m.Description, keys, nil)
buckets := make(map[float64]uint64, len(dp.Bounds))
Expand All @@ -257,14 +269,14 @@ func addHistogramMetric[N int64 | float64](ch chan<- prometheus.Metric, histogra
}
}

func addSumMetric[N int64 | float64](ch chan<- prometheus.Metric, sum metricdata.Sum[N], m metricdata.Metrics, ks, vs [2]string, name string) {
func addSumMetric[N int64 | float64](ch chan<- prometheus.Metric, sum metricdata.Sum[N], m metricdata.Metrics, ks, vs [2]string, name string, resourceAttrs []attribute.KeyValue) {
valueType := prometheus.CounterValue
if !sum.IsMonotonic {
valueType = prometheus.GaugeValue
}

for _, dp := range sum.DataPoints {
keys, values := getAttrs(dp.Attributes, ks, vs)
keys, values := getAttrs(dp.Attributes, ks, vs, resourceAttrs)

desc := prometheus.NewDesc(name, m.Description, keys, nil)
m, err := prometheus.NewConstMetric(desc, valueType, float64(dp.Value), values...)
Expand All @@ -276,9 +288,9 @@ func addSumMetric[N int64 | float64](ch chan<- prometheus.Metric, sum metricdata
}
}

func addGaugeMetric[N int64 | float64](ch chan<- prometheus.Metric, gauge metricdata.Gauge[N], m metricdata.Metrics, ks, vs [2]string, name string) {
func addGaugeMetric[N int64 | float64](ch chan<- prometheus.Metric, gauge metricdata.Gauge[N], m metricdata.Metrics, ks, vs [2]string, name string, resourceAttrs []attribute.KeyValue) {
for _, dp := range gauge.DataPoints {
keys, values := getAttrs(dp.Attributes, ks, vs)
keys, values := getAttrs(dp.Attributes, ks, vs, resourceAttrs)

desc := prometheus.NewDesc(name, m.Description, keys, nil)
m, err := prometheus.NewConstMetric(desc, prometheus.GaugeValue, float64(dp.Value), values...)
Expand All @@ -293,7 +305,7 @@ func addGaugeMetric[N int64 | float64](ch chan<- prometheus.Metric, gauge metric
// getAttrs parses the attribute.Set to two lists of matching Prometheus-style
// keys and values. It sanitizes invalid characters and handles duplicate keys
// (due to sanitization) by sorting and concatenating the values following the spec.
func getAttrs(attrs attribute.Set, ks, vs [2]string) ([]string, []string) {
func getAttrs(attrs attribute.Set, ks, vs [2]string, resourceAttrs []attribute.KeyValue) ([]string, []string) {
keysMap := make(map[string][]string)
itr := attrs.Iter()
for itr.Next() {
Expand All @@ -317,6 +329,12 @@ func getAttrs(attrs attribute.Set, ks, vs [2]string) ([]string, []string) {
values = append(values, strings.Join(vals, ";"))
}

for _, kv := range resourceAttrs {
key := strings.Map(sanitizeRune, string(kv.Key))
keys = append(keys, key)
values = append(values, kv.Value.Emit())
}

if ks[0] != "" {
keys = append(keys, ks[:]...)
values = append(values, vs[:]...)
Expand All @@ -325,7 +343,7 @@ func getAttrs(attrs attribute.Set, ks, vs [2]string) ([]string, []string) {
}

func createInfoMetric(name, description string, res *resource.Resource) (prometheus.Metric, error) {
keys, values := getAttrs(*res.Set(), [2]string{}, [2]string{})
keys, values := getAttrs(*res.Set(), [2]string{}, [2]string{}, []attribute.KeyValue{})
desc := prometheus.NewDesc(name, description, keys, nil)
return prometheus.NewConstMetric(desc, prometheus.GaugeValue, float64(1), values...)
}
Expand Down

0 comments on commit 8409717

Please sign in to comment.