Skip to content

Commit

Permalink
[Statsd receiver]Add metric type as a label (#2466)
Browse files Browse the repository at this point in the history
* Add the metric type as a dimension for StatsD receiver

* Update readme

* Fix a typo

* re-run test

* Re-run test

* Update statsd_parser.go

Re run unit test

* Re-run integration test

Re-run integration test
  • Loading branch information
JohnWu20 committed Mar 2, 2021
1 parent 6d75ea4 commit 0050d3d
Show file tree
Hide file tree
Showing 8 changed files with 1,064 additions and 74 deletions.
4 changes: 4 additions & 0 deletions receiver/statsdreceiver/README.md
Expand Up @@ -19,6 +19,8 @@ The Following settings are optional:

- `aggregation_interval: 70s`(default value is 60s): The aggregation time that the receiver aggregates the metrics (similar to the flush interval in StatsD server)

- `enable_metric_type: true`(default value is false): Enbale the statsd receiver to be able to emit the mertic type(gauge, counter, timer(in the future), histogram(in the future)) as a lable.

Example:

```yaml
Expand All @@ -27,6 +29,7 @@ receivers:
statsd/2:
endpoint: "localhost:8127"
aggregation_interval: 70s
enable_metric_type: true
```

The full list of settings exposed for this receiver are documented [here](./config.go)
Expand Down Expand Up @@ -95,6 +98,7 @@ receivers:
statsd:
endpoint: "localhost:8125" # default
aggregation_interval: 60s # default
enable_metric_type: false # default

exporters:
file:
Expand Down
1 change: 1 addition & 0 deletions receiver/statsdreceiver/config.go
Expand Up @@ -26,4 +26,5 @@ type Config struct {
configmodels.ReceiverSettings `mapstructure:",squash"`
NetAddr confignet.NetAddr `mapstructure:",squash"`
AggregationInterval time.Duration `mapstructure:"aggregation_interval"`
EnableMetricType bool `mapstructure:"enable_metric_type"`
}
2 changes: 2 additions & 0 deletions receiver/statsdreceiver/factory.go
Expand Up @@ -31,6 +31,7 @@ const (
defaultBindEndpoint = "localhost:8125"
defaultTransport = "udp"
defaultAggregationInterval = 60 * time.Second
defaultEnableMetricType = false
)

// NewFactory creates a factory for the StatsD receiver.
Expand All @@ -53,6 +54,7 @@ func createDefaultConfig() configmodels.Receiver {
Transport: defaultTransport,
},
AggregationInterval: defaultAggregationInterval,
EnableMetricType: defaultEnableMetricType,
}
}

Expand Down
2 changes: 1 addition & 1 deletion receiver/statsdreceiver/protocol/parser.go
Expand Up @@ -20,7 +20,7 @@ import (

// Parser is something that can map input StatsD strings to OTLP Metric representations.
type Parser interface {
Initialize() error
Initialize(enableMetricType bool) error
GetMetrics() []*metricspb.Metric
Aggregate(line string) error
}
50 changes: 37 additions & 13 deletions receiver/statsdreceiver/protocol/statsd_parser.go
Expand Up @@ -35,10 +35,13 @@ func getSupportedTypes() []string {
return []string{"c", "g"}
}

const TagMetricType = "metric_type"

// StatsDParser supports the Parse method for parsing StatsD messages with Tags.
type StatsDParser struct {
gauges map[statsDMetricdescription]*metricspb.Metric
counters map[statsDMetricdescription]*metricspb.Metric
gauges map[statsDMetricdescription]*metricspb.Metric
counters map[statsDMetricdescription]*metricspb.Metric
enableMetricType bool
}

type statsDMetric struct {
Expand All @@ -60,9 +63,10 @@ type statsDMetricdescription struct {
labels label.Distinct
}

func (p *StatsDParser) Initialize() error {
func (p *StatsDParser) Initialize(enableMetricType bool) error {
p.gauges = make(map[statsDMetricdescription]*metricspb.Metric)
p.counters = make(map[statsDMetricdescription]*metricspb.Metric)
p.enableMetricType = enableMetricType
return nil
}

Expand Down Expand Up @@ -90,7 +94,7 @@ var timeNowFunc = func() int64 {

//aggregate for each metric line
func (p *StatsDParser) Aggregate(line string) error {
parsedMetric, err := parseMessageToMetric(line)
parsedMetric, err := parseMessageToMetric(line, p.enableMetricType)
if err != nil {
return err
}
Expand Down Expand Up @@ -128,7 +132,7 @@ func (p *StatsDParser) Aggregate(line string) error {
return nil
}

func parseMessageToMetric(line string) (statsDMetric, error) {
func parseMessageToMetric(line string, enableMetricType bool) (statsDMetric, error) {
result := statsDMetric{}

parts := strings.Split(line, "|")
Expand Down Expand Up @@ -159,6 +163,10 @@ func parseMessageToMetric(line string) (statsDMetric, error) {
}

additionalParts := parts[2:]

var kvs []label.KeyValue
var sortable label.Sortable

for _, part := range additionalParts {
// TODO: Sample rate doesn't currently have a place to go in the protocol
if strings.HasPrefix(part, "@") {
Expand All @@ -175,11 +183,6 @@ func parseMessageToMetric(line string) (statsDMetric, error) {

tagSets := strings.Split(tagsStr, ",")

result.labelKeys = make([]*metricspb.LabelKey, 0, len(tagSets))
result.labelValues = make([]*metricspb.LabelValue, 0, len(tagSets))

var kvs []label.KeyValue
var sortable label.Sortable
for _, tagSet := range tagSets {
tagParts := strings.Split(tagSet, ":")
if len(tagParts) != 2 {
Expand All @@ -192,13 +195,11 @@ func parseMessageToMetric(line string) (statsDMetric, error) {
})
kvs = append(kvs, label.String(tagParts[0], tagParts[1]))
}
set := label.NewSetWithSortable(kvs, &sortable)
result.description.labels = set.Equivalent()

} else {
return result, fmt.Errorf("unrecognized message part: %s", part)
}
}

switch result.description.statsdMetricType {
case "g":
f, err := strconv.ParseFloat(result.value, 64)
Expand All @@ -220,6 +221,29 @@ func parseMessageToMetric(line string) (statsDMetric, error) {
result.metricType = metricspb.MetricDescriptor_GAUGE_INT64
}

// add metric_type dimension for all metrics

if enableMetricType {
var metricType = ""
switch result.description.statsdMetricType {
case "g":
metricType = "gauge"
case "c":
metricType = "counter"
}
result.labelKeys = append(result.labelKeys, &metricspb.LabelKey{Key: TagMetricType})
result.labelValues = append(result.labelValues, &metricspb.LabelValue{
Value: metricType,
HasValue: true,
})
kvs = append(kvs, label.String(TagMetricType, metricType))
}

if len(kvs) != 0 {
set := label.NewSetWithSortable(kvs, &sortable)
result.description.labels = set.Equivalent()
}

return result, nil
}

Expand Down

0 comments on commit 0050d3d

Please sign in to comment.