Skip to content

Commit

Permalink
*: Promql changes to add support to extended functions throught Thanos (
Browse files Browse the repository at this point in the history
#7338)

* fixing extended functions support in more places

Signed-off-by: Pedro Tanaka <pedro.stanaka@gmail.com>

* Adding new failint for the Parse() method

Signed-off-by: Pedro Tanaka <pedro.stanaka@gmail.com>

* Adding new method for ParseMetricSelector

Signed-off-by: Pedro Tanaka <pedro.stanaka@gmail.com>

* Fixing missing imports

Extending test to check behavior

More missing imports

Signed-off-by: Pedro Tanaka <pedro.stanaka@gmail.com>

* Fixing method name

Signed-off-by: Pedro Tanaka <pedro.stanaka@gmail.com>

* Solving references to forbidden functions

Signed-off-by: Pedro Tanaka <pedro.stanaka@gmail.com>

* Treating promql validation from ParseExpr

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

* fixing funcs

Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>

---------

Signed-off-by: Pedro Tanaka <pedro.stanaka@gmail.com>
Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>
  • Loading branch information
pedro-stanaka committed May 7, 2024
1 parent ab8f2b3 commit 970cbbe
Show file tree
Hide file tree
Showing 21 changed files with 171 additions and 36 deletions.
1 change: 1 addition & 0 deletions Makefile
Expand Up @@ -404,6 +404,7 @@ NewHistorgram,NewHistogramVec,NewSummary,NewSummaryVec}=github.com/prometheus/cl
NewCounterVec,NewCounterVec,NewGauge,NewGaugeVec,NewGaugeFunc,NewHistorgram,NewHistogramVec,NewSummary,NewSummaryVec},\
github.com/NYTimes/gziphandler.{GzipHandler}=github.com/klauspost/compress/gzhttp.{GzipHandler},\
sync/atomic=go.uber.org/atomic,github.com/cortexproject/cortex=github.com/thanos-io/thanos/internal/cortex,\
github.com/prometheus/prometheus/promql/parser.{ParseExpr,ParseMetricSelector}=github.com/thanos-io/thanos/pkg/extpromql.{ParseExpr,ParseMetricSelector},\
io/ioutil.{Discard,NopCloser,ReadAll,ReadDir,ReadFile,TempDir,TempFile,Writefile}" $(shell go list ./... | grep -v "internal/cortex")
@$(FAILLINT) -paths "fmt.{Print,Println,Sprint}" -ignore-tests ./...
@echo ">> linting all of the Go files GOGC=${GOGC}"
Expand Down
3 changes: 2 additions & 1 deletion cmd/thanos/rule.go
Expand Up @@ -59,6 +59,7 @@ import (
"github.com/thanos-io/thanos/pkg/extkingpin"
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/extpromql"
"github.com/thanos-io/thanos/pkg/info"
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/logging"
Expand Down Expand Up @@ -951,7 +952,7 @@ func queryFuncCreator(
queryAPIClients := grpcEndpointSet.GetQueryAPIClients()
for _, i := range rand.Perm(len(queryAPIClients)) {
e := query.NewRemoteEngine(logger, queryAPIClients[i], query.Opts{})
expr, err := parser.ParseExpr(qs)
expr, err := extpromql.ParseExpr(qs)
if err != nil {
level.Error(logger).Log("err", err, "query", qs)
continue
Expand Down
5 changes: 3 additions & 2 deletions internal/cortex/querier/queryrange/results_cache.go
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"flag"
"fmt"
"github.com/thanos-io/thanos/pkg/extpromql"
"net/http"
"sort"
"strings"
Expand Down Expand Up @@ -325,7 +326,7 @@ func (s resultsCache) isAtModifierCachable(r Request, maxCacheTime int64) bool {
if !strings.Contains(query, "@") {
return true
}
expr, err := parser.ParseExpr(query)
expr, err := extpromql.ParseExpr(query)
if err != nil {
// We are being pessimistic in such cases.
level.Warn(s.logger).Log("msg", "failed to parse query, considering @ modifier as not cachable", "query", query, "err", err)
Expand Down Expand Up @@ -370,7 +371,7 @@ func (s resultsCache) isOffsetCachable(r Request) bool {
if !strings.Contains(query, "offset") {
return true
}
expr, err := parser.ParseExpr(query)
expr, err := extpromql.ParseExpr(query)
if err != nil {
level.Warn(s.logger).Log("msg", "failed to parse query, considering offset as not cachable", "query", query, "err", err)
return false
Expand Down
3 changes: 2 additions & 1 deletion internal/cortex/querier/queryrange/split_by_interval.go
Expand Up @@ -5,6 +5,7 @@ package queryrange

import (
"context"
"github.com/thanos-io/thanos/pkg/extpromql"
"net/http"
"time"

Expand Down Expand Up @@ -97,7 +98,7 @@ func splitQuery(r Request, interval time.Duration) ([]Request, error) {
// For example given the start of the query is 10.00, `http_requests_total[1h] @ start()` query will be replaced with `http_requests_total[1h] @ 10.00`
// If the modifier is already a constant, it will be returned as is.
func EvaluateAtModifierFunction(query string, start, end int64) (string, error) {
expr, err := parser.ParseExpr(query)
expr, err := extpromql.ParseExpr(query)
if err != nil {
return "", httpgrpc.Errorf(http.StatusBadRequest, `{"status": "error", "error": "%s"}`, err)
}
Expand Down
9 changes: 7 additions & 2 deletions internal/cortex/querier/queryrange/split_by_interval_test.go
Expand Up @@ -5,6 +5,7 @@ package queryrange

import (
"context"
"github.com/thanos-io/thanos/pkg/extpromql"
io "io"
"net/http"
"net/http/httptest"
Expand All @@ -13,7 +14,6 @@ import (
"testing"
"time"

"github.com/prometheus/prometheus/promql/parser"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/middleware"
Expand Down Expand Up @@ -332,6 +332,11 @@ func Test_evaluateAtModifier(t *testing.T) {
in: "topk(5, rate(http_requests_total[1h] @ start()))",
expected: "topk(5, rate(http_requests_total[1h] @ 1546300.800))",
},
{
// extended functions
in: "topk(5, xrate(http_requests_total[1h] @ start()))",
expected: "topk(5, xrate(http_requests_total[1h] @ 1546300.800))",
},
{
in: "topk(5, rate(http_requests_total[1h] @ 0))",
expected: "topk(5, rate(http_requests_total[1h] @ 0.000))",
Expand Down Expand Up @@ -390,7 +395,7 @@ func Test_evaluateAtModifier(t *testing.T) {
require.Equal(t, tt.expectedErrorCode, int(httpResp.Code))
} else {
require.NoError(t, err)
expectedExpr, err := parser.ParseExpr(tt.expected)
expectedExpr, err := extpromql.ParseExpr(tt.expected)
require.NoError(t, err)
require.Equal(t, expectedExpr.String(), out)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/api/query/grpc_test.go
Expand Up @@ -13,14 +13,14 @@ import (
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/annotations"
"github.com/thanos-io/promql-engine/logicalplan"
equery "github.com/thanos-io/promql-engine/query"

"github.com/thanos-io/thanos/pkg/api/query/querypb"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extpromql"
"github.com/thanos-io/thanos/pkg/query"
"github.com/thanos-io/thanos/pkg/store"
)
Expand All @@ -36,7 +36,7 @@ func TestGRPCQueryAPIWithQueryPlan(t *testing.T) {
}
api := NewGRPCAPI(time.Now, nil, queryableCreator, engineFactory, querypb.EngineType_thanos, lookbackDeltaFunc, 0)

expr, err := parser.ParseExpr("metric")
expr, err := extpromql.ParseExpr("metric")
testutil.Ok(t, err)
lplan := logicalplan.NewFromAST(expr, &equery.Options{}, logicalplan.PlanOptions{})
testutil.Ok(t, err)
Expand Down
3 changes: 2 additions & 1 deletion pkg/api/query/v1.go
Expand Up @@ -52,6 +52,7 @@ import (
"github.com/thanos-io/thanos/pkg/exemplars"
"github.com/thanos-io/thanos/pkg/exemplars/exemplarspb"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/extpromql"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/metadata"
Expand Down Expand Up @@ -374,7 +375,7 @@ func (qapi *QueryAPI) parseStoreDebugMatchersParam(r *http.Request) (storeMatche
}

for _, s := range r.Form[StoreMatcherParam] {
matchers, err := parser.ParseMetricSelector(s)
matchers, err := extpromql.ParseMetricSelector(s)
if err != nil {
return nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/block/metadata/meta.go
Expand Up @@ -20,12 +20,12 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/tombstones"
"gopkg.in/yaml.v3"

"github.com/thanos-io/thanos/pkg/extpromql"
"github.com/thanos-io/thanos/pkg/runutil"
)

Expand Down Expand Up @@ -136,7 +136,7 @@ type Rewrite struct {
type Matchers []*labels.Matcher

func (m *Matchers) UnmarshalYAML(value *yaml.Node) (err error) {
*m, err = parser.ParseMetricSelector(value.Value)
*m, err = extpromql.ParseMetricSelector(value.Value)
if err != nil {
return errors.Wrapf(err, "parse metric selector %v", value.Value)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/exemplars/multitsdb.go
Expand Up @@ -6,6 +6,7 @@ package exemplars
import (
"github.com/pkg/errors"
"github.com/prometheus/prometheus/promql/parser"
"github.com/thanos-io/thanos/pkg/extpromql"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand All @@ -26,7 +27,7 @@ func NewMultiTSDB(tsdbExemplarsServers func() map[string]*TSDB) *MultiTSDB {

// Exemplars returns all specified exemplars from a MultiTSDB instance.
func (m *MultiTSDB) Exemplars(r *exemplarspb.ExemplarsRequest, s exemplarspb.Exemplars_ExemplarsServer) error {
expr, err := parser.ParseExpr(r.Query)
expr, err := extpromql.ParseExpr(r.Query)
if err != nil {
return status.Error(codes.Internal, err.Error())
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/exemplars/proxy.go
Expand Up @@ -19,6 +19,7 @@ import (
"google.golang.org/grpc/status"

"github.com/thanos-io/thanos/pkg/exemplars/exemplarspb"
"github.com/thanos-io/thanos/pkg/extpromql"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tracing"
Expand Down Expand Up @@ -59,7 +60,7 @@ func (s *Proxy) Exemplars(req *exemplarspb.ExemplarsRequest, srv exemplarspb.Exe
span, ctx := tracing.StartSpan(srv.Context(), "proxy_exemplars")
defer span.Finish()

expr, err := parser.ParseExpr(req.Query)
expr, err := extpromql.ParseExpr(req.Query)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/exemplars/proxy_test.go
Expand Up @@ -12,6 +12,7 @@ import (
"sync"
"testing"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
Expand All @@ -21,9 +22,8 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/efficientgo/core/testutil"

"github.com/thanos-io/thanos/pkg/exemplars/exemplarspb"
"github.com/thanos-io/thanos/pkg/extpromql"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb"
)
Expand Down Expand Up @@ -54,7 +54,7 @@ func (t *testExemplarClient) Recv() (*exemplarspb.ExemplarsResponse, error) {
}

func (t *testExemplarClient) Exemplars(ctx context.Context, in *exemplarspb.ExemplarsRequest, opts ...grpc.CallOption) (exemplarspb.Exemplars_ExemplarsClient, error) {
expr, err := parser.ParseExpr(in.Query)
expr, err := extpromql.ParseExpr(in.Query)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
Expand Down
55 changes: 53 additions & 2 deletions pkg/extpromql/parser.go
Expand Up @@ -4,12 +4,63 @@
package extpromql

import (
"fmt"
"strings"

"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"

"github.com/thanos-io/promql-engine/execution/function"
)

func ParserExpr(input string) (parser.Expr, error) {
p := parser.NewParser(input, parser.WithFunctions(function.XFunctions))
// ParseExpr parses the input PromQL expression and returns the parsed representation.
func ParseExpr(input string) (parser.Expr, error) {
allFuncs := make(map[string]*parser.Function, len(function.XFunctions)+len(parser.Functions))
for k, v := range parser.Functions {
allFuncs[k] = v
}
for k, v := range function.XFunctions {
allFuncs[k] = v
}
p := parser.NewParser(input, parser.WithFunctions(allFuncs))
defer p.Close()
return p.ParseExpr()
}

// ParseMetricSelector parses the provided textual metric selector into a list of
// label matchers.
func ParseMetricSelector(input string) ([]*labels.Matcher, error) {
expr, err := ParseExpr(input)
// because of the AST checking present in the ParseExpr function,
// we need to ignore the error if it is just the check for empty name matcher.
if err != nil && !isEmptyNameMatcherErr(err) {
return nil, err
}

vs, ok := expr.(*parser.VectorSelector)
if !ok {
return nil, fmt.Errorf("expected type *parser.VectorSelector, got %T", expr)
}

matchers := make([]*labels.Matcher, len(vs.LabelMatchers))
for i, lm := range vs.LabelMatchers {
matchers[i] = &labels.Matcher{
Type: lm.Type,
Name: lm.Name,
Value: lm.Value,
}
}

return matchers, nil
}

func isEmptyNameMatcherErr(err error) bool {
var parseErrs parser.ParseErrors
if errors.As(err, &parseErrs) {
return len(parseErrs) == 1 &&
strings.HasSuffix(parseErrs[0].Error(), "vector selector must contain at least one non-empty matcher")
}

return false
}
68 changes: 68 additions & 0 deletions pkg/extpromql/parser_test.go
@@ -0,0 +1,68 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package extpromql_test

import (
"fmt"
"testing"

"github.com/efficientgo/core/testutil"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"

"github.com/thanos-io/thanos/pkg/extpromql"
)

func TestParseMetricSelector(t *testing.T) {
testCases := []struct {
name string
input string
}{
{
name: "single selector",
input: `http_requests_total{method="GET"}`,
},
{
name: "empty selectors",
input: `process_cpu_seconds_total`,
},
{
name: "multiple selectors",
input: `http_requests_total{method="GET",code="200"}`,
},
{
name: "multiple selectors with different matchers",
input: `http_requests_total{method="GET",code!="200"}`,
},
{
name: "multiple selectors with regex",
input: `http_requests_total{method="GET",code=~"2.*"}`,
},
{
name: "selector with negative regex",
input: `{code!~"2.*"}`,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
//lint:ignore faillint Testing against prometheus parser.
want, err := parser.ParseMetricSelector(tc.input)
if err != nil {
t.Fatalf("Prometheus ParseMetricSelector failed: %v", err)
}

got, err := extpromql.ParseMetricSelector(tc.input)
if err != nil {
t.Fatalf("ParseMetricSelector failed: %v", err)
}

testutil.Equals(t, stringFmt(want), stringFmt(got))
})
}
}

func stringFmt(got []*labels.Matcher) string {
return fmt.Sprintf("%v", got)
}
4 changes: 2 additions & 2 deletions pkg/query/remote_engine_test.go
Expand Up @@ -14,12 +14,12 @@ import (
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"
"github.com/thanos-io/promql-engine/logicalplan"
"github.com/thanos-io/promql-engine/query"
"google.golang.org/grpc"

"github.com/thanos-io/thanos/pkg/api/query/querypb"
"github.com/thanos-io/thanos/pkg/extpromql"
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/store/labelpb"
)
Expand All @@ -34,7 +34,7 @@ func TestRemoteEngine_Warnings(t *testing.T) {
end = time.Unix(120, 0)
step = 30 * time.Second
)
qryExpr, err := parser.ParseExpr("up")
qryExpr, err := extpromql.ParseExpr("up")
testutil.Ok(t, err)

plan := logicalplan.NewFromAST(qryExpr, &query.Options{
Expand Down

0 comments on commit 970cbbe

Please sign in to comment.