From 0c560b8b0d9d316656e73b631e235253707e6eb4 Mon Sep 17 00:00:00 2001 From: Michael Mandrus <41969079+mmandrus@users.noreply.github.com> Date: Fri, 2 Dec 2022 00:05:44 +0100 Subject: [PATCH] Graphite: Process multiple queries to Graphite plugin (#59608) * make create call consistent with update and delete * send multiple targets to graphite and correlate the responses with the requests * make create call consistent with update and delete * send multiple targets to graphite and correlate the responses with the requests * Revert "make create call consistent with update and delete" This reverts commit 26b6463bd6f72338d58872f4570b0acd5a985ec7. * refactor query -> target parsing and fix unit tests * add additional validations and more unit tests * change error statement to warn --- pkg/tsdb/graphite/graphite.go | 111 +++++++++++----- pkg/tsdb/graphite/graphite_test.go | 198 ++++++++++++++++++++++++++++- 2 files changed, 268 insertions(+), 41 deletions(-) diff --git a/pkg/tsdb/graphite/graphite.go b/pkg/tsdb/graphite/graphite.go index e7cd7600dfd0..cc4e388b7849 100644 --- a/pkg/tsdb/graphite/graphite.go +++ b/pkg/tsdb/graphite/graphite.go @@ -111,39 +111,24 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) "until": []string{until}, "format": []string{"json"}, "maxDataPoints": []string{"500"}, + "target": []string{}, } - // Calculate and get the last target of Graphite Request - var target string - emptyQueries := make([]string, 0) - for _, query := range req.Queries { - model, err := simplejson.NewJson(query.JSON) - if err != nil { - return nil, err - } - logger.Debug("graphite", "query", model) - currTarget := "" - if fullTarget, err := model.Get(TargetFullModelField).String(); err == nil { - currTarget = fullTarget - } else { - currTarget = model.Get(TargetModelField).MustString() - } - if currTarget == "" { - logger.Debug("graphite", "empty query target", model) - emptyQueries = append(emptyQueries, fmt.Sprintf("Query: %v has no target", model)) - continue - } - target = fixIntervalFormat(currTarget) + // Convert datasource query to graphite target request + targetList, emptyQueries, origRefIds, err := s.processQueries(logger, req.Queries) + if err != nil { + return nil, err } var result = backend.QueryDataResponse{} - - if target == "" { - logger.Error("No targets in query model", "models without targets", strings.Join(emptyQueries, "\n")) - return &result, errors.New("no query target found for the alert rule") + if len(emptyQueries) != 0 { + logger.Warn("Found query models without targets", "models without targets", strings.Join(emptyQueries, "\n")) + // If no queries had a valid target, return an error; otherwise, attempt with the targets we have + if len(emptyQueries) == len(req.Queries) { + return &result, errors.New("no query target found for the alert rule") + } } - - formData["target"] = []string{target} + formData["target"] = targetList if setting.Env == setting.Dev { logger.Debug("Graphite request", "params", formData) @@ -157,7 +142,8 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) ctx, span := s.tracer.Start(ctx, "graphite query") defer span.End() - span.SetAttributes("target", target, attribute.Key("target").String(target)) + targetStr := strings.Join(formData["target"], ",") + span.SetAttributes("target", targetStr, attribute.Key("target").String(targetStr)) span.SetAttributes("from", from, attribute.Key("from").String(from)) span.SetAttributes("until", until, attribute.Key("until").String(until)) span.SetAttributes("datasource_id", dsInfo.Id, attribute.Key("datasource_id").Int64(dsInfo.Id)) @@ -175,7 +161,7 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) return &result, err } - frames, err := s.toDataFrames(logger, res) + frames, err := s.toDataFrames(logger, res, origRefIds) if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) @@ -186,13 +172,57 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) Responses: make(backend.Responses), } - result.Responses["A"] = backend.DataResponse{ - Frames: frames, + for _, f := range frames { + result.Responses[f.Name] = backend.DataResponse{ + Frames: data.Frames{f}, + } } return &result, nil } +// processQueries converts each datasource query to a graphite query target. It returns the list of +// targets, a list of invalid queries, and a mapping of formatted refIds (used in the target query) +// to original query refIds, later used to associate ressponses with the original queries +func (s *Service) processQueries(logger log.Logger, queries []backend.DataQuery) ([]string, []string, map[string]string, error) { + emptyQueries := make([]string, 0) + origRefIds := make(map[string]string, 0) + targets := make([]string, 0) + + for _, query := range queries { + model, err := simplejson.NewJson(query.JSON) + if err != nil { + return nil, nil, nil, err + } + logger.Debug("graphite", "query", model) + currTarget := "" + if fullTarget, err := model.Get(TargetFullModelField).String(); err == nil { + currTarget = fullTarget + } else { + currTarget = model.Get(TargetModelField).MustString() + } + if currTarget == "" { + logger.Debug("graphite", "empty query target", model) + emptyQueries = append(emptyQueries, fmt.Sprintf("Query: %v has no target", model)) + continue + } + target := fixIntervalFormat(currTarget) + + // This is a somewhat inglorious way to ensure we can associate results with the right query + // By using aliasSub, we can get back a resolved series Target name (accounting for other aliases) + // And the original refId. Since there are no restrictions on refId, we need to format it to make it + // easy to find in the response + formattedRefId := strings.ReplaceAll(query.RefID, " ", "_") + origRefIds[formattedRefId] = query.RefID + // This will set the alias to ` ` + // e.g. aliasSub(alias(myquery, "foo"), "(^.*$)", "\1 A") will return "foo A" + target = fmt.Sprintf("aliasSub(%s,\"(^.*$)\",\"\\1 %s\")", target, formattedRefId) + targets = append(targets, target) + } + + return targets, emptyQueries, origRefIds, nil +} + func (s *Service) parseResponse(logger log.Logger, res *http.Response) ([]TargetResponseDTO, error) { body, err := io.ReadAll(res.Body) if err != nil { @@ -219,7 +249,7 @@ func (s *Service) parseResponse(logger log.Logger, res *http.Response) ([]Target return data, nil } -func (s *Service) toDataFrames(logger log.Logger, response *http.Response) (frames data.Frames, error error) { +func (s *Service) toDataFrames(logger log.Logger, response *http.Response, origRefIds map[string]string) (frames data.Frames, error error) { responseData, err := s.parseResponse(logger, response) if err != nil { return nil, err @@ -229,7 +259,18 @@ func (s *Service) toDataFrames(logger log.Logger, response *http.Response) (fram for _, series := range responseData { timeVector := make([]time.Time, 0, len(series.DataPoints)) values := make([]*float64, 0, len(series.DataPoints)) - name := series.Target + // series.Target will be in the format + ls := strings.LastIndex(series.Target, " ") + if ls == -1 { + return nil, fmt.Errorf("received graphite response with invalid target format: %s", series.Target) + } + target := series.Target[:ls] + formattedRefId := series.Target[ls+1:] + refId, ok := origRefIds[formattedRefId] + if !ok { + logger.Warn("Unable to find refId associated with provided formattedRefId", "formattedRefId", formattedRefId) + refId = formattedRefId // fallback - shouldn't happen except for in tests + } for _, dataPoint := range series.DataPoints { var timestamp, value, err = parseDataTimePoint(dataPoint) @@ -250,9 +291,9 @@ func (s *Service) toDataFrames(logger log.Logger, response *http.Response) (fram } } - frames = append(frames, data.NewFrame(name, + frames = append(frames, data.NewFrame(refId, data.NewField("time", nil, timeVector), - data.NewField("value", tags, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: name}))) + data.NewField("value", tags, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: target}))) if setting.Env == setting.Dev { logger.Debug("Graphite response", "target", series.Target, "datapoints", len(series.DataPoints)) diff --git a/pkg/tsdb/graphite/graphite_test.go b/pkg/tsdb/graphite/graphite_test.go index 1d406d8d30e0..88b2f677be9b 100644 --- a/pkg/tsdb/graphite/graphite_test.go +++ b/pkg/tsdb/graphite/graphite_test.go @@ -1,7 +1,9 @@ package graphite import ( + "context" "encoding/json" + "fmt" "io" "net/http" "reflect" @@ -9,7 +11,10 @@ import ( "testing" "time" + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt" "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/grafana/grafana/pkg/components/simplejson" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -48,27 +53,123 @@ func TestFixIntervalFormat(t *testing.T) { assert.Equal(t, tc.expected, tr) }) } +} +func TestProcessQueries(t *testing.T) { + service := &Service{} + log := logger.FromContext(context.Background()) + t.Run("Parses single valid query", func(t *testing.T) { + queries := []backend.DataQuery{ + { + RefID: "A", + JSON: []byte(`{ + "target": "app.grafana.*.dashboards.views.1M.count" + }`), + }, + } + targets, invalids, mapping, err := service.processQueries(log, queries) + assert.NoError(t, err) + assert.Empty(t, invalids) + assert.Len(t, mapping, 1) + assert.Len(t, targets, 1) + assert.Equal(t, "aliasSub(app.grafana.*.dashboards.views.1M.count,\"(^.*$)\",\"\\1 A\")", targets[0]) + }) + + t.Run("Parses multiple valid queries with refId mappings", func(t *testing.T) { + queries := []backend.DataQuery{ + { + RefID: "A", + JSON: []byte(`{ + "target": "app.grafana.*.dashboards.views.1M.count" + }`), + }, + { + RefID: "query B", + JSON: []byte(`{ + "target": "aliasByNode(hitcount(averageSeries(app.grafana.*.dashboards.views.count), '1mon'), 4)" + }`), + }, + } + targets, invalids, mapping, err := service.processQueries(log, queries) + assert.NoError(t, err) + assert.Empty(t, invalids) + assert.Len(t, mapping, 2) + assert.Len(t, targets, 2) + assert.Equal(t, "aliasSub(app.grafana.*.dashboards.views.1M.count,\"(^.*$)\",\"\\1 A\")", targets[0]) + assert.Equal(t, "aliasSub(aliasByNode(hitcount(averageSeries(app.grafana.*.dashboards.views.count), '1mon'), 4),\"(^.*$)\",\"\\1 query_B\")", targets[1]) + }) + + t.Run("Parses multiple queries with one invalid", func(t *testing.T) { + queries := []backend.DataQuery{ + { + RefID: "A", + JSON: []byte(`{ + "target": "app.grafana.*.dashboards.views.1M.count" + }`), + }, + { + RefID: "B", + JSON: []byte(`{ + "query": "app.grafana.*.dashboards.views.1M.count" + }`), + }, + } + targets, invalids, mapping, err := service.processQueries(log, queries) + assert.NoError(t, err) + assert.Len(t, invalids, 1) + assert.Len(t, mapping, 1) + assert.Len(t, targets, 1) + json, _ := simplejson.NewJson(queries[1].JSON) + expectedInvalid := fmt.Sprintf("Query: %v has no target", json) + assert.Equal(t, expectedInvalid, invalids[0]) + }) + + t.Run("QueryData with no valid queries returns an error", func(t *testing.T) { + queries := []backend.DataQuery{ + { + RefID: "A", + JSON: []byte(`{ + "query": "app.grafana.*.dashboards.views.1M.count" + }`), + }, + { + RefID: "B", + JSON: []byte(`{ + "query": "app.grafana.*.dashboards.views.1M.count" + }`), + }, + } + + service.im = fakeInstanceManager{} + _, err := service.QueryData(context.Background(), &backend.QueryDataRequest{ + Queries: queries, + }) + assert.Error(t, err) + assert.Equal(t, err.Error(), "no query target found for the alert rule") + }) +} + +func TestConvertResponses(t *testing.T) { service := &Service{} t.Run("Converts response without tags to data frames", func(*testing.T) { body := ` [ { - "target": "target", + "target": "target A", "datapoints": [[50, 1], [null, 2], [100, 3]] } ]` a := 50.0 b := 100.0 - expectedFrame := data.NewFrame("target", + expectedFrame := data.NewFrame("A", data.NewField("time", nil, []time.Time{time.Unix(1, 0).UTC(), time.Unix(2, 0).UTC(), time.Unix(3, 0).UTC()}), data.NewField("value", data.Labels{}, []*float64{&a, nil, &b}).SetConfig(&data.FieldConfig{DisplayNameFromDS: "target"}), ) expectedFrames := data.Frames{expectedFrame} httpResponse := &http.Response{StatusCode: 200, Body: io.NopCloser(strings.NewReader(body))} - dataFrames, err := service.toDataFrames(logger, httpResponse) + dataFrames, err := service.toDataFrames(logger, httpResponse, map[string]string{}) require.NoError(t, err) if !reflect.DeepEqual(expectedFrames, dataFrames) { @@ -82,14 +183,14 @@ func TestFixIntervalFormat(t *testing.T) { body := ` [ { - "target": "target", + "target": "target A", "tags": { "fooTag": "fooValue", "barTag": "barValue", "int": 100, "float": 3.14 }, "datapoints": [[50, 1], [null, 2], [100, 3]] } ]` a := 50.0 b := 100.0 - expectedFrame := data.NewFrame("target", + expectedFrame := data.NewFrame("A", data.NewField("time", nil, []time.Time{time.Unix(1, 0).UTC(), time.Unix(2, 0).UTC(), time.Unix(3, 0).UTC()}), data.NewField("value", data.Labels{ "fooTag": "fooValue", @@ -101,7 +202,7 @@ func TestFixIntervalFormat(t *testing.T) { expectedFrames := data.Frames{expectedFrame} httpResponse := &http.Response{StatusCode: 200, Body: io.NopCloser(strings.NewReader(body))} - dataFrames, err := service.toDataFrames(logger, httpResponse) + dataFrames, err := service.toDataFrames(logger, httpResponse, map[string]string{}) require.NoError(t, err) if !reflect.DeepEqual(expectedFrames, dataFrames) { @@ -110,4 +211,89 @@ func TestFixIntervalFormat(t *testing.T) { t.Errorf("Data frames should have been equal but was, expected:\n%s\nactual:\n%s", expectedFramesJSON, dataFramesJSON) } }) + + t.Run("Converts response with multiple targets", func(*testing.T) { + body := ` + [ + { + "target": "target 1 A", + "datapoints": [[50, 1], [null, 2], [100, 3]] + }, + { + "target": "target 2 B", + "datapoints": [[50, 1], [null, 2], [100, 3]] + } + ]` + a := 50.0 + b := 100.0 + expectedFrameA := data.NewFrame("A", + data.NewField("time", nil, []time.Time{time.Unix(1, 0).UTC(), time.Unix(2, 0).UTC(), time.Unix(3, 0).UTC()}), + data.NewField("value", data.Labels{}, []*float64{&a, nil, &b}).SetConfig(&data.FieldConfig{DisplayNameFromDS: "target 1"}), + ) + expectedFrameB := data.NewFrame("B", + data.NewField("time", nil, []time.Time{time.Unix(1, 0).UTC(), time.Unix(2, 0).UTC(), time.Unix(3, 0).UTC()}), + data.NewField("value", data.Labels{}, []*float64{&a, nil, &b}).SetConfig(&data.FieldConfig{DisplayNameFromDS: "target 2"}), + ) + expectedFrames := data.Frames{expectedFrameA, expectedFrameB} + + httpResponse := &http.Response{StatusCode: 200, Body: io.NopCloser(strings.NewReader(body))} + dataFrames, err := service.toDataFrames(logger, httpResponse, map[string]string{}) + + require.NoError(t, err) + if !reflect.DeepEqual(expectedFrames, dataFrames) { + expectedFramesJSON, _ := json.Marshal(expectedFrames) + dataFramesJSON, _ := json.Marshal(dataFrames) + t.Errorf("Data frames should have been equal but was, expected:\n%s\nactual:\n%s", expectedFramesJSON, dataFramesJSON) + } + }) + + t.Run("Converts response with refId mapping", func(*testing.T) { + body := ` + [ + { + "target": "target A_A", + "datapoints": [[50, 1], [null, 2], [100, 3]] + } + ]` + a := 50.0 + b := 100.0 + expectedFrame := data.NewFrame("A A", + data.NewField("time", nil, []time.Time{time.Unix(1, 0).UTC(), time.Unix(2, 0).UTC(), time.Unix(3, 0).UTC()}), + data.NewField("value", data.Labels{}, []*float64{&a, nil, &b}).SetConfig(&data.FieldConfig{DisplayNameFromDS: "target"}), + ) + expectedFrames := data.Frames{expectedFrame} + + httpResponse := &http.Response{StatusCode: 200, Body: io.NopCloser(strings.NewReader(body))} + dataFrames, err := service.toDataFrames(logger, httpResponse, map[string]string{"A_A": "A A"}) + + require.NoError(t, err) + if !reflect.DeepEqual(expectedFrames, dataFrames) { + expectedFramesJSON, _ := json.Marshal(expectedFrames) + dataFramesJSON, _ := json.Marshal(dataFrames) + t.Errorf("Data frames should have been equal but was, expected:\n%s\nactual:\n%s", expectedFramesJSON, dataFramesJSON) + } + }) + + t.Run("Chokes on response with invalid target name", func(*testing.T) { + body := ` + [ + { + "target": "target", + "datapoints": [[50, 1], [null, 2], [100, 3]] + } + ]` + httpResponse := &http.Response{StatusCode: 200, Body: io.NopCloser(strings.NewReader(body))} + _, err := service.toDataFrames(logger, httpResponse, map[string]string{}) + require.Error(t, err) + }) +} + +type fakeInstanceManager struct{} + +func (f fakeInstanceManager) Get(pluginContext backend.PluginContext) (instancemgmt.Instance, error) { + return datasourceInfo{}, nil +} + +func (f fakeInstanceManager) Do(pluginContext backend.PluginContext, fn instancemgmt.InstanceCallbackFunc) error { + return nil }