Skip to content

Commit

Permalink
Graphite: Process multiple queries to Graphite plugin (#59608)
Browse files Browse the repository at this point in the history
* make create call consistent with update and delete

* send multiple targets to graphite and correlate the responses with the requests

* make create call consistent with update and delete

* send multiple targets to graphite and correlate the responses with the requests

* Revert "make create call consistent with update and delete"

This reverts commit 26b6463.

* refactor query -> target parsing and fix unit tests

* add additional validations and more unit tests

* change error statement to warn
  • Loading branch information
mmandrus committed Dec 1, 2022
1 parent e1e8583 commit 0c560b8
Show file tree
Hide file tree
Showing 2 changed files with 268 additions and 41 deletions.
111 changes: 76 additions & 35 deletions pkg/tsdb/graphite/graphite.go
Expand Up @@ -111,39 +111,24 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
"until": []string{until},
"format": []string{"json"},
"maxDataPoints": []string{"500"},
"target": []string{},
}

// Calculate and get the last target of Graphite Request
var target string
emptyQueries := make([]string, 0)
for _, query := range req.Queries {
model, err := simplejson.NewJson(query.JSON)
if err != nil {
return nil, err
}
logger.Debug("graphite", "query", model)
currTarget := ""
if fullTarget, err := model.Get(TargetFullModelField).String(); err == nil {
currTarget = fullTarget
} else {
currTarget = model.Get(TargetModelField).MustString()
}
if currTarget == "" {
logger.Debug("graphite", "empty query target", model)
emptyQueries = append(emptyQueries, fmt.Sprintf("Query: %v has no target", model))
continue
}
target = fixIntervalFormat(currTarget)
// Convert datasource query to graphite target request
targetList, emptyQueries, origRefIds, err := s.processQueries(logger, req.Queries)
if err != nil {
return nil, err
}

var result = backend.QueryDataResponse{}

if target == "" {
logger.Error("No targets in query model", "models without targets", strings.Join(emptyQueries, "\n"))
return &result, errors.New("no query target found for the alert rule")
if len(emptyQueries) != 0 {
logger.Warn("Found query models without targets", "models without targets", strings.Join(emptyQueries, "\n"))
// If no queries had a valid target, return an error; otherwise, attempt with the targets we have
if len(emptyQueries) == len(req.Queries) {
return &result, errors.New("no query target found for the alert rule")
}
}

formData["target"] = []string{target}
formData["target"] = targetList

if setting.Env == setting.Dev {
logger.Debug("Graphite request", "params", formData)
Expand All @@ -157,7 +142,8 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
ctx, span := s.tracer.Start(ctx, "graphite query")
defer span.End()

span.SetAttributes("target", target, attribute.Key("target").String(target))
targetStr := strings.Join(formData["target"], ",")
span.SetAttributes("target", targetStr, attribute.Key("target").String(targetStr))
span.SetAttributes("from", from, attribute.Key("from").String(from))
span.SetAttributes("until", until, attribute.Key("until").String(until))
span.SetAttributes("datasource_id", dsInfo.Id, attribute.Key("datasource_id").Int64(dsInfo.Id))
Expand All @@ -175,7 +161,7 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
return &result, err
}

frames, err := s.toDataFrames(logger, res)
frames, err := s.toDataFrames(logger, res, origRefIds)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
Expand All @@ -186,13 +172,57 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
Responses: make(backend.Responses),
}

result.Responses["A"] = backend.DataResponse{
Frames: frames,
for _, f := range frames {
result.Responses[f.Name] = backend.DataResponse{
Frames: data.Frames{f},
}
}

return &result, nil
}

// processQueries converts each datasource query to a graphite query target. It returns the list of
// targets, a list of invalid queries, and a mapping of formatted refIds (used in the target query)
// to original query refIds, later used to associate ressponses with the original queries
func (s *Service) processQueries(logger log.Logger, queries []backend.DataQuery) ([]string, []string, map[string]string, error) {
emptyQueries := make([]string, 0)
origRefIds := make(map[string]string, 0)
targets := make([]string, 0)

for _, query := range queries {
model, err := simplejson.NewJson(query.JSON)
if err != nil {
return nil, nil, nil, err
}
logger.Debug("graphite", "query", model)
currTarget := ""
if fullTarget, err := model.Get(TargetFullModelField).String(); err == nil {
currTarget = fullTarget
} else {
currTarget = model.Get(TargetModelField).MustString()
}
if currTarget == "" {
logger.Debug("graphite", "empty query target", model)
emptyQueries = append(emptyQueries, fmt.Sprintf("Query: %v has no target", model))
continue
}
target := fixIntervalFormat(currTarget)

// This is a somewhat inglorious way to ensure we can associate results with the right query
// By using aliasSub, we can get back a resolved series Target name (accounting for other aliases)
// And the original refId. Since there are no restrictions on refId, we need to format it to make it
// easy to find in the response
formattedRefId := strings.ReplaceAll(query.RefID, " ", "_")
origRefIds[formattedRefId] = query.RefID
// This will set the alias to `<resolvedSeriesName> <formattedRefId>`
// e.g. aliasSub(alias(myquery, "foo"), "(^.*$)", "\1 A") will return "foo A"
target = fmt.Sprintf("aliasSub(%s,\"(^.*$)\",\"\\1 %s\")", target, formattedRefId)
targets = append(targets, target)
}

return targets, emptyQueries, origRefIds, nil
}

func (s *Service) parseResponse(logger log.Logger, res *http.Response) ([]TargetResponseDTO, error) {
body, err := io.ReadAll(res.Body)
if err != nil {
Expand All @@ -219,7 +249,7 @@ func (s *Service) parseResponse(logger log.Logger, res *http.Response) ([]Target
return data, nil
}

func (s *Service) toDataFrames(logger log.Logger, response *http.Response) (frames data.Frames, error error) {
func (s *Service) toDataFrames(logger log.Logger, response *http.Response, origRefIds map[string]string) (frames data.Frames, error error) {
responseData, err := s.parseResponse(logger, response)
if err != nil {
return nil, err
Expand All @@ -229,7 +259,18 @@ func (s *Service) toDataFrames(logger log.Logger, response *http.Response) (fram
for _, series := range responseData {
timeVector := make([]time.Time, 0, len(series.DataPoints))
values := make([]*float64, 0, len(series.DataPoints))
name := series.Target
// series.Target will be in the format <resolvedSeriesName> <formattedRefId>
ls := strings.LastIndex(series.Target, " ")
if ls == -1 {
return nil, fmt.Errorf("received graphite response with invalid target format: %s", series.Target)
}
target := series.Target[:ls]
formattedRefId := series.Target[ls+1:]
refId, ok := origRefIds[formattedRefId]
if !ok {
logger.Warn("Unable to find refId associated with provided formattedRefId", "formattedRefId", formattedRefId)
refId = formattedRefId // fallback - shouldn't happen except for in tests
}

for _, dataPoint := range series.DataPoints {
var timestamp, value, err = parseDataTimePoint(dataPoint)
Expand All @@ -250,9 +291,9 @@ func (s *Service) toDataFrames(logger log.Logger, response *http.Response) (fram
}
}

frames = append(frames, data.NewFrame(name,
frames = append(frames, data.NewFrame(refId,
data.NewField("time", nil, timeVector),
data.NewField("value", tags, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: name})))
data.NewField("value", tags, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: target})))

if setting.Env == setting.Dev {
logger.Debug("Graphite response", "target", series.Target, "datapoints", len(series.DataPoints))
Expand Down

0 comments on commit 0c560b8

Please sign in to comment.