Skip to content

Commit

Permalink
Implement GRPC query API
Browse files Browse the repository at this point in the history
With the current GRPC APIs, layering Thanos Queriers results in
the root querier getting all of the samples and executing the query
in memory. As a result, the intermediary Queriers do not do any
intensive work and merely transport samples from the Stores to the
root Querier.

When data is perfectly sharded, users can implement a pattern where
the root Querier instructs the intermediary ones to execute the queries
from their stores and return the results. The results can then be
concatenated by the root querier and returned to the user.

In order to support this use case, this commit implements a GRPC API
in the Querier which is analogous to the HTTP Query API exposed
by Prometheus.

Signed-off-by: fpetkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed Mar 22, 2022
1 parent 149e026 commit 48fff63
Show file tree
Hide file tree
Showing 12 changed files with 2,308 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -17,6 +17,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Added

- [#5220](https://github.com/thanos-io/thanos/pull/5220) Query Frontend: Add `--query-frontend.forward-header` flag, forward headers to downstream querier.
- [#5250](https://github.com/thanos-io/thanos/pull/5250) Querier: Expose Query and QueryRange APIs through gRPC.

### Changed

Expand Down
3 changes: 3 additions & 0 deletions Makefile
Expand Up @@ -25,6 +25,9 @@ else ifeq ($(arch), armv8)
else ifeq ($(arch), arm64)
# arm64
BASE_DOCKER_SHA=${arm64}
else ifeq ($(arch), aarch64)
# arm64
BASE_DOCKER_SHA=${arm64}
else
echo >&2 "only support amd64 or arm64 arch" && exit 1
endif
Expand Down
9 changes: 6 additions & 3 deletions cmd/thanos/query.go
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"

v1 "github.com/thanos-io/thanos/pkg/api/query"
apiv1 "github.com/thanos-io/thanos/pkg/api/query"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/discovery/cache"
Expand Down Expand Up @@ -574,6 +574,7 @@ func runQuery(
grpcProbe,
prober.NewInstrumentation(comp, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)),
)
engineCreator := engineFactory(promql.NewEngine, engineOpts, dynamicLookbackDelta)

// Start query API + UI HTTP server.
{
Expand All @@ -600,10 +601,10 @@ func runQuery(
// TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting.
ui.NewQueryUI(logger, endpoints, webExternalPrefix, webPrefixHeaderName, alertQueryURL).Register(router, ins)

api := v1.NewQueryAPI(
api := apiv1.NewQueryAPI(
logger,
endpoints.GetEndpointStatus,
engineFactory(promql.NewEngine, engineOpts, dynamicLookbackDelta),
engineCreator,
queryableCreator,
// NOTE: Will share the same replica label as the query for now.
rules.NewGRPCClientWithDedup(rulesProxy, queryReplicaLabels),
Expand Down Expand Up @@ -673,7 +674,9 @@ func runQuery(
info.WithTargetsInfoFunc(),
)

grpcAPI := apiv1.NewGrpcAPI(time.Now, queryableCreator, engineCreator, instantDefaultMaxSourceResolution)
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
grpcserver.WithServer(apiv1.RegisterQueryServer(grpcAPI)),
grpcserver.WithServer(store.RegisterStoreServer(proxy)),
grpcserver.WithServer(rules.RegisterRulesServer(rulesProxy)),
grpcserver.WithServer(targets.RegisterTargetsServer(targetsProxy)),
Expand Down
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -96,6 +96,7 @@ require (
gopkg.in/fsnotify.v1 v1.4.7
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
gotest.tools v2.2.0+incompatible
)

require (
Expand Down
1 change: 1 addition & 0 deletions go.sum
Expand Up @@ -2536,6 +2536,7 @@ gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk=
gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8=
Expand Down
161 changes: 161 additions & 0 deletions pkg/api/query/grpc.go
@@ -0,0 +1,161 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package v1

import (
	"context"
	"fmt"
	"time"

	"github.com/prometheus/prometheus/promql"
	"github.com/thanos-io/thanos/pkg/api/query/querypb"
	"github.com/thanos-io/thanos/pkg/query"
	"github.com/thanos-io/thanos/pkg/store/labelpb"
	"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
	"google.golang.org/grpc"
)

// GrpcAPI holds the dependencies used by the Query and QueryRange gRPC
// handlers to evaluate PromQL expressions.
type GrpcAPI struct {
// now supplies the evaluation timestamp for instant queries whose request
// does not carry an explicit time.
now func() time.Time
// queryableCreate builds the queryable for a single request from the
// request's dedup, replica label, store matcher, resolution, partial
// response and pushdown settings.
queryableCreate query.QueryableCreator
// queryEngine returns the PromQL engine to use for a request; the handlers
// call it with a max-resolution value.
queryEngine func(int64) *promql.Engine
// defaultMaxResolutionSeconds is the max resolution applied when a request
// leaves MaxResolutionSeconds at zero. The handlers truncate it to whole
// seconds before use.
defaultMaxResolutionSeconds time.Duration
}

// NewGrpcAPI assembles a GrpcAPI from its collaborators: the clock used for
// instant queries without an explicit time, the per-request queryable factory,
// the engine selector, and the max resolution used when a request does not
// specify one.
func NewGrpcAPI(now func() time.Time, creator query.QueryableCreator, queryEngine func(int64) *promql.Engine, defaultMaxResolutionSeconds time.Duration) *GrpcAPI {
	api := new(GrpcAPI)
	api.now = now
	api.queryableCreate = creator
	api.queryEngine = queryEngine
	api.defaultMaxResolutionSeconds = defaultMaxResolutionSeconds
	return api
}

// RegisterQueryServer returns a callback that attaches queryServer to the
// gRPC server it is invoked with. Registration is deferred until the callback
// runs.
func RegisterQueryServer(queryServer querypb.QueryServer) func(*grpc.Server) {
	register := func(srv *grpc.Server) {
		querypb.RegisterQueryServer(srv, queryServer)
	}
	return register
}

// Query evaluates an instant PromQL query and returns the result as a list of
// time series.
//
// Request defaults:
//   - TimeSeconds == 0 evaluates the query at the time reported by the API's
//     clock.
//   - TimeoutSeconds == 0 adds no timeout beyond whatever deadline ctx already
//     carries.
//   - MaxResolutionSeconds == 0 falls back to the configured default.
//
// A scalar result is returned as a single label-less series holding one
// sample; a vector result is returned as one single-sample series per element.
// Errors from matcher conversion, query construction and query evaluation are
// returned to the caller, as is an error for any other result type.
func (grpcAPI *GrpcAPI) Query(ctx context.Context, request *querypb.QueryRequest) (*querypb.QueryResponse, error) {
	ts := time.Unix(request.TimeSeconds, 0)
	if request.TimeSeconds == 0 {
		ts = grpcAPI.now()
	}

	if request.TimeoutSeconds != 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, time.Duration(request.TimeoutSeconds)*time.Second)
		defer cancel()
	}

	// NOTE(review): both the request value and the default are expressed in
	// seconds here; confirm queryableCreate and queryEngine expect that unit.
	maxResolution := request.MaxResolutionSeconds
	if maxResolution == 0 {
		maxResolution = int64(grpcAPI.defaultMaxResolutionSeconds / time.Second)
	}

	storeMatchers, err := querypb.StoreMatchersToLabelMatchers(request.StoreMatchers)
	if err != nil {
		return nil, err
	}

	// Select the engine with the effective resolution so that it agrees with
	// the queryable when the request relies on the default.
	qe := grpcAPI.queryEngine(maxResolution)
	queryable := grpcAPI.queryableCreate(
		request.EnableDedup,
		request.ReplicaLabels,
		storeMatchers,
		maxResolution,
		request.EnablePartialResponse,
		request.EnableQueryPushdown,
		false,
	)

	qry, err := qe.NewInstantQuery(queryable, request.Query, ts)
	if err != nil {
		return nil, err
	}
	// Release the query's resources once the response has been built.
	// NOTE(review): assumes the prompb/labelpb conversions below do not alias
	// memory that Close recycles — confirm.
	defer qry.Close()

	result := qry.Exec(ctx)
	// Evaluation failures (timeouts, storage errors, ...) are reported through
	// the result rather than through a separate return value.
	if result.Err != nil {
		return nil, result.Err
	}

	switch value := result.Value.(type) {
	case promql.Scalar:
		return &querypb.QueryResponse{
			Timeseries: []prompb.TimeSeries{{
				Samples: []prompb.Sample{{Value: value.V, Timestamp: value.T}},
			}},
		}, nil
	case promql.Vector:
		response := &querypb.QueryResponse{
			Timeseries: make([]prompb.TimeSeries, 0, len(value)),
		}
		for _, sample := range value {
			response.Timeseries = append(response.Timeseries, prompb.TimeSeries{
				Labels:  labelpb.ZLabelsFromPromLabels(sample.Metric),
				Samples: prompb.SamplesFromPromqlPoints([]promql.Point{sample.Point}),
			})
		}
		return response, nil
	default:
		// A nil response with a nil error gives the caller nothing to act on.
		return nil, fmt.Errorf("unsupported result type %T for instant query", result.Value)
	}
}

// QueryRange evaluates a PromQL query over the range
// [StartTimeSeconds, EndTimeSeconds] at IntervalSeconds steps and returns one
// time series per resulting matrix row.
//
// Request defaults:
//   - TimeoutSeconds == 0 adds no timeout beyond whatever deadline ctx already
//     carries.
//   - MaxResolutionSeconds == 0 falls back to the configured default.
//
// A non-positive interval or an end time before the start time is rejected
// with an error before any query is built. Errors from matcher conversion,
// query construction and query evaluation are returned to the caller, as is an
// error for a non-matrix result.
func (grpcAPI *GrpcAPI) QueryRange(ctx context.Context, request *querypb.QueryRangeRequest) (*querypb.QueryRangeResponse, error) {
	// Reject ranges the engine cannot step through.
	if request.IntervalSeconds <= 0 {
		return nil, fmt.Errorf("query interval must be positive, got %v seconds", request.IntervalSeconds)
	}
	if request.EndTimeSeconds < request.StartTimeSeconds {
		return nil, fmt.Errorf("end time %v is before start time %v", request.EndTimeSeconds, request.StartTimeSeconds)
	}

	if request.TimeoutSeconds != 0 {
		var cancel context.CancelFunc
		// TimeoutSeconds is in seconds; a bare time.Duration conversion would
		// interpret it as nanoseconds.
		ctx, cancel = context.WithTimeout(ctx, time.Duration(request.TimeoutSeconds)*time.Second)
		defer cancel()
	}

	// NOTE(review): both the request value and the default are expressed in
	// seconds here; confirm queryableCreate and queryEngine expect that unit.
	maxResolution := request.MaxResolutionSeconds
	if maxResolution == 0 {
		maxResolution = int64(grpcAPI.defaultMaxResolutionSeconds / time.Second)
	}

	storeMatchers, err := querypb.StoreMatchersToLabelMatchers(request.StoreMatchers)
	if err != nil {
		return nil, err
	}

	// Select the engine with the effective resolution so that it agrees with
	// the queryable when the request relies on the default.
	qe := grpcAPI.queryEngine(maxResolution)
	queryable := grpcAPI.queryableCreate(
		request.EnableDedup,
		request.ReplicaLabels,
		storeMatchers,
		maxResolution,
		request.EnablePartialResponse,
		request.EnableQueryPushdown,
		false,
	)

	startTime := time.Unix(request.StartTimeSeconds, 0)
	endTime := time.Unix(request.EndTimeSeconds, 0)
	interval := time.Duration(request.IntervalSeconds) * time.Second

	qry, err := qe.NewRangeQuery(queryable, request.Query, startTime, endTime, interval)
	if err != nil {
		return nil, err
	}
	// Release the query's resources once the response has been built.
	// NOTE(review): assumes the prompb/labelpb conversions below do not alias
	// memory that Close recycles — confirm.
	defer qry.Close()

	result := qry.Exec(ctx)
	// Evaluation failures (timeouts, storage errors, ...) are reported through
	// the result rather than through a separate return value.
	if result.Err != nil {
		return nil, result.Err
	}

	matrix, ok := result.Value.(promql.Matrix)
	if !ok {
		// A nil response with a nil error gives the caller nothing to act on.
		return nil, fmt.Errorf("unsupported result type %T for range query", result.Value)
	}

	response := &querypb.QueryRangeResponse{
		Timeseries: make([]prompb.TimeSeries, len(matrix)),
	}
	for i, series := range matrix {
		response.Timeseries[i] = prompb.TimeSeries{
			Labels:  labelpb.ZLabelsFromPromLabels(series.Metric),
			Samples: prompb.SamplesFromPromqlPoints(series.Points),
		}
	}
	return response, nil
}

0 comments on commit 48fff63

Please sign in to comment.