Skip to content

Commit

Permalink
Merge pull request #76 from michey/escape-dot-in-mqtt-name
Browse files Browse the repository at this point in the history
Add way to change default separator for json pathes
  • Loading branch information
hikhvar committed Dec 25, 2021
2 parents 7f22fd5 + 25115cf commit c6c8f27
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 8 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ config.yaml
bin/
vendor
dist
.vscode
2 changes: 1 addition & 1 deletion cmd/mqtt2prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func setupGoKitLogger(l *zap.Logger) log.Logger {
}

func setupExtractor(cfg config.Config) (metrics.Extractor, error) {
parser := metrics.NewParser(cfg.Metrics)
parser := metrics.NewParser(cfg.Metrics, cfg.JsonParsing.Separator)
if cfg.MQTT.ObjectPerTopicConfig != nil {
switch cfg.MQTT.ObjectPerTopicConfig.Encoding {
case config.EncodingJSON:
Expand Down
5 changes: 5 additions & 0 deletions config.yaml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ cache:
# Set the timeout to -1 to disable the deletion of metrics from the cache. The exporter presents the ingest timestamp
# to prometheus.
timeout: 24h
json_parsing:
# Separator. Used to split path to elements when accessing json fields.
# You can access json fields with dots in it. F.E. {"key.name": {"nested": "value"}}
# Just set separator to -> and use key.name->nested as mqtt_name
separator: .
# This is a list of valid metrics. Only metrics listed here will be exported
metrics:
# The name of the metric in prometheus
Expand Down
2 changes: 1 addition & 1 deletion fuzzing/json_per_topic/fuzz.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func Fuzz(data []byte) int {
PrometheusName: "kartoffeln",
ValueType: "counter",
},
})
}, ".")
json := metrics.NewJSONObjectExtractor(p)
mc, err := json("foo", data, "bar")
if err != nil && len(mc) > 0 {
Expand Down
18 changes: 15 additions & 3 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ var CacheConfigDefaults = CacheConfig{
Timeout: 2 * time.Minute,
}

var JsonParsingConfigDefaults = JsonParsingConfig{
Separator: ".",
}

type Regexp struct {
r *regexp.Regexp
pattern string
Expand Down Expand Up @@ -83,15 +87,20 @@ func MustNewRegexp(pattern string) *Regexp {
}

type Config struct {
Metrics []MetricConfig `yaml:"metrics"`
MQTT *MQTTConfig `yaml:"mqtt,omitempty"`
Cache *CacheConfig `yaml:"cache,omitempty"`
JsonParsing *JsonParsingConfig `yaml:"json_parsing,omitempty"`
Metrics []MetricConfig `yaml:"metrics"`
MQTT *MQTTConfig `yaml:"mqtt,omitempty"`
Cache *CacheConfig `yaml:"cache,omitempty"`
}

type CacheConfig struct {
Timeout time.Duration `yaml:"timeout"`
}

type JsonParsingConfig struct {
Separator string `yaml:"separator"`
}

type MQTTConfig struct {
Server string `yaml:"server"`
TopicPath string `yaml:"topic_path"`
Expand Down Expand Up @@ -167,6 +176,9 @@ func LoadConfig(configFile string) (Config, error) {
if cfg.Cache == nil {
cfg.Cache = &CacheConfigDefaults
}
if cfg.JsonParsing == nil {
cfg.JsonParsing = &JsonParsingConfigDefaults
}
if cfg.MQTT.DeviceIDRegex == nil {
cfg.MQTT.DeviceIDRegex = MQTTConfigDefaults.DeviceIDRegex
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/metrics/extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package metrics
import (
"errors"
"fmt"

"github.com/hikhvar/mqtt2prometheus/pkg/config"
gojsonq "github.com/thedevsaddam/gojsonq/v2"
)
Expand All @@ -12,7 +13,7 @@ type Extractor func(topic string, payload []byte, deviceID string) (MetricCollec
func NewJSONObjectExtractor(p Parser) Extractor {
return func(topic string, payload []byte, deviceID string) (MetricCollection, error) {
var mc MetricCollection
parsed := gojsonq.New().FromString(string(payload))
parsed := gojsonq.New(gojsonq.SetSeparator(p.separator)).FromString(string(payload))

for path := range p.config() {
rawValue := parsed.Find(path)
Expand Down
103 changes: 103 additions & 0 deletions pkg/metrics/extractor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package metrics

import (
"reflect"
"testing"

"github.com/hikhvar/mqtt2prometheus/pkg/config"
"github.com/prometheus/client_golang/prometheus"
)

func TestNewJSONObjectExtractor_parseMetric(t *testing.T) {
now = testNow
type fields struct {
metricConfigs map[string][]config.MetricConfig
}
type args struct {
metricPath string
deviceID string
value string
}
tests := []struct {
name string
separator string
fields fields
args args
want Metric
wantErr bool
}{
{
name: "string value",
separator: "->",
fields: fields{
map[string][]config.MetricConfig{
"SDS0X1->PM2->5": []config.MetricConfig{
{
PrometheusName: "temperature",
MQTTName: "SDS0X1.PM2.5",
ValueType: "gauge",
},
},
},
},
args: args{
metricPath: "topic",
deviceID: "dht22",
value: "{\"SDS0X1\":{\"PM2\":{\"5\":4.9}}}",
},
want: Metric{
Description: prometheus.NewDesc("temperature", "", []string{"sensor", "topic"}, nil),
ValueType: prometheus.GaugeValue,
Value: 4.9,
IngestTime: testNow(),
Topic: "topic",
},
}, {
name: "string value with dots in path",
separator: "->",
fields: fields{
map[string][]config.MetricConfig{
"SDS0X1->PM2.5": []config.MetricConfig{
{
PrometheusName: "temperature",
MQTTName: "SDS0X1->PM2.5",
ValueType: "gauge",
},
},
},
},
args: args{
metricPath: "topic",
deviceID: "dht22",
value: "{\"SDS0X1\":{\"PM2.5\":4.9,\"PM10\":8.5}}",
},
want: Metric{
Description: prometheus.NewDesc("temperature", "", []string{"sensor", "topic"}, nil),
ValueType: prometheus.GaugeValue,
Value: 4.9,
IngestTime: testNow(),
Topic: "topic",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := Parser{
separator: tt.separator,
metricConfigs: tt.fields.metricConfigs,
}
extractor := NewJSONObjectExtractor(p)

got, err := extractor(tt.args.metricPath, []byte(tt.args.value), tt.args.deviceID)
if (err != nil) != tt.wantErr {
t.Errorf("parseMetric() error = %v, wantErr %v", err, tt.wantErr)
return
}
if len(got) != 1 {
t.Errorf("parseMetric() got = %v, want %v", nil, tt.want)
} else if !reflect.DeepEqual(got[0], tt.want) {
t.Errorf("parseMetric() got = %v, want %v", got[0], tt.want)
}
})
}
}
7 changes: 5 additions & 2 deletions pkg/metrics/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,31 @@ package metrics
import (
"errors"
"fmt"
"github.com/hikhvar/mqtt2prometheus/pkg/config"
"strconv"
"time"

"github.com/hikhvar/mqtt2prometheus/pkg/config"
)

type metricNotConfiguredError error

var metricNotConfigured metricNotConfiguredError = errors.New("metric not configured failed to parse")

type Parser struct {
separator string
metricConfigs map[string][]config.MetricConfig
}

var now = time.Now

func NewParser(metrics []config.MetricConfig) Parser {
func NewParser(metrics []config.MetricConfig, separator string) Parser {
cfgs := make(map[string][]config.MetricConfig)
for i := range metrics {
key := metrics[i].MQTTName
cfgs[key] = append(cfgs[key], metrics[i])
}
return Parser{
separator: separator,
metricConfigs: cfgs,
}
}
Expand Down

0 comments on commit c6c8f27

Please sign in to comment.