Skip to content

Commit

Permalink
Prometheus: Add Exemplar sampling for streaming parser (grafana#56049) (
Browse files Browse the repository at this point in the history
grafana#56571)

(cherry picked from commit 152c7f1)

Co-authored-by: Todd Treece <360020+toddtreece@users.noreply.github.com>
  • Loading branch information
grafanabot and toddtreece committed Oct 7, 2022
1 parent 94341ed commit d5bb8ab
Show file tree
Hide file tree
Showing 30 changed files with 76,206 additions and 41 deletions.
42 changes: 42 additions & 0 deletions pkg/tsdb/prometheus/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
GO = go
SHELL = /bin/zsh

ITERATIONS=10
BENCH=repeat $(ITERATIONS) $(LEFT_BRACKET) $(GO) test -benchmem -run=^$$ -bench
PROFILE=$(GO) test -benchmem -run=^$$ -benchtime 1x -memprofile memprofile.out -memprofilerate 1 -cpuprofile cpuprofile.out -bench

LEFT_BRACKET = {
RIGHT_BRACKET = }

memprofile-exemplar memprofile-range: %: --%
$(GO) tool pprof -http=localhost:6061 memprofile.out

cpuprofile-exemplar cpuprofile-range: %: --%
$(GO) tool pprof -http=localhost:6061 cpuprofile.out

benchmark-exemplar benchmark-range: %: --%
sed -i 's/buffered/querydata/g' old.txt
benchstat old.txt new.txt
rm old.txt new.txt

--benchmark-range:
$(BENCH) ^BenchmarkRangeJson ./buffered >> old.txt $(RIGHT_BRACKET)
$(BENCH) ^BenchmarkRangeJson ./querydata >> new.txt $(RIGHT_BRACKET)

--memprofile-range:
$(PROFILE) ^BenchmarkRangeJson ./querydata

--cpuprofile-range:
$(PROFILE) ^BenchmarkRangeJson ./querydata

--benchmark-exemplar:
$(BENCH) ^BenchmarkExemplarJson ./buffered >> old.txt $(RIGHT_BRACKET)
$(BENCH) ^BenchmarkExemplarJson ./querydata >> new.txt $(RIGHT_BRACKET)

--memprofile-exemplar:
$(PROFILE) ^BenchmarkExemplarJson ./querydata

--cpuprofile-exemplar:
$(PROFILE) ^BenchmarkExemplarJson ./querydata

.PHONY: benchmark-range benchmark-exemplar memprofile-range memprofile-exemplar cpuprofile-range cpuprofile-exemplar
41 changes: 26 additions & 15 deletions pkg/tsdb/prometheus/buffered/framing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,17 @@ import (

var update = true

func TestMatrixResponses(t *testing.T) {
func TestResponses(t *testing.T) {
tt := []struct {
name string
filepath string
}{
{name: "parse a simple matrix response", filepath: "range_simple"},
{name: "parse a simple matrix response with value missing steps", filepath: "range_missing"},
{name: "parse a response with Infinity", filepath: "range_infinity"},
{name: "parse a response with NaN", filepath: "range_nan"},
{name: "parse a matrix response with Infinity", filepath: "range_infinity"},
{name: "parse a matrix response with NaN", filepath: "range_nan"},
{name: "parse a response with legendFormat __auto", filepath: "range_auto"},
{name: "parse an exemplar response", filepath: "exemplar"},
}

for _, test := range tt {
Expand Down Expand Up @@ -96,13 +97,14 @@ func makeMockedApi(responseBytes []byte) (apiv1.API, error) {
// struct here, because it has `time.time` and `time.duration` fields that
// cannot be unmarshalled from JSON automatically.
type storedPrometheusQuery struct {
RefId string
RangeQuery bool
Start int64
End int64
Step int64
Expr string
LegendFormat string
RefId string
RangeQuery bool
ExemplarQuery bool
Start int64
End int64
Step int64
Expr string
LegendFormat string
}

func loadStoredPrometheusQuery(fileName string) (storedPrometheusQuery, error) {
Expand All @@ -126,11 +128,12 @@ func runQuery(response []byte, sq storedPrometheusQuery) (*backend.QueryDataResp
tracer := tracing.InitializeTracerForTest()

qm := QueryModel{
RangeQuery: sq.RangeQuery,
Expr: sq.Expr,
Interval: fmt.Sprintf("%ds", sq.Step),
IntervalMS: sq.Step * 1000,
LegendFormat: sq.LegendFormat,
RangeQuery: sq.RangeQuery,
ExemplarQuery: sq.ExemplarQuery,
Expr: sq.Expr,
Interval: fmt.Sprintf("%ds", sq.Step),
IntervalMS: sq.Step * 1000,
LegendFormat: sq.LegendFormat,
}

b := Buffered{
Expand Down Expand Up @@ -165,6 +168,14 @@ func runQuery(response []byte, sq storedPrometheusQuery) (*backend.QueryDataResp
return nil, err
}

// parseTimeSeriesQuery forces range queries if the only query is an exemplar query
// so we need to set it back to false
if qm.ExemplarQuery {
for i := range queries {
queries[i].RangeQuery = false
}
}

return b.runQueries(context.Background(), queries)
}

Expand Down
60 changes: 58 additions & 2 deletions pkg/tsdb/prometheus/buffered/prometeus_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package buffered

import (
"context"
"encoding/json"
"fmt"
"math/rand"
"os"
"path/filepath"
"strings"
"testing"
"time"
Expand All @@ -13,9 +16,37 @@ import (
)

// when memory-profiling this benchmark, these commands are recommended:
// - go test -benchmem -run=^$ -benchtime 1x -memprofile memprofile.out -memprofilerate 1 -bench ^BenchmarkJson$ github.com/grafana/grafana/pkg/tsdb/prometheus
// - go test -benchmem -run=^$ -benchtime 1x -memprofile memprofile.out -memprofilerate 1 -bench ^BenchmarkExemplarJson$ github.com/grafana/grafana/pkg/tsdb/prometheus/buffered
// - go tool pprof -http=localhost:6061 memprofile.out
func BenchmarkJson(b *testing.B) {
func BenchmarkExemplarJson(b *testing.B) {
queryFileName := filepath.Join("../testdata", "exemplar.query.json")
query, err := loadStoredQuery(queryFileName)
require.NoError(b, err)

responseFileName := filepath.Join("../testdata", "exemplar.result.json")
// This is a test, so it's safe to ignore gosec warning G304.
// nolint:gosec
responseBytes, err := os.ReadFile(responseFileName)
require.NoError(b, err)

api, err := makeMockedApi(responseBytes)
require.NoError(b, err)

tracer := tracing.InitializeTracerForTest()

s := Buffered{tracer: tracer, log: &fakeLogger{}, client: api}

b.ResetTimer()
for n := 0; n < b.N; n++ {
_, err := s.runQueries(context.Background(), []*PrometheusQuery{query})
require.NoError(b, err)
}
}

// when memory-profiling this benchmark, these commands are recommended:
// - go test -benchmem -run=^$ -benchtime 1x -memprofile memprofile.out -memprofilerate 1 -bench ^BenchmarkRangeJson$ github.com/grafana/grafana/pkg/tsdb/prometheus/buffered
// - go tool pprof -http=localhost:6061 memprofile.out
func BenchmarkRangeJson(b *testing.B) {
resp, query := createJsonTestData(1642000000, 1, 300, 400)

api, err := makeMockedApi(resp)
Expand Down Expand Up @@ -82,3 +113,28 @@ func createJsonTestData(start int64, step int64, timestampCount int, seriesCount

return bytes, query
}

func loadStoredQuery(fileName string) (*PrometheusQuery, error) {
// This is a test, so it's safe to ignore gosec warning G304.
// nolint:gosec
bytes, err := os.ReadFile(fileName)
if err != nil {
return nil, err
}

var sq storedPrometheusQuery

err = json.Unmarshal(bytes, &sq)
if err != nil {
return nil, err
}

return &PrometheusQuery{
RefId: "A",
ExemplarQuery: sq.ExemplarQuery,
Start: time.Unix(sq.Start, 0),
End: time.Unix(sq.End, 0),
Step: time.Second * time.Duration(sq.Step),
Expr: sq.Expr,
}, nil
}
4 changes: 4 additions & 0 deletions pkg/tsdb/prometheus/buffered/time_series_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,10 @@ func exemplarToDataFrames(response []apiv1.ExemplarQueryResult, query *Prometheu
}
}

sort.SliceStable(sampleExemplars, func(i, j int) bool {
return sampleExemplars[i].Time.Before(sampleExemplars[j].Time)
})

// Create DF from sampled exemplars
timeField := data.NewFieldFromFieldType(data.FieldTypeTime, len(sampleExemplars))
timeField.Name = "Time"
Expand Down
10 changes: 6 additions & 4 deletions pkg/tsdb/prometheus/models/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,8 @@ func (query *Query) TimeRange() TimeRange {
return TimeRange{
Step: query.Step,
// Align query range to step. It rounds start and end down to a multiple of step.
Start: alignTimeRange(query.Start, query.Step, query.UtcOffsetSec),
End: alignTimeRange(query.End, query.Step, query.UtcOffsetSec),
Start: AlignTimeRange(query.Start, query.Step, query.UtcOffsetSec),
End: AlignTimeRange(query.End, query.Step, query.UtcOffsetSec),
}
}

Expand Down Expand Up @@ -225,6 +225,8 @@ func isVariableInterval(interval string) bool {
return false
}

func alignTimeRange(t time.Time, step time.Duration, offset int64) time.Time {
return time.Unix(int64(math.Floor((float64(t.Unix()+offset)/step.Seconds()))*step.Seconds()-float64(offset)), 0)
func AlignTimeRange(t time.Time, step time.Duration, offset int64) time.Time {
offsetNano := float64(offset * 1e9)
stepNano := float64(step.Nanoseconds())
return time.Unix(0, int64(math.Floor((float64(t.UnixNano())+offsetNano)/stepNano)*stepNano-offsetNano)).UTC()
}
27 changes: 26 additions & 1 deletion pkg/tsdb/prometheus/models/query_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package models_test

import (
"reflect"
"testing"
"time"

Expand All @@ -15,7 +16,7 @@ var (
intervalCalculator = intervalv2.NewCalculator()
)

func TestPrometheus_timeSeriesQuery_parseTimeSeriesQuery(t *testing.T) {
func TestParse(t *testing.T) {
t.Run("parsing query from unified alerting", func(t *testing.T) {
timeRange := backend.TimeRange{
From: now,
Expand Down Expand Up @@ -449,3 +450,27 @@ func queryContext(json string, timeRange backend.TimeRange) backend.DataQuery {
RefID: "A",
}
}

func TestAlignTimeRange(t *testing.T) {
type args struct {
t time.Time
step time.Duration
offset int64
}
tests := []struct {
name string
args args
want time.Time
}{
{name: "second step", args: args{t: time.Unix(1664816826, 0), step: 10 * time.Second, offset: 0}, want: time.Unix(1664816820, 0).UTC()},
{name: "millisecond step", args: args{t: time.Unix(1664816825, 5*int64(time.Millisecond)), step: 10 * time.Millisecond, offset: 0}, want: time.Unix(1664816825, 0).UTC()},
{name: "second step with offset", args: args{t: time.Unix(1664816825, 5*int64(time.Millisecond)), step: 2 * time.Second, offset: -3}, want: time.Unix(1664816825, 0).UTC()},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := models.AlignTimeRange(tt.args.t, tt.args.step, tt.args.offset); !reflect.DeepEqual(got, tt.want) {
t.Errorf("AlignTimeRange() = %v, want %v", got, tt.want)
}
})
}
}
42 changes: 42 additions & 0 deletions pkg/tsdb/prometheus/models/result.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package models

import "github.com/grafana/grafana-plugin-sdk-go/data"

type ResultType string

const (
ResultTypeMatrix ResultType = "matrix"
ResultTypeExemplar ResultType = "exemplar"
ResultTypeVector ResultType = "vector"
ResultTypeUnknown ResultType = ""
)

func ResultTypeFromFrame(frame *data.Frame) ResultType {
if frame.Meta.Custom == nil {
return ResultTypeUnknown
}
custom, ok := frame.Meta.Custom.(map[string]string)
if !ok {
return ResultTypeUnknown
}

rt, ok := custom["resultType"]
if !ok {
return ResultTypeUnknown
}

switch rt {
case ResultTypeMatrix.String():
return ResultTypeMatrix
case ResultTypeExemplar.String():
return ResultTypeExemplar
case ResultTypeVector.String():
return ResultTypeVector
}

return ResultTypeUnknown
}

func (r ResultType) String() string {
return string(r)
}

0 comments on commit d5bb8ab

Please sign in to comment.