Skip to content

Commit

Permalink
Implement Query API discovery
Browse files Browse the repository at this point in the history
A recent commit (thanos-io#5250) added a GRPC API to Thanos Query which allows
executing PromQL over GRPC. This API is currently not discoverable
through endpointsets which makes it hard for other Thanos components
to use this it.

This commit extends endpointsets with a GetQueryAPIClients method
which returns Query API clients to all components which support
this API.
  • Loading branch information
fpetkovski committed Apr 21, 2022
1 parent 4b3f555 commit ac76232
Show file tree
Hide file tree
Showing 8 changed files with 321 additions and 46 deletions.
3 changes: 2 additions & 1 deletion cmd/thanos/query.go
Expand Up @@ -675,9 +675,10 @@ func runQuery(
info.WithRulesInfoFunc(),
info.WithMetricMetadataInfoFunc(),
info.WithTargetsInfoFunc(),
info.WithQueryInfoFunc(),
)

grpcAPI := apiv1.NewGRPCAPI(time.Now, queryableCreator, engineCreator, instantDefaultMaxSourceResolution)
grpcAPI := apiv1.NewGRPCAPI(logger, time.Now, queryableCreator, engineCreator, instantDefaultMaxSourceResolution)
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
grpcserver.WithServer(apiv1.RegisterQueryServer(grpcAPI)),
grpcserver.WithServer(store.RegisterStoreServer(proxy)),
Expand Down
21 changes: 18 additions & 3 deletions pkg/api/query/grpc.go
Expand Up @@ -7,6 +7,11 @@ import (
"context"
"time"

"github.com/go-kit/log"
"github.com/thanos-io/thanos/pkg/component"

"github.com/thanos-io/thanos/pkg/info/infopb"

"github.com/prometheus/prometheus/promql"
"github.com/thanos-io/thanos/pkg/api/query/querypb"
"github.com/thanos-io/thanos/pkg/query"
Expand All @@ -16,14 +21,23 @@ import (
)

type GRPCAPI struct {
logger log.Logger
now func() time.Time
queryableCreate query.QueryableCreator
queryEngine func(int64) *promql.Engine
defaultMaxResolutionSeconds time.Duration
}

func NewGRPCAPI(now func() time.Time, creator query.QueryableCreator, queryEngine func(int64) *promql.Engine, defaultMaxResolutionSeconds time.Duration) *GRPCAPI {
func (g *GRPCAPI) Info(ctx context.Context, request *infopb.InfoRequest) (*infopb.InfoResponse, error) {
return &infopb.InfoResponse{
ComponentType: component.QueryAPI.String(),
Query: &infopb.QueryInfo{},
}, nil
}

func NewGRPCAPI(logger log.Logger, now func() time.Time, creator query.QueryableCreator, queryEngine func(int64) *promql.Engine, defaultMaxResolutionSeconds time.Duration) *GRPCAPI {
return &GRPCAPI{
logger: logger,
now: now,
queryableCreate: creator,
queryEngine: queryEngine,
Expand Down Expand Up @@ -64,14 +78,15 @@ func (g *GRPCAPI) Query(request *querypb.QueryRequest, server querypb.Query_Quer
}

qe := g.queryEngine(request.MaxResolutionSeconds)

queryable := g.queryableCreate(
request.EnableDedup,
request.ReplicaLabels,
storeMatchers,
maxResolution,
request.EnablePartialResponse,
request.EnableQueryPushdown,
false,
request.SkipChunks,
)
qry, err := qe.NewInstantQuery(queryable, request.Query, ts)
if err != nil {
Expand Down Expand Up @@ -134,7 +149,7 @@ func (g *GRPCAPI) QueryRange(request *querypb.QueryRangeRequest, srv querypb.Que
maxResolution,
request.EnablePartialResponse,
request.EnableQueryPushdown,
false,
request.SkipChunks,
)

startTime := time.Unix(request.StartTimeSeconds, 0)
Expand Down
3 changes: 3 additions & 0 deletions pkg/component/component.go
Expand Up @@ -102,6 +102,8 @@ func FromString(storeType string) StoreAPI {
return Receive
case "debug":
return Debug
case "queryAPI":
return QueryAPI
default:
return UnknownStoreAPI
}
Expand All @@ -124,4 +126,5 @@ var (
Store = storeAPI{component: component{name: "store"}}
UnknownStoreAPI = storeAPI{component: component{name: "unknown-store-api"}}
Query = storeAPI{component: component{name: "query"}}
QueryAPI = storeAPI{component: component{name: "queryAPI"}}
)
18 changes: 18 additions & 0 deletions pkg/info/info.go
Expand Up @@ -25,6 +25,7 @@ type InfoServer struct {
getRulesInfo func() *infopb.RulesInfo
getTargetsInfo func() *infopb.TargetsInfo
getMetricMetadataInfo func() *infopb.MetricMetadataInfo
getQueryInfo func() *infopb.QueryInfo
}

// NewInfoServer creates a new server instance for given component
Expand All @@ -42,6 +43,7 @@ func NewInfoServer(
getRulesInfo: func() *infopb.RulesInfo { return nil },
getTargetsInfo: func() *infopb.TargetsInfo { return nil },
getMetricMetadataInfo: func() *infopb.MetricMetadataInfo { return nil },
getQueryInfo: func() *infopb.QueryInfo { return nil },
}

for _, o := range options {
Expand Down Expand Up @@ -144,6 +146,21 @@ func WithMetricMetadataInfoFunc(getMetricMetadataInfo ...func() *infopb.MetricMe
}
}

// WithQueryInfoFunc determines the function that should be executed to obtain
// the query information. If no function is provided, the default empty
// query info is returned. Only the first function from the list is considered.
func WithQueryInfoFunc(queryInfo ...func() *infopb.QueryInfo) ServerOptionFunc {
if len(queryInfo) == 0 {
return func(s *InfoServer) {
s.getQueryInfo = func() *infopb.QueryInfo { return &infopb.QueryInfo{} }
}
}

return func(s *InfoServer) {
s.getQueryInfo = queryInfo[0]
}
}

// RegisterInfoServer registers the info server.
func RegisterInfoServer(infoSrv infopb.InfoServer) func(*grpc.Server) {
return func(s *grpc.Server) {
Expand All @@ -161,5 +178,6 @@ func (srv *InfoServer) Info(ctx context.Context, req *infopb.InfoRequest) (*info
Rules: srv.getRulesInfo(),
Targets: srv.getTargetsInfo(),
MetricMetadata: srv.getMetricMetadataInfo(),
Query: srv.getQueryInfo(),
}, nil
}

0 comments on commit ac76232

Please sign in to comment.