Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement GRPC query API #5250

Merged
merged 1 commit into from Mar 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -15,6 +15,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Added

- [#5220](https://github.com/thanos-io/thanos/pull/5220) Query Frontend: Add `--query-frontend.forward-header` flag, forward headers to downstream querier.
- [#5250](https://github.com/thanos-io/thanos/pull/5250/files) Querier: Expose Query and QueryRange APIs through GRPC.

### Changed

Expand Down
3 changes: 3 additions & 0 deletions Makefile
Expand Up @@ -25,6 +25,9 @@ else ifeq ($(arch), armv8)
else ifeq ($(arch), arm64)
# arm64
BASE_DOCKER_SHA=${arm64}
else ifeq ($(arch), aarch64)
bwplotka marked this conversation as resolved.
Show resolved Hide resolved
# arm64
BASE_DOCKER_SHA=${arm64}
else
echo >&2 "only support amd64 or arm64 arch" && exit 1
endif
Expand Down
9 changes: 6 additions & 3 deletions cmd/thanos/query.go
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"

v1 "github.com/thanos-io/thanos/pkg/api/query"
apiv1 "github.com/thanos-io/thanos/pkg/api/query"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/discovery/cache"
Expand Down Expand Up @@ -574,6 +574,7 @@ func runQuery(
grpcProbe,
prober.NewInstrumentation(comp, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)),
)
engineCreator := engineFactory(promql.NewEngine, engineOpts, dynamicLookbackDelta)

// Start query API + UI HTTP server.
{
Expand All @@ -600,10 +601,10 @@ func runQuery(
// TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting.
ui.NewQueryUI(logger, endpoints, webExternalPrefix, webPrefixHeaderName, alertQueryURL).Register(router, ins)

api := v1.NewQueryAPI(
api := apiv1.NewQueryAPI(
logger,
endpoints.GetEndpointStatus,
engineFactory(promql.NewEngine, engineOpts, dynamicLookbackDelta),
engineCreator,
queryableCreator,
// NOTE: Will share the same replica label as the query for now.
rules.NewGRPCClientWithDedup(rulesProxy, queryReplicaLabels),
Expand Down Expand Up @@ -676,7 +677,9 @@ func runQuery(
info.WithTargetsInfoFunc(),
)

grpcAPI := apiv1.NewGRPCAPI(time.Now, queryableCreator, engineCreator, instantDefaultMaxSourceResolution)
s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe,
grpcserver.WithServer(apiv1.RegisterQueryServer(grpcAPI)),
grpcserver.WithServer(store.RegisterStoreServer(proxy)),
grpcserver.WithServer(rules.RegisterRulesServer(rulesProxy)),
grpcserver.WithServer(targets.RegisterTargetsServer(targetsProxy)),
Expand Down
122 changes: 122 additions & 0 deletions docs/proposals-done/202203-grpc-query-api.md
@@ -0,0 +1,122 @@
## Introduce a Query gRPC API

* **Owners:**
* `@fpetkovski`
* `@mzardab`

> TL;DR: Introducing a new gRPC API for `/query` and `/query_range`

## Why

We want to be able to distinguish between gRPC Store APIs and other Queriers in the query path. Currently, Thanos Query implements the gRPC Store API and the root Querier does not distinguish between Store targets and other Queriers that are capable of processing a PromQL expression before returning the result. The new gRPC Query API will allow a querier to fan out query execution, in addition to Store API selects.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to be able to distinguish between gRPC Store APIs and other Queriers in the query path.

I don't get that - you can distinguish it easily with configuration, no need for a new API for this particular point?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, but we can't distinguish between them via gRPC, only via HTTP.


This is useful for a few reasons:

* When Queriers register disjoint Store targets, they should be able to deduplicate series and then execute the query without concerns of duplicate data from other queriers. This new API would enable users to effectively partition by Querier, and avoid shipping raw series back from each disjointed Querier to the root Querier.
* If Queriers register Store targets with overlapping series, users would be able to express a query sharding strategy between Queriers to more effectively distribute query load amongst a fleet of homogenous Queriers.
* The proposed Query API utilizes gRPC instead of HTTP, which would enable gRPC streaming from root Querier all the way to the underlying Store targets (Query API -> Store API) and unlock the performance benefits of gRPC over HTTP.
* When there is only one StoreAPI connected to Thanos Query which completely covers the requested range of the original user's query, then it is more optimal to execute the query directly in the store, instead of sending raw samples to the querier. This scenario is not unlikely given query-frontend's sharding capabilities.

### Pitfalls of the current solution

Thanos Query currently allows for `query` and `query_range` operations through HTTP only. Various query strategies can be implemented using the HTTP API, an analogous gRPC API would allow for a more resource efficient and expressive query execution path. The two main reasons are the streaming capabilities that come out of the box with gRPC, statically typed API spec, as well as the lower bandwidth utilization which protobuf enables.

## Goals
* Introduce a gRPC Query API implementation equivalent to the current Querier HTTP API (`query` for instant queries, `query_range` for range queries)

## Non-Goals

* Implementation of potential query sharding strategies described in this proposal.
* Streaming implementations for `query` and `query_range` rpc's, these will be introduced as additional `QueryStream` and `QueryRangeStream` rpc's subsequently.
* Response series ordering equivalent to the current Prometheus Query HTTP API behaviour

### Audience
* Thanos Maintainers
* Thanos Users

## How

We propose defining the following gRPC API:

```protobuf
service Query {
rpc Query(QueryRequest) returns (stream QueryResponse);

rpc QueryRange(QueryRangeRequest) returns (stream QueryRangeResponse);
}
```

Where the `QueryRequest`, `QueryResponse`, `QueryRangeRequest` and `Query RangeResponse` are defined as follows:

```protobuf
message QueryRequest {
string query = 1;

int64 time_seconds = 2;
int64 timeout_seconds = 3;
int64 max_resolution_seconds = 4;

repeated string replica_labels = 5;

repeated StoreMatchers storeMatchers = 6 [(gogoproto.nullable) = false];

bool enableDedup = 7;
bool enablePartialResponse = 8;
bool enableQueryPushdown = 9;
bool skipChunks = 10;
}

message QueryResponse {
oneof result {
/// warnings are additional messages coming from the PromQL engine.
string warnings = 1;

/// timeseries is one series from the result of the executed query.
prometheus_copy.TimeSeries timeseries = 2;
}
}

message QueryRangeRequest {
string query = 1;

int64 start_time_seconds = 2;
int64 end_time_seconds = 3;
int64 interval_seconds = 4;

int64 timeout_seconds = 5;
int64 max_resolution_seconds = 6;

repeated string replica_labels = 7;

repeated StoreMatchers storeMatchers = 8 [(gogoproto.nullable) = false];

bool enableDedup = 9;
bool enablePartialResponse = 10;
bool enableQueryPushdown = 11;
bool skipChunks = 12;
}

message QueryRangeResponse {
oneof result {
/// warnings are additional messages coming from the PromQL engine.
string warnings = 1;

/// timeseries is one series from the result of the executed query.
prometheus_copy.TimeSeries timeseries = 2;
}
}
```

The `Query` Service will be implemented by the gRPC server which is started via the `thanos query` command.

## Alternatives

The alternative to expressing a gRPC Query API would be to use the HTTP APIs and distinguish Queriers via configuration on startup. This would be suboptimal for the following reasons:
* No statically typed API definition, we would need to rely on HTTP API versioning to manage changes to the API that is intended to enable advanced query execution strategies.
* HTTP not as performant as gRPC/HTTP2, gRPC/HTTP2 allows us to use streaming(less connection overhead) and protobuf(smaller response sizes), the current HTTP API does not.
* Ergonomics, gRPC allows us to express a functional API with parameters, HTTP requires request parameter marshalling/unmarshalling which is very error-prone.

## Action Plan

* [X] Define the QueryServer gRPC Service
* [X] Implement the QueryServer gRPC Service in the Thanos Query
170 changes: 170 additions & 0 deletions pkg/api/query/grpc.go
@@ -0,0 +1,170 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package v1

import (
"context"
"time"

"github.com/prometheus/prometheus/promql"
"github.com/thanos-io/thanos/pkg/api/query/querypb"
"github.com/thanos-io/thanos/pkg/query"
"github.com/thanos-io/thanos/pkg/store/labelpb"
"github.com/thanos-io/thanos/pkg/store/storepb/prompb"
"google.golang.org/grpc"
)

type GRPCAPI struct {
now func() time.Time
queryableCreate query.QueryableCreator
queryEngine func(int64) *promql.Engine
defaultMaxResolutionSeconds time.Duration
}

func NewGRPCAPI(now func() time.Time, creator query.QueryableCreator, queryEngine func(int64) *promql.Engine, defaultMaxResolutionSeconds time.Duration) *GRPCAPI {
return &GRPCAPI{
now: now,
queryableCreate: creator,
queryEngine: queryEngine,
defaultMaxResolutionSeconds: defaultMaxResolutionSeconds,
}
}

func RegisterQueryServer(queryServer querypb.QueryServer) func(*grpc.Server) {
return func(s *grpc.Server) {
querypb.RegisterQueryServer(s, queryServer)
}
}

func (g *GRPCAPI) Query(request *querypb.QueryRequest, server querypb.Query_QueryServer) error {
ctx := context.Background()
var ts time.Time
if request.TimeSeconds == 0 {
ts = g.now()
} else {
ts = time.Unix(request.TimeSeconds, 0)
}

if request.TimeoutSeconds != 0 {
var cancel context.CancelFunc
timeout := time.Duration(request.TimeoutSeconds) * time.Second
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}

maxResolution := request.MaxResolutionSeconds
if request.MaxResolutionSeconds == 0 {
maxResolution = g.defaultMaxResolutionSeconds.Milliseconds() / 1000
}

storeMatchers, err := querypb.StoreMatchersToLabelMatchers(request.StoreMatchers)
if err != nil {
return err
}

qe := g.queryEngine(request.MaxResolutionSeconds)
queryable := g.queryableCreate(
request.EnableDedup,
request.ReplicaLabels,
storeMatchers,
maxResolution,
request.EnablePartialResponse,
request.EnableQueryPushdown,
false,
)
qry, err := qe.NewInstantQuery(queryable, request.Query, ts)
if err != nil {
return err
}

result := qry.Exec(ctx)
if err := server.Send(querypb.NewQueryWarningsResponse(result.Warnings)); err != nil {
return nil
}

switch vector := result.Value.(type) {
case promql.Scalar:
series := &prompb.TimeSeries{
Samples: []prompb.Sample{{Value: vector.V, Timestamp: vector.T}},
}
if err := server.Send(querypb.NewQueryResponse(series)); err != nil {
return err
}
case promql.Vector:
for _, sample := range vector {
series := &prompb.TimeSeries{
Labels: labelpb.ZLabelsFromPromLabels(sample.Metric),
Samples: prompb.SamplesFromPromqlPoints([]promql.Point{sample.Point}),
}
if err := server.Send(querypb.NewQueryResponse(series)); err != nil {
return err
}
}

return nil
}

return nil
}

func (g *GRPCAPI) QueryRange(request *querypb.QueryRangeRequest, srv querypb.Query_QueryRangeServer) error {
ctx := context.Background()
if request.TimeoutSeconds != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Duration(request.TimeoutSeconds))
defer cancel()
}

maxResolution := request.MaxResolutionSeconds
if request.MaxResolutionSeconds == 0 {
maxResolution = g.defaultMaxResolutionSeconds.Milliseconds() / 1000
}

storeMatchers, err := querypb.StoreMatchersToLabelMatchers(request.StoreMatchers)
if err != nil {
return err
}

qe := g.queryEngine(request.MaxResolutionSeconds)
queryable := g.queryableCreate(
request.EnableDedup,
request.ReplicaLabels,
storeMatchers,
maxResolution,
request.EnablePartialResponse,
request.EnableQueryPushdown,
false,
)

startTime := time.Unix(request.StartTimeSeconds, 0)
endTime := time.Unix(request.EndTimeSeconds, 0)
interval := time.Duration(request.IntervalSeconds) * time.Second

qry, err := qe.NewRangeQuery(queryable, request.Query, startTime, endTime, interval)
if err != nil {
return err
}

result := qry.Exec(ctx)
if err := srv.Send(querypb.NewQueryRangeWarningsResponse(result.Warnings)); err != nil {
return err
}

switch matrix := result.Value.(type) {
case promql.Matrix:
for _, series := range matrix {
series := &prompb.TimeSeries{
Labels: labelpb.ZLabelsFromPromLabels(series.Metric),
Samples: prompb.SamplesFromPromqlPoints(series.Points),
}
if err := srv.Send(querypb.NewQueryRangeResponse(series)); err != nil {
return err
}
}

return nil
}

return nil
}