From aa70ff8f35844b4900c0537982f1cb97f42f6232 Mon Sep 17 00:00:00 2001 From: fpetkovski Date: Thu, 17 Mar 2022 10:54:32 +0100 Subject: [PATCH] Implement GRPC query API With the current GRPC APIs, layering Thanos Queriers results in the root querier getting all of the samples and executing the query in memory. As a result, the intermediary Queriers do not do any intensive work and merely transport samples from the Stores to the root Querier. When data is perfectly sharded, users can implement a pattern where the root Querier instructs the intermediary ones to execute the queries from their stores and return back results. The results can then be concatenated by the root querier and returned to the user. In order to support this use case, this commit implements a GRPC API in the Querier which is analogous to the HTTP Query API exposed by Prometheus. Signed-off-by: fpetkovski --- CHANGELOG.md | 1 + Makefile | 3 + cmd/thanos/query.go | 9 +- docs/proposals-done/202203-grpc-query-api.md | 122 + pkg/api/query/grpc.go | 170 ++ pkg/api/query/querypb/query.pb.go | 2163 ++++++++++++++++++ pkg/api/query/querypb/query.proto | 89 + pkg/api/query/querypb/responses.go | 50 + pkg/api/query/querypb/store_matchers.go | 26 + pkg/store/storepb/prompb/samples.go | 19 +- scripts/genproto.sh | 2 +- test/e2e/query_test.go | 216 ++ 12 files changed, 2865 insertions(+), 5 deletions(-) create mode 100644 docs/proposals-done/202203-grpc-query-api.md create mode 100644 pkg/api/query/grpc.go create mode 100644 pkg/api/query/querypb/query.pb.go create mode 100644 pkg/api/query/querypb/query.proto create mode 100644 pkg/api/query/querypb/responses.go create mode 100644 pkg/api/query/querypb/store_matchers.go diff --git a/CHANGELOG.md b/CHANGELOG.md index d07b11e6b1..e6d611e33b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Added - [#5220](https://github.com/thanos-io/thanos/pull/5220) Query Frontend: Add `--query-frontend.forward-header` flag, forward headers to downstream querier. +- [#5250](https://github.com/thanos-io/thanos/pull/5250/files) Querier: Expose Query and QueryRange APIs through GRPC. ### Changed diff --git a/Makefile b/Makefile index 64854d22e3..ffb9a1f484 100644 --- a/Makefile +++ b/Makefile @@ -25,6 +25,9 @@ else ifeq ($(arch), armv8) else ifeq ($(arch), arm64) # arm64 BASE_DOCKER_SHA=${arm64} +else ifeq ($(arch), aarch64) + # arm64 + BASE_DOCKER_SHA=${arm64} else echo >&2 "only support amd64 or arm64 arch" && exit 1 endif diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index d21bae5390..7c65edf77d 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -27,7 +27,7 @@ import ( "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" - v1 "github.com/thanos-io/thanos/pkg/api/query" + apiv1 "github.com/thanos-io/thanos/pkg/api/query" "github.com/thanos-io/thanos/pkg/compact/downsample" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/discovery/cache" @@ -574,6 +574,7 @@ func runQuery( grpcProbe, prober.NewInstrumentation(comp, logger, extprom.WrapRegistererWithPrefix("thanos_", reg)), ) + engineCreator := engineFactory(promql.NewEngine, engineOpts, dynamicLookbackDelta) // Start query API + UI HTTP server. { @@ -600,10 +601,10 @@ func runQuery( // TODO(bplotka in PR #513 review): pass all flags, not only the flags needed by prefix rewriting. ui.NewQueryUI(logger, endpoints, webExternalPrefix, webPrefixHeaderName, alertQueryURL).Register(router, ins) - api := v1.NewQueryAPI( + api := apiv1.NewQueryAPI( logger, endpoints.GetEndpointStatus, - engineFactory(promql.NewEngine, engineOpts, dynamicLookbackDelta), + engineCreator, queryableCreator, // NOTE: Will share the same replica label as the query for now. rules.NewGRPCClientWithDedup(rulesProxy, queryReplicaLabels), @@ -676,7 +677,9 @@ func runQuery( info.WithTargetsInfoFunc(), ) + grpcAPI := apiv1.NewGRPCAPI(time.Now, queryableCreator, engineCreator, instantDefaultMaxSourceResolution) s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe, + grpcserver.WithServer(apiv1.RegisterQueryServer(grpcAPI)), grpcserver.WithServer(store.RegisterStoreServer(proxy)), grpcserver.WithServer(rules.RegisterRulesServer(rulesProxy)), grpcserver.WithServer(targets.RegisterTargetsServer(targetsProxy)), diff --git a/docs/proposals-done/202203-grpc-query-api.md b/docs/proposals-done/202203-grpc-query-api.md new file mode 100644 index 0000000000..d3f6fd6dc1 --- /dev/null +++ b/docs/proposals-done/202203-grpc-query-api.md @@ -0,0 +1,122 @@ +## Introduce a Query gRPC API + +* **Owners:** + * `@fpetkovski` + * `@mzardab` + +> TL;DR: Introducing a new gRPC API for `/query` and `/query_range` + +## Why + +We want to be able to distinguish between gRPC Store APIs and other Queriers in the query path. Currently, Thanos Query implements the gRPC Store API and the root Querier does not distinguish between Store targets and other Queriers that are capable of processing a PromQL expression before returning the result. The new gRPC Query API will allow a querier to fan out query execution, in addition to Store API selects. + +This is useful for a few reasons: + +* When Queriers register disjoint Store targets, they should be able to deduplicate series and then execute the query without concerns of duplicate data from other queriers. This new API would enable users to effectively partition by Querier, and avoid shipping raw series back from each disjointed Querier to the root Querier. +* If Queriers register Store targets with overlapping series, users would be able to express a query sharding strategy between Queriers to more effectively distribute query load amongst a fleet of homogenous Queriers. +* The proposed Query API utilizes gRPC instead of HTTP, which would enable gRPC streaming from root Querier all the way to the underlying Store targets (Query API -> Store API) and unlock the performance benefits of gRPC over HTTP. +* When there is only one StoreAPI connected to Thanos Query which completely covers the requested range of the original user's query, then it is more optimal to execute the query directly in the store, instead of sending raw samples to the querier. This scenario is not unlikely given query-frontend's sharding capabilities. + +### Pitfalls of the current solution + +Thanos Query currently allows for `query` and `query_range` operations through HTTP only. Various query strategies can be implemented using the HTTP API, an analogous gRPC API would allow for a more resource efficient and expressive query execution path. The two main reasons are the streaming capabilities that come out of the box with gRPC, statically typed API spec, as well as the lower bandwidth utilization which protobuf enables. + +## Goals +* Introduce a gRPC Query API implementation equivalent to the current Querier HTTP API (`query` for instant queries, `query_range` for range queries) + +## Non-Goals + +* Implementation of potential query sharding strategies described in this proposal. +* Streaming implementations for `query` and `query_range` rpc's, these will be introduced as additional `QueryStream` and `QueryRangeStream` rpc's subsequently. +* Response series ordering equivalent to the current Prometheus Query HTTP API behaviour + +### Audience +* Thanos Maintainers +* Thanos Users + +## How + +We propose defining the following gRPC API: + +```protobuf +service Query { + rpc Query(QueryRequest) returns (stream QueryResponse); + + rpc QueryRange(QueryRangeRequest) returns (stream QueryRangeResponse); +} +``` + +Where the `QueryRequest`, `QueryResponse`, `QueryRangeRequest` and `Query RangeResponse` are defined as follows: + +```protobuf +message QueryRequest { + string query = 1; + + int64 time_seconds = 2; + int64 timeout_seconds = 3; + int64 max_resolution_seconds = 4; + + repeated string replica_labels = 5; + + repeated StoreMatchers storeMatchers = 6 [(gogoproto.nullable) = false]; + + bool enableDedup = 7; + bool enablePartialResponse = 8; + bool enableQueryPushdown = 9; + bool skipChunks = 10; +} + +message QueryResponse { + oneof result { + /// warnings are additional messages coming from the PromQL engine. + string warnings = 1; + + /// timeseries is one series from the result of the executed query. + prometheus_copy.TimeSeries timeseries = 2; + } +} + +message QueryRangeRequest { + string query = 1; + + int64 start_time_seconds = 2; + int64 end_time_seconds = 3; + int64 interval_seconds = 4; + + int64 timeout_seconds = 5; + int64 max_resolution_seconds = 6; + + repeated string replica_labels = 7; + + repeated StoreMatchers storeMatchers = 8 [(gogoproto.nullable) = false]; + + bool enableDedup = 9; + bool enablePartialResponse = 10; + bool enableQueryPushdown = 11; + bool skipChunks = 12; +} + +message QueryRangeResponse { + oneof result { + /// warnings are additional messages coming from the PromQL engine. + string warnings = 1; + + /// timeseries is one series from the result of the executed query. + prometheus_copy.TimeSeries timeseries = 2; + } +} +``` + +The `Query` Service will be implemented by the gRPC server which is started via the `thanos query` command. + +## Alternatives + +The alternative to expressing a gRPC Query API would be to use the HTTP APIs and distinguish Queriers via configuration on startup. This would be suboptimal for the following reasons: +* No statically typed API definition, we would need to rely on HTTP API versioning to manage changes to the API that is intended to enable advanced query execution strategies. +* HTTP not as performant as gRPC/HTTP2, gRPC/HTTP2 allows us to use streaming(less connection overhead) and protobuf(smaller response sizes), the current HTTP API does not. +* Ergonomics, gRPC allows us to express a functional API with parameters, HTTP requires request parameter marshalling/unmarshalling which is very error-prone. + +## Action Plan + +* [X] Define the QueryServer gRPC Service +* [X] Implement the QueryServer gRPC Service in the Thanos Query diff --git a/pkg/api/query/grpc.go b/pkg/api/query/grpc.go new file mode 100644 index 0000000000..b3882b5fe5 --- /dev/null +++ b/pkg/api/query/grpc.go @@ -0,0 +1,170 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package v1 + +import ( + "context" + "time" + + "github.com/prometheus/prometheus/promql" + "github.com/thanos-io/thanos/pkg/api/query/querypb" + "github.com/thanos-io/thanos/pkg/query" + "github.com/thanos-io/thanos/pkg/store/labelpb" + "github.com/thanos-io/thanos/pkg/store/storepb/prompb" + "google.golang.org/grpc" +) + +type GRPCAPI struct { + now func() time.Time + queryableCreate query.QueryableCreator + queryEngine func(int64) *promql.Engine + defaultMaxResolutionSeconds time.Duration +} + +func NewGRPCAPI(now func() time.Time, creator query.QueryableCreator, queryEngine func(int64) *promql.Engine, defaultMaxResolutionSeconds time.Duration) *GRPCAPI { + return &GRPCAPI{ + now: now, + queryableCreate: creator, + queryEngine: queryEngine, + defaultMaxResolutionSeconds: defaultMaxResolutionSeconds, + } +} + +func RegisterQueryServer(queryServer querypb.QueryServer) func(*grpc.Server) { + return func(s *grpc.Server) { + querypb.RegisterQueryServer(s, queryServer) + } +} + +func (g *GRPCAPI) Query(request *querypb.QueryRequest, server querypb.Query_QueryServer) error { + ctx := context.Background() + var ts time.Time + if request.TimeSeconds == 0 { + ts = g.now() + } else { + ts = time.Unix(request.TimeSeconds, 0) + } + + if request.TimeoutSeconds != 0 { + var cancel context.CancelFunc + timeout := time.Duration(request.TimeoutSeconds) * time.Second + ctx, cancel = context.WithTimeout(ctx, timeout) + defer cancel() + } + + maxResolution := request.MaxResolutionSeconds + if request.MaxResolutionSeconds == 0 { + maxResolution = g.defaultMaxResolutionSeconds.Milliseconds() / 1000 + } + + storeMatchers, err := querypb.StoreMatchersToLabelMatchers(request.StoreMatchers) + if err != nil { + return err + } + + qe := g.queryEngine(request.MaxResolutionSeconds) + queryable := g.queryableCreate( + request.EnableDedup, + request.ReplicaLabels, + storeMatchers, + maxResolution, + request.EnablePartialResponse, + request.EnableQueryPushdown, + false, + ) + qry, err := qe.NewInstantQuery(queryable, request.Query, ts) + if err != nil { + return err + } + + result := qry.Exec(ctx) + if err := server.Send(querypb.NewQueryWarningsResponse(result.Warnings)); err != nil { + return nil + } + + switch vector := result.Value.(type) { + case promql.Scalar: + series := &prompb.TimeSeries{ + Samples: []prompb.Sample{{Value: vector.V, Timestamp: vector.T}}, + } + if err := server.Send(querypb.NewQueryResponse(series)); err != nil { + return err + } + case promql.Vector: + for _, sample := range vector { + series := &prompb.TimeSeries{ + Labels: labelpb.ZLabelsFromPromLabels(sample.Metric), + Samples: prompb.SamplesFromPromqlPoints([]promql.Point{sample.Point}), + } + if err := server.Send(querypb.NewQueryResponse(series)); err != nil { + return err + } + } + + return nil + } + + return nil +} + +func (g *GRPCAPI) QueryRange(request *querypb.QueryRangeRequest, srv querypb.Query_QueryRangeServer) error { + ctx := context.Background() + if request.TimeoutSeconds != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, time.Duration(request.TimeoutSeconds)) + defer cancel() + } + + maxResolution := request.MaxResolutionSeconds + if request.MaxResolutionSeconds == 0 { + maxResolution = g.defaultMaxResolutionSeconds.Milliseconds() / 1000 + } + + storeMatchers, err := querypb.StoreMatchersToLabelMatchers(request.StoreMatchers) + if err != nil { + return err + } + + qe := g.queryEngine(request.MaxResolutionSeconds) + queryable := g.queryableCreate( + request.EnableDedup, + request.ReplicaLabels, + storeMatchers, + maxResolution, + request.EnablePartialResponse, + request.EnableQueryPushdown, + false, + ) + + startTime := time.Unix(request.StartTimeSeconds, 0) + endTime := time.Unix(request.EndTimeSeconds, 0) + interval := time.Duration(request.IntervalSeconds) * time.Second + + qry, err := qe.NewRangeQuery(queryable, request.Query, startTime, endTime, interval) + if err != nil { + return err + } + + result := qry.Exec(ctx) + if err := srv.Send(querypb.NewQueryRangeWarningsResponse(result.Warnings)); err != nil { + return err + } + + switch matrix := result.Value.(type) { + case promql.Matrix: + for _, series := range matrix { + series := &prompb.TimeSeries{ + Labels: labelpb.ZLabelsFromPromLabels(series.Metric), + Samples: prompb.SamplesFromPromqlPoints(series.Points), + } + if err := srv.Send(querypb.NewQueryRangeResponse(series)); err != nil { + return err + } + } + + return nil + } + + return nil +} diff --git a/pkg/api/query/querypb/query.pb.go b/pkg/api/query/querypb/query.pb.go new file mode 100644 index 0000000000..a7a9711bea --- /dev/null +++ b/pkg/api/query/querypb/query.pb.go @@ -0,0 +1,2163 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: api/query/querypb/query.proto + +package querypb + +import ( + context "context" + fmt "fmt" + io "io" + math "math" + math_bits "math/bits" + + _ "github.com/gogo/protobuf/gogoproto" + proto "github.com/gogo/protobuf/proto" + storepb "github.com/thanos-io/thanos/pkg/store/storepb" + prompb "github.com/thanos-io/thanos/pkg/store/storepb/prompb" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +type QueryRequest struct { + Query string `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"` + TimeSeconds int64 `protobuf:"varint,2,opt,name=time_seconds,json=timeSeconds,proto3" json:"time_seconds,omitempty"` + TimeoutSeconds int64 `protobuf:"varint,3,opt,name=timeout_seconds,json=timeoutSeconds,proto3" json:"timeout_seconds,omitempty"` + MaxResolutionSeconds int64 `protobuf:"varint,4,opt,name=max_resolution_seconds,json=maxResolutionSeconds,proto3" json:"max_resolution_seconds,omitempty"` + ReplicaLabels []string `protobuf:"bytes,5,rep,name=replica_labels,json=replicaLabels,proto3" json:"replica_labels,omitempty"` + StoreMatchers []StoreMatchers `protobuf:"bytes,6,rep,name=storeMatchers,proto3" json:"storeMatchers"` + EnableDedup bool `protobuf:"varint,7,opt,name=enableDedup,proto3" json:"enableDedup,omitempty"` + EnablePartialResponse bool `protobuf:"varint,8,opt,name=enablePartialResponse,proto3" json:"enablePartialResponse,omitempty"` + EnableQueryPushdown bool `protobuf:"varint,9,opt,name=enableQueryPushdown,proto3" json:"enableQueryPushdown,omitempty"` + SkipChunks bool `protobuf:"varint,10,opt,name=skipChunks,proto3" json:"skipChunks,omitempty"` +} + +func (m *QueryRequest) Reset() { *m = QueryRequest{} } +func (m *QueryRequest) String() string { return proto.CompactTextString(m) } +func (*QueryRequest) ProtoMessage() {} +func (*QueryRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_4b2aba43925d729f, []int{0} +} +func (m *QueryRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *QueryRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_QueryRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *QueryRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_QueryRequest.Merge(m, src) +} +func (m *QueryRequest) XXX_Size() int { + return m.Size() +} +func (m *QueryRequest) XXX_DiscardUnknown() { + xxx_messageInfo_QueryRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_QueryRequest proto.InternalMessageInfo + +type StoreMatchers struct { + LabelMatchers []storepb.LabelMatcher `protobuf:"bytes,1,rep,name=labelMatchers,proto3" json:"labelMatchers"` +} + +func (m *StoreMatchers) Reset() { *m = StoreMatchers{} } +func (m *StoreMatchers) String() string { return proto.CompactTextString(m) } +func (*StoreMatchers) ProtoMessage() {} +func (*StoreMatchers) Descriptor() ([]byte, []int) { + return fileDescriptor_4b2aba43925d729f, []int{1} +} +func (m *StoreMatchers) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *StoreMatchers) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_StoreMatchers.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *StoreMatchers) XXX_Merge(src proto.Message) { + xxx_messageInfo_StoreMatchers.Merge(m, src) +} +func (m *StoreMatchers) XXX_Size() int { + return m.Size() +} +func (m *StoreMatchers) XXX_DiscardUnknown() { + xxx_messageInfo_StoreMatchers.DiscardUnknown(m) +} + +var xxx_messageInfo_StoreMatchers proto.InternalMessageInfo + +type QueryResponse struct { + // Types that are valid to be assigned to Result: + // *QueryResponse_Warnings + // *QueryResponse_Timeseries + Result isQueryResponse_Result `protobuf_oneof:"result"` +} + +func (m *QueryResponse) Reset() { *m = QueryResponse{} } +func (m *QueryResponse) String() string { return proto.CompactTextString(m) } +func (*QueryResponse) ProtoMessage() {} +func (*QueryResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_4b2aba43925d729f, []int{2} +} +func (m *QueryResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *QueryResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_QueryResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *QueryResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_QueryResponse.Merge(m, src) +} +func (m *QueryResponse) XXX_Size() int { + return m.Size() +} +func (m *QueryResponse) XXX_DiscardUnknown() { + xxx_messageInfo_QueryResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_QueryResponse proto.InternalMessageInfo + +type isQueryResponse_Result interface { + isQueryResponse_Result() + MarshalTo([]byte) (int, error) + Size() int +} + +type QueryResponse_Warnings struct { + Warnings string `protobuf:"bytes,1,opt,name=warnings,proto3,oneof" json:"warnings,omitempty"` +} +type QueryResponse_Timeseries struct { + Timeseries *prompb.TimeSeries `protobuf:"bytes,2,opt,name=timeseries,proto3,oneof" json:"timeseries,omitempty"` +} + +func (*QueryResponse_Warnings) isQueryResponse_Result() {} +func (*QueryResponse_Timeseries) isQueryResponse_Result() {} + +func (m *QueryResponse) GetResult() isQueryResponse_Result { + if m != nil { + return m.Result + } + return nil +} + +func (m *QueryResponse) GetWarnings() string { + if x, ok := m.GetResult().(*QueryResponse_Warnings); ok { + return x.Warnings + } + return "" +} + +func (m *QueryResponse) GetTimeseries() *prompb.TimeSeries { + if x, ok := m.GetResult().(*QueryResponse_Timeseries); ok { + return x.Timeseries + } + return nil +} + +// XXX_OneofWrappers is for the internal use of the proto package. +func (*QueryResponse) XXX_OneofWrappers() []interface{} { + return []interface{}{ + (*QueryResponse_Warnings)(nil), + (*QueryResponse_Timeseries)(nil), + } +} + +type QueryRangeRequest struct { + Query string `protobuf:"bytes,1,opt,name=query,proto3" json:"query,omitempty"` + StartTimeSeconds int64 `protobuf:"varint,2,opt,name=start_time_seconds,json=startTimeSeconds,proto3" json:"start_time_seconds,omitempty"` + EndTimeSeconds int64 `protobuf:"varint,3,opt,name=end_time_seconds,json=endTimeSeconds,proto3" json:"end_time_seconds,omitempty"` + IntervalSeconds int64 `protobuf:"varint,4,opt,name=interval_seconds,json=intervalSeconds,proto3" json:"interval_seconds,omitempty"` + TimeoutSeconds int64 `protobuf:"varint,5,opt,name=timeout_seconds,json=timeoutSeconds,proto3" json:"timeout_seconds,omitempty"` + MaxResolutionSeconds int64 `protobuf:"varint,6,opt,name=max_resolution_seconds,json=maxResolutionSeconds,proto3" json:"max_resolution_seconds,omitempty"` + ReplicaLabels []string `protobuf:"bytes,7,rep,name=replica_labels,json=replicaLabels,proto3" json:"replica_labels,omitempty"` + StoreMatchers []StoreMatchers `protobuf:"bytes,8,rep,name=storeMatchers,proto3" json:"storeMatchers"` + EnableDedup bool `protobuf:"varint,9,opt,name=enableDedup,proto3" json:"enableDedup,omitempty"` + EnablePartialResponse bool `protobuf:"varint,10,opt,name=enablePartialResponse,proto3" json:"enablePartialResponse,omitempty"` + EnableQueryPushdown bool `protobuf:"varint,11,opt,name=enableQueryPushdown,proto3" json:"enableQueryPushdown,omitempty"` + SkipChunks bool `protobuf:"varint,12,opt,name=skipChunks,proto3" json:"skipChunks,omitempty"` +} + +func (m *QueryRangeRequest) Reset() { *m = QueryRangeRequest{} } +func (m *QueryRangeRequest) String() string { return proto.CompactTextString(m) } +func (*QueryRangeRequest) ProtoMessage() {} +func (*QueryRangeRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_4b2aba43925d729f, []int{3} +} +func (m *QueryRangeRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *QueryRangeRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_QueryRangeRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *QueryRangeRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_QueryRangeRequest.Merge(m, src) +} +func (m *QueryRangeRequest) XXX_Size() int { + return m.Size() +} +func (m *QueryRangeRequest) XXX_DiscardUnknown() { + xxx_messageInfo_QueryRangeRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_QueryRangeRequest proto.InternalMessageInfo + +type QueryRangeResponse struct { + // Types that are valid to be assigned to Result: + // *QueryRangeResponse_Warnings + // *QueryRangeResponse_Timeseries + Result isQueryRangeResponse_Result `protobuf_oneof:"result"` +} + +func (m *QueryRangeResponse) Reset() { *m = QueryRangeResponse{} } +func (m *QueryRangeResponse) String() string { return proto.CompactTextString(m) } +func (*QueryRangeResponse) ProtoMessage() {} +func (*QueryRangeResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_4b2aba43925d729f, []int{4} +} +func (m *QueryRangeResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *QueryRangeResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_QueryRangeResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *QueryRangeResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_QueryRangeResponse.Merge(m, src) +} +func (m *QueryRangeResponse) XXX_Size() int { + return m.Size() +} +func (m *QueryRangeResponse) XXX_DiscardUnknown() { + xxx_messageInfo_QueryRangeResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_QueryRangeResponse proto.InternalMessageInfo + +type isQueryRangeResponse_Result interface { + isQueryRangeResponse_Result() + MarshalTo([]byte) (int, error) + Size() int +} + +type QueryRangeResponse_Warnings struct { + Warnings string `protobuf:"bytes,1,opt,name=warnings,proto3,oneof" json:"warnings,omitempty"` +} +type QueryRangeResponse_Timeseries struct { + Timeseries *prompb.TimeSeries `protobuf:"bytes,2,opt,name=timeseries,proto3,oneof" json:"timeseries,omitempty"` +} + +func (*QueryRangeResponse_Warnings) isQueryRangeResponse_Result() {} +func (*QueryRangeResponse_Timeseries) isQueryRangeResponse_Result() {} + +func (m *QueryRangeResponse) GetResult() isQueryRangeResponse_Result { + if m != nil { + return m.Result + } + return nil +} + +func (m *QueryRangeResponse) GetWarnings() string { + if x, ok := m.GetResult().(*QueryRangeResponse_Warnings); ok { + return x.Warnings + } + return "" +} + +func (m *QueryRangeResponse) GetTimeseries() *prompb.TimeSeries { + if x, ok := m.GetResult().(*QueryRangeResponse_Timeseries); ok { + return x.Timeseries + } + return nil +} + +// XXX_OneofWrappers is for the internal use of the proto package. +func (*QueryRangeResponse) XXX_OneofWrappers() []interface{} { + return []interface{}{ + (*QueryRangeResponse_Warnings)(nil), + (*QueryRangeResponse_Timeseries)(nil), + } +} + +func init() { + proto.RegisterType((*QueryRequest)(nil), "thanos.QueryRequest") + proto.RegisterType((*StoreMatchers)(nil), "thanos.StoreMatchers") + proto.RegisterType((*QueryResponse)(nil), "thanos.QueryResponse") + proto.RegisterType((*QueryRangeRequest)(nil), "thanos.QueryRangeRequest") + proto.RegisterType((*QueryRangeResponse)(nil), "thanos.QueryRangeResponse") +} + +func init() { proto.RegisterFile("api/query/querypb/query.proto", fileDescriptor_4b2aba43925d729f) } + +var fileDescriptor_4b2aba43925d729f = []byte{ + // 627 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x95, 0xcf, 0x6e, 0xd3, 0x4c, + 0x14, 0xc5, 0xed, 0x2f, 0x4d, 0x9a, 0xdc, 0x34, 0x6d, 0xbf, 0x21, 0x45, 0x6e, 0x00, 0x63, 0x22, + 0x55, 0x04, 0x09, 0x25, 0x55, 0xa9, 0xd8, 0x21, 0x41, 0x01, 0xa9, 0x8b, 0x22, 0xb5, 0x6e, 0x57, + 0x6c, 0xa2, 0x49, 0x72, 0x95, 0x58, 0x75, 0x66, 0xdc, 0x99, 0x71, 0xff, 0x88, 0x3d, 0x6b, 0xde, + 0x80, 0xf7, 0xe0, 0x09, 0xba, 0xec, 0x92, 0x15, 0x82, 0xf6, 0x45, 0x90, 0xc7, 0x76, 0xb0, 0x4b, + 0x54, 0x35, 0x20, 0xb1, 0x71, 0x3c, 0xe7, 0x9c, 0x9b, 0x3b, 0x77, 0xf2, 0x8b, 0x0d, 0x0f, 0x68, + 0xe0, 0x75, 0x8e, 0x42, 0x14, 0x67, 0xf1, 0x35, 0xe8, 0xc5, 0x9f, 0xed, 0x40, 0x70, 0xc5, 0x49, + 0x49, 0x8d, 0x28, 0xe3, 0xb2, 0x51, 0x1f, 0xf2, 0x21, 0xd7, 0x52, 0x27, 0xba, 0x8b, 0xdd, 0xc6, + 0xaa, 0x54, 0x5c, 0x60, 0x47, 0x5f, 0x83, 0x5e, 0x47, 0x9d, 0x05, 0x28, 0x13, 0xcb, 0xc9, 0x5b, + 0x81, 0xe0, 0xe3, 0x7c, 0xa2, 0xf9, 0xa5, 0x00, 0x0b, 0x7b, 0x51, 0x2b, 0x17, 0x8f, 0x42, 0x94, + 0x8a, 0xd4, 0xa1, 0xa8, 0x5b, 0x5b, 0xa6, 0x63, 0xb6, 0x2a, 0x6e, 0xbc, 0x20, 0x8f, 0x60, 0x41, + 0x79, 0x63, 0xec, 0x4a, 0xec, 0x73, 0x36, 0x90, 0xd6, 0x7f, 0x8e, 0xd9, 0x2a, 0xb8, 0xd5, 0x48, + 0xdb, 0x8f, 0x25, 0xf2, 0x18, 0x96, 0xa2, 0x25, 0x0f, 0xd5, 0x24, 0x55, 0xd0, 0xa9, 0xc5, 0x44, + 0x4e, 0x83, 0x9b, 0x70, 0x77, 0x4c, 0x4f, 0xbb, 0x02, 0x25, 0xf7, 0x43, 0xe5, 0x71, 0x36, 0xc9, + 0xcf, 0xe9, 0x7c, 0x7d, 0x4c, 0x4f, 0xdd, 0x89, 0x99, 0x56, 0xad, 0xc1, 0xa2, 0xc0, 0xc0, 0xf7, + 0xfa, 0xb4, 0xeb, 0xd3, 0x1e, 0xfa, 0xd2, 0x2a, 0x3a, 0x85, 0x56, 0xc5, 0xad, 0x25, 0xea, 0x8e, + 0x16, 0xc9, 0x2b, 0xa8, 0xe9, 0x69, 0xdf, 0x51, 0xd5, 0x1f, 0xa1, 0x90, 0x56, 0xc9, 0x29, 0xb4, + 0xaa, 0x1b, 0x2b, 0xed, 0xf8, 0x08, 0xdb, 0xfb, 0x59, 0x73, 0x6b, 0xee, 0xfc, 0xdb, 0x43, 0xc3, + 0xcd, 0x57, 0x10, 0x07, 0xaa, 0xc8, 0x68, 0xcf, 0xc7, 0x37, 0x38, 0x08, 0x03, 0x6b, 0xde, 0x31, + 0x5b, 0x65, 0x37, 0x2b, 0x91, 0x4d, 0x58, 0x89, 0x97, 0xbb, 0x54, 0x28, 0x8f, 0xfa, 0x2e, 0xca, + 0x80, 0x33, 0x89, 0x56, 0x59, 0x67, 0xa7, 0x9b, 0x64, 0x1d, 0xee, 0xc4, 0x86, 0x3e, 0xef, 0xdd, + 0x50, 0x8e, 0x06, 0xfc, 0x84, 0x59, 0x15, 0x5d, 0x33, 0xcd, 0x22, 0x36, 0x80, 0x3c, 0xf4, 0x82, + 0xd7, 0xa3, 0x90, 0x1d, 0x4a, 0x0b, 0x74, 0x30, 0xa3, 0x34, 0xf7, 0xa0, 0x96, 0x9b, 0x87, 0xbc, + 0x84, 0x9a, 0x3e, 0x9c, 0xc9, 0xf4, 0xa6, 0x9e, 0xbe, 0x9e, 0x4e, 0xbf, 0x93, 0x31, 0xd3, 0xe1, + 0x73, 0x05, 0xcd, 0x63, 0xa8, 0x25, 0x38, 0x24, 0xbb, 0xbe, 0x0f, 0xe5, 0x13, 0x2a, 0x98, 0xc7, + 0x86, 0x32, 0x46, 0x62, 0xdb, 0x70, 0x27, 0x0a, 0x79, 0x01, 0x10, 0xfd, 0xba, 0x12, 0x85, 0x87, + 0x31, 0x15, 0xd5, 0x8d, 0x7b, 0x11, 0x5a, 0x63, 0x54, 0x23, 0x0c, 0x65, 0xb7, 0xcf, 0x83, 0xb3, + 0xf6, 0x81, 0xc6, 0x24, 0x8a, 0x6c, 0x1b, 0x6e, 0xa6, 0x60, 0xab, 0x0c, 0x25, 0x81, 0x32, 0xf4, + 0x55, 0xf3, 0xf3, 0x1c, 0xfc, 0x1f, 0x37, 0xa6, 0x6c, 0x88, 0x37, 0xc3, 0xf8, 0x14, 0x88, 0x54, + 0x54, 0xa8, 0xee, 0x14, 0x24, 0x97, 0xb5, 0x73, 0x90, 0xe1, 0xb2, 0x05, 0xcb, 0xc8, 0x06, 0xf9, + 0x6c, 0x02, 0x26, 0xb2, 0x41, 0x36, 0xf9, 0x04, 0x96, 0x3d, 0xa6, 0x50, 0x1c, 0x53, 0xff, 0x1a, + 0x92, 0x4b, 0xa9, 0x7e, 0x03, 0xec, 0xc5, 0x19, 0x61, 0x2f, 0xcd, 0x04, 0xfb, 0xfc, 0xad, 0x60, + 0x2f, 0xff, 0x2d, 0xec, 0x95, 0x19, 0x60, 0x87, 0x3f, 0x80, 0xbd, 0x7a, 0x5b, 0xd8, 0x17, 0x7e, + 0x83, 0xfd, 0x03, 0x90, 0x2c, 0x20, 0xff, 0x14, 0xcf, 0x8d, 0x8f, 0x26, 0x14, 0x75, 0x77, 0xf2, + 0x3c, 0xbd, 0x99, 0xfc, 0xa9, 0xb2, 0x8f, 0xcf, 0xc6, 0xca, 0x35, 0x35, 0xde, 0xe6, 0xba, 0x49, + 0xde, 0x02, 0xfc, 0xda, 0x3e, 0x59, 0xcd, 0xc7, 0x32, 0xcc, 0x37, 0x1a, 0xd3, 0xac, 0xf4, 0x6b, + 0xb6, 0xd6, 0xce, 0x7f, 0xd8, 0xc6, 0xf9, 0xa5, 0x6d, 0x5e, 0x5c, 0xda, 0xe6, 0xf7, 0x4b, 0xdb, + 0xfc, 0x74, 0x65, 0x1b, 0x17, 0x57, 0xb6, 0xf1, 0xf5, 0xca, 0x36, 0xde, 0xcf, 0x27, 0x6f, 0x8f, + 0x5e, 0x49, 0x3f, 0xdd, 0x9f, 0xfd, 0x0c, 0x00, 0x00, 0xff, 0xff, 0x4f, 0xb6, 0xe9, 0xc9, 0x59, + 0x06, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// QueryClient is the client API for Query service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type QueryClient interface { + Query(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (Query_QueryClient, error) + QueryRange(ctx context.Context, in *QueryRangeRequest, opts ...grpc.CallOption) (Query_QueryRangeClient, error) +} + +type queryClient struct { + cc *grpc.ClientConn +} + +func NewQueryClient(cc *grpc.ClientConn) QueryClient { + return &queryClient{cc} +} + +func (c *queryClient) Query(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (Query_QueryClient, error) { + stream, err := c.cc.NewStream(ctx, &_Query_serviceDesc.Streams[0], "/thanos.Query/Query", opts...) + if err != nil { + return nil, err + } + x := &queryQueryClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Query_QueryClient interface { + Recv() (*QueryResponse, error) + grpc.ClientStream +} + +type queryQueryClient struct { + grpc.ClientStream +} + +func (x *queryQueryClient) Recv() (*QueryResponse, error) { + m := new(QueryResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *queryClient) QueryRange(ctx context.Context, in *QueryRangeRequest, opts ...grpc.CallOption) (Query_QueryRangeClient, error) { + stream, err := c.cc.NewStream(ctx, &_Query_serviceDesc.Streams[1], "/thanos.Query/QueryRange", opts...) + if err != nil { + return nil, err + } + x := &queryQueryRangeClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Query_QueryRangeClient interface { + Recv() (*QueryRangeResponse, error) + grpc.ClientStream +} + +type queryQueryRangeClient struct { + grpc.ClientStream +} + +func (x *queryQueryRangeClient) Recv() (*QueryRangeResponse, error) { + m := new(QueryRangeResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// QueryServer is the server API for Query service. +type QueryServer interface { + Query(*QueryRequest, Query_QueryServer) error + QueryRange(*QueryRangeRequest, Query_QueryRangeServer) error +} + +// UnimplementedQueryServer can be embedded to have forward compatible implementations. +type UnimplementedQueryServer struct { +} + +func (*UnimplementedQueryServer) Query(req *QueryRequest, srv Query_QueryServer) error { + return status.Errorf(codes.Unimplemented, "method Query not implemented") +} +func (*UnimplementedQueryServer) QueryRange(req *QueryRangeRequest, srv Query_QueryRangeServer) error { + return status.Errorf(codes.Unimplemented, "method QueryRange not implemented") +} + +func RegisterQueryServer(s *grpc.Server, srv QueryServer) { + s.RegisterService(&_Query_serviceDesc, srv) +} + +func _Query_Query_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(QueryRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(QueryServer).Query(m, &queryQueryServer{stream}) +} + +type Query_QueryServer interface { + Send(*QueryResponse) error + grpc.ServerStream +} + +type queryQueryServer struct { + grpc.ServerStream +} + +func (x *queryQueryServer) Send(m *QueryResponse) error { + return x.ServerStream.SendMsg(m) +} + +func _Query_QueryRange_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(QueryRangeRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(QueryServer).QueryRange(m, &queryQueryRangeServer{stream}) +} + +type Query_QueryRangeServer interface { + Send(*QueryRangeResponse) error + grpc.ServerStream +} + +type queryQueryRangeServer struct { + grpc.ServerStream +} + +func (x *queryQueryRangeServer) Send(m *QueryRangeResponse) error { + return x.ServerStream.SendMsg(m) +} + +var _Query_serviceDesc = grpc.ServiceDesc{ + ServiceName: "thanos.Query", + HandlerType: (*QueryServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "Query", + Handler: _Query_Query_Handler, + ServerStreams: true, + }, + { + StreamName: "QueryRange", + Handler: _Query_QueryRange_Handler, + ServerStreams: true, + }, + }, + Metadata: "api/query/querypb/query.proto", +} + +func (m *QueryRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *QueryRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *QueryRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.SkipChunks { + i-- + if m.SkipChunks { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x50 + } + if m.EnableQueryPushdown { + i-- + if m.EnableQueryPushdown { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x48 + } + if m.EnablePartialResponse { + i-- + if m.EnablePartialResponse { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x40 + } + if m.EnableDedup { + i-- + if m.EnableDedup { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x38 + } + if len(m.StoreMatchers) > 0 { + for iNdEx := len(m.StoreMatchers) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.StoreMatchers[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintQuery(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x32 + } + } + if len(m.ReplicaLabels) > 0 { + for iNdEx := len(m.ReplicaLabels) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.ReplicaLabels[iNdEx]) + copy(dAtA[i:], m.ReplicaLabels[iNdEx]) + i = encodeVarintQuery(dAtA, i, uint64(len(m.ReplicaLabels[iNdEx]))) + i-- + dAtA[i] = 0x2a + } + } + if m.MaxResolutionSeconds != 0 { + i = encodeVarintQuery(dAtA, i, uint64(m.MaxResolutionSeconds)) + i-- + dAtA[i] = 0x20 + } + if m.TimeoutSeconds != 0 { + i = encodeVarintQuery(dAtA, i, uint64(m.TimeoutSeconds)) + i-- + dAtA[i] = 0x18 + } + if m.TimeSeconds != 0 { + i = encodeVarintQuery(dAtA, i, uint64(m.TimeSeconds)) + i-- + dAtA[i] = 0x10 + } + if len(m.Query) > 0 { + i -= len(m.Query) + copy(dAtA[i:], m.Query) + i = encodeVarintQuery(dAtA, i, uint64(len(m.Query))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *StoreMatchers) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *StoreMatchers) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *StoreMatchers) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.LabelMatchers) > 0 { + for iNdEx := len(m.LabelMatchers) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.LabelMatchers[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintQuery(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func (m *QueryResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *QueryResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *QueryResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Result != nil { + { + size := m.Result.Size() + i -= size + if _, err := m.Result.MarshalTo(dAtA[i:]); err != nil { + return 0, err + } + } + } + return len(dAtA) - i, nil +} + +func (m *QueryResponse_Warnings) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *QueryResponse_Warnings) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + i -= len(m.Warnings) + copy(dAtA[i:], m.Warnings) + i = encodeVarintQuery(dAtA, i, uint64(len(m.Warnings))) + i-- + dAtA[i] = 0xa + return len(dAtA) - i, nil +} +func (m *QueryResponse_Timeseries) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *QueryResponse_Timeseries) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.Timeseries != nil { + { + size, err := m.Timeseries.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintQuery(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + return len(dAtA) - i, nil +} +func (m *QueryRangeRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *QueryRangeRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *QueryRangeRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.SkipChunks { + i-- + if m.SkipChunks { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x60 + } + if m.EnableQueryPushdown { + i-- + if m.EnableQueryPushdown { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x58 + } + if m.EnablePartialResponse { + i-- + if m.EnablePartialResponse { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x50 + } + if m.EnableDedup { + i-- + if m.EnableDedup { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x48 + } + if len(m.StoreMatchers) > 0 { + for iNdEx := len(m.StoreMatchers) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.StoreMatchers[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintQuery(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x42 + } + } + if len(m.ReplicaLabels) > 0 { + for iNdEx := len(m.ReplicaLabels) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.ReplicaLabels[iNdEx]) + copy(dAtA[i:], m.ReplicaLabels[iNdEx]) + i = encodeVarintQuery(dAtA, i, uint64(len(m.ReplicaLabels[iNdEx]))) + i-- + dAtA[i] = 0x3a + } + } + if m.MaxResolutionSeconds != 0 { + i = encodeVarintQuery(dAtA, i, uint64(m.MaxResolutionSeconds)) + i-- + dAtA[i] = 0x30 + } + if m.TimeoutSeconds != 0 { + i = encodeVarintQuery(dAtA, i, uint64(m.TimeoutSeconds)) + i-- + dAtA[i] = 0x28 + } + if m.IntervalSeconds != 0 { + i = encodeVarintQuery(dAtA, i, uint64(m.IntervalSeconds)) + i-- + dAtA[i] = 0x20 + } + if m.EndTimeSeconds != 0 { + i = encodeVarintQuery(dAtA, i, uint64(m.EndTimeSeconds)) + i-- + dAtA[i] = 0x18 + } + if m.StartTimeSeconds != 0 { + i = encodeVarintQuery(dAtA, i, uint64(m.StartTimeSeconds)) + i-- + dAtA[i] = 0x10 + } + if len(m.Query) > 0 { + i -= len(m.Query) + copy(dAtA[i:], m.Query) + i = encodeVarintQuery(dAtA, i, uint64(len(m.Query))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *QueryRangeResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *QueryRangeResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *QueryRangeResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Result != nil { + { + size := m.Result.Size() + i -= size + if _, err := m.Result.MarshalTo(dAtA[i:]); err != nil { + return 0, err + } + } + } + return len(dAtA) - i, nil +} + +func (m *QueryRangeResponse_Warnings) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *QueryRangeResponse_Warnings) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + i -= len(m.Warnings) + copy(dAtA[i:], m.Warnings) + i = encodeVarintQuery(dAtA, i, uint64(len(m.Warnings))) + i-- + dAtA[i] = 0xa + return len(dAtA) - i, nil +} +func (m *QueryRangeResponse_Timeseries) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *QueryRangeResponse_Timeseries) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + if m.Timeseries != nil { + { + size, err := m.Timeseries.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintQuery(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x12 + } + return len(dAtA) - i, nil +} +func encodeVarintQuery(dAtA []byte, offset int, v uint64) int { + offset -= sovQuery(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *QueryRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Query) + if l > 0 { + n += 1 + l + sovQuery(uint64(l)) + } + if m.TimeSeconds != 0 { + n += 1 + sovQuery(uint64(m.TimeSeconds)) + } + if m.TimeoutSeconds != 0 { + n += 1 + sovQuery(uint64(m.TimeoutSeconds)) + } + if m.MaxResolutionSeconds != 0 { + n += 1 + sovQuery(uint64(m.MaxResolutionSeconds)) + } + if len(m.ReplicaLabels) > 0 { + for _, s := range m.ReplicaLabels { + l = len(s) + n += 1 + l + sovQuery(uint64(l)) + } + } + if len(m.StoreMatchers) > 0 { + for _, e := range m.StoreMatchers { + l = e.Size() + n += 1 + l + sovQuery(uint64(l)) + } + } + if m.EnableDedup { + n += 2 + } + if m.EnablePartialResponse { + n += 2 + } + if m.EnableQueryPushdown { + n += 2 + } + if m.SkipChunks { + n += 2 + } + return n +} + +func (m *StoreMatchers) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.LabelMatchers) > 0 { + for _, e := range m.LabelMatchers { + l = e.Size() + n += 1 + l + sovQuery(uint64(l)) + } + } + return n +} + +func (m *QueryResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Result != nil { + n += m.Result.Size() + } + return n +} + +func (m *QueryResponse_Warnings) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Warnings) + n += 1 + l + sovQuery(uint64(l)) + return n +} +func (m *QueryResponse_Timeseries) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Timeseries != nil { + l = m.Timeseries.Size() + n += 1 + l + sovQuery(uint64(l)) + } + return n +} +func (m *QueryRangeRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Query) + if l > 0 { + n += 1 + l + sovQuery(uint64(l)) + } + if m.StartTimeSeconds != 0 { + n += 1 + sovQuery(uint64(m.StartTimeSeconds)) + } + if m.EndTimeSeconds != 0 { + n += 1 + sovQuery(uint64(m.EndTimeSeconds)) + } + if m.IntervalSeconds != 0 { + n += 1 + sovQuery(uint64(m.IntervalSeconds)) + } + if m.TimeoutSeconds != 0 { + n += 1 + sovQuery(uint64(m.TimeoutSeconds)) + } + if m.MaxResolutionSeconds != 0 { + n += 1 + sovQuery(uint64(m.MaxResolutionSeconds)) + } + if len(m.ReplicaLabels) > 0 { + for _, s := range m.ReplicaLabels { + l = len(s) + n += 1 + l + sovQuery(uint64(l)) + } + } + if len(m.StoreMatchers) > 0 { + for _, e := range m.StoreMatchers { + l = e.Size() + n += 1 + l + sovQuery(uint64(l)) + } + } + if m.EnableDedup { + n += 2 + } + if m.EnablePartialResponse { + n += 2 + } + if m.EnableQueryPushdown { + n += 2 + } + if m.SkipChunks { + n += 2 + } + return n +} + +func (m *QueryRangeResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Result != nil { + n += m.Result.Size() + } + return n +} + +func (m *QueryRangeResponse_Warnings) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Warnings) + n += 1 + l + sovQuery(uint64(l)) + return n +} +func (m *QueryRangeResponse_Timeseries) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Timeseries != nil { + l = m.Timeseries.Size() + n += 1 + l + sovQuery(uint64(l)) + } + return n +} + +func sovQuery(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozQuery(x uint64) (n int) { + return sovQuery(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *QueryRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: QueryRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: QueryRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Query", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthQuery + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthQuery + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Query = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field TimeSeconds", wireType) + } + m.TimeSeconds = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.TimeSeconds |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field TimeoutSeconds", wireType) + } + m.TimeoutSeconds = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.TimeoutSeconds |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field MaxResolutionSeconds", wireType) + } + m.MaxResolutionSeconds = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.MaxResolutionSeconds |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ReplicaLabels", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthQuery + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthQuery + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ReplicaLabels = append(m.ReplicaLabels, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + case 6: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field StoreMatchers", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthQuery + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthQuery + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.StoreMatchers = append(m.StoreMatchers, StoreMatchers{}) + if err := m.StoreMatchers[len(m.StoreMatchers)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 7: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field EnableDedup", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.EnableDedup = bool(v != 0) + case 8: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field EnablePartialResponse", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.EnablePartialResponse = bool(v != 0) + case 9: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field EnableQueryPushdown", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.EnableQueryPushdown = bool(v != 0) + case 10: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field SkipChunks", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.SkipChunks = bool(v != 0) + default: + iNdEx = preIndex + skippy, err := skipQuery(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthQuery + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *StoreMatchers) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: StoreMatchers: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: StoreMatchers: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field LabelMatchers", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthQuery + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthQuery + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.LabelMatchers = append(m.LabelMatchers, storepb.LabelMatcher{}) + if err := m.LabelMatchers[len(m.LabelMatchers)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipQuery(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthQuery + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *QueryResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: QueryResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: QueryResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Warnings", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthQuery + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthQuery + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Result = &QueryResponse_Warnings{string(dAtA[iNdEx:postIndex])} + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Timeseries", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthQuery + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthQuery + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &prompb.TimeSeries{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Result = &QueryResponse_Timeseries{v} + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipQuery(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthQuery + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *QueryRangeRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: QueryRangeRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: QueryRangeRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Query", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthQuery + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthQuery + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Query = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field StartTimeSeconds", wireType) + } + m.StartTimeSeconds = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.StartTimeSeconds |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field EndTimeSeconds", wireType) + } + m.EndTimeSeconds = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.EndTimeSeconds |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field IntervalSeconds", wireType) + } + m.IntervalSeconds = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.IntervalSeconds |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 5: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field TimeoutSeconds", wireType) + } + m.TimeoutSeconds = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.TimeoutSeconds |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 6: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field MaxResolutionSeconds", wireType) + } + m.MaxResolutionSeconds = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.MaxResolutionSeconds |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 7: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ReplicaLabels", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthQuery + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthQuery + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ReplicaLabels = append(m.ReplicaLabels, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + case 8: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field StoreMatchers", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthQuery + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthQuery + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.StoreMatchers = append(m.StoreMatchers, StoreMatchers{}) + if err := m.StoreMatchers[len(m.StoreMatchers)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 9: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field EnableDedup", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.EnableDedup = bool(v != 0) + case 10: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field EnablePartialResponse", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.EnablePartialResponse = bool(v != 0) + case 11: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field EnableQueryPushdown", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.EnableQueryPushdown = bool(v != 0) + case 12: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field SkipChunks", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.SkipChunks = bool(v != 0) + default: + iNdEx = preIndex + skippy, err := skipQuery(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthQuery + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *QueryRangeResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: QueryRangeResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: QueryRangeResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Warnings", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthQuery + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthQuery + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Result = &QueryRangeResponse_Warnings{string(dAtA[iNdEx:postIndex])} + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Timeseries", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowQuery + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthQuery + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthQuery + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &prompb.TimeSeries{} + if err := v.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + m.Result = &QueryRangeResponse_Timeseries{v} + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipQuery(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthQuery + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipQuery(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + depth := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowQuery + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowQuery + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + case 1: + iNdEx += 8 + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowQuery + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthQuery + } + iNdEx += length + case 3: + depth++ + case 4: + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupQuery + } + depth-- + case 5: + iNdEx += 4 + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + if iNdEx < 0 { + return 0, ErrInvalidLengthQuery + } + if depth == 0 { + return iNdEx, nil + } + } + return 0, io.ErrUnexpectedEOF +} + +var ( + ErrInvalidLengthQuery = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowQuery = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupQuery = fmt.Errorf("proto: unexpected end of group") +) diff --git a/pkg/api/query/querypb/query.proto b/pkg/api/query/querypb/query.proto new file mode 100644 index 0000000000..4420b58687 --- /dev/null +++ b/pkg/api/query/querypb/query.proto @@ -0,0 +1,89 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +syntax = "proto3"; +package thanos; + +option go_package = "querypb"; + +import "gogoproto/gogo.proto"; +import "store/storepb/types.proto"; +import "store/storepb/prompb/types.proto"; + +option (gogoproto.sizer_all) = true; +option (gogoproto.marshaler_all) = true; +option (gogoproto.unmarshaler_all) = true; +option (gogoproto.goproto_getters_all) = false; + +// Do not generate XXX fields to reduce memory footprint and opening a door +// for zero-copy casts to/from prometheus data types. +option (gogoproto.goproto_unkeyed_all) = false; +option (gogoproto.goproto_unrecognized_all) = false; +option (gogoproto.goproto_sizecache_all) = false; + +message QueryRequest { + string query = 1; + + int64 time_seconds = 2; + int64 timeout_seconds = 3; + int64 max_resolution_seconds = 4; + + repeated string replica_labels = 5; + + repeated StoreMatchers storeMatchers = 6 [(gogoproto.nullable) = false]; + + bool enableDedup = 7; + bool enablePartialResponse = 8; + bool enableQueryPushdown = 9; + bool skipChunks = 10; +} + +message StoreMatchers { + repeated LabelMatcher labelMatchers = 1 [(gogoproto.nullable) = false]; +} + +message QueryResponse { + oneof result { + /// warnings are additional messages coming from the PromQL engine. + string warnings = 1; + + /// timeseries is one series from the result of the executed query. + prometheus_copy.TimeSeries timeseries = 2; + } +} + +message QueryRangeRequest { + string query = 1; + + int64 start_time_seconds = 2; + int64 end_time_seconds = 3; + int64 interval_seconds = 4; + + int64 timeout_seconds = 5; + int64 max_resolution_seconds = 6; + + repeated string replica_labels = 7; + + repeated StoreMatchers storeMatchers = 8 [(gogoproto.nullable) = false]; + + bool enableDedup = 9; + bool enablePartialResponse = 10; + bool enableQueryPushdown = 11; + bool skipChunks = 12; +} + +message QueryRangeResponse { + oneof result { + /// warnings are additional messages coming from the PromQL engine. + string warnings = 1; + + /// timeseries is one series from the result of the executed query. + prometheus_copy.TimeSeries timeseries = 2; + } +} + +service Query { + rpc Query(QueryRequest) returns (stream QueryResponse); + + rpc QueryRange(QueryRangeRequest) returns (stream QueryRangeResponse); +} diff --git a/pkg/api/query/querypb/responses.go b/pkg/api/query/querypb/responses.go new file mode 100644 index 0000000000..cd24a5ead8 --- /dev/null +++ b/pkg/api/query/querypb/responses.go @@ -0,0 +1,50 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package querypb + +import ( + "strings" + + "github.com/thanos-io/thanos/pkg/store/storepb/prompb" +) + +func NewQueryResponse(series *prompb.TimeSeries) *QueryResponse { + return &QueryResponse{ + Result: &QueryResponse_Timeseries{ + Timeseries: series, + }, + } +} + +func NewQueryWarningsResponse(errs []error) *QueryResponse { + warnings := make([]string, len(errs)) + for _, err := range errs { + warnings = append(warnings, err.Error()) + } + return &QueryResponse{ + Result: &QueryResponse_Warnings{ + Warnings: strings.Join(warnings, ", "), + }, + } +} + +func NewQueryRangeResponse(series *prompb.TimeSeries) *QueryRangeResponse { + return &QueryRangeResponse{ + Result: &QueryRangeResponse_Timeseries{ + Timeseries: series, + }, + } +} + +func NewQueryRangeWarningsResponse(errs []error) *QueryRangeResponse { + warnings := make([]string, len(errs)) + for _, err := range errs { + warnings = append(warnings, err.Error()) + } + return &QueryRangeResponse{ + Result: &QueryRangeResponse_Warnings{ + Warnings: strings.Join(warnings, ", "), + }, + } +} diff --git a/pkg/api/query/querypb/store_matchers.go b/pkg/api/query/querypb/store_matchers.go new file mode 100644 index 0000000000..41b8f223cb --- /dev/null +++ b/pkg/api/query/querypb/store_matchers.go @@ -0,0 +1,26 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package querypb + +import ( + "github.com/prometheus/prometheus/model/labels" + "github.com/thanos-io/thanos/pkg/store/storepb" +) + +func StoreMatchersToLabelMatchers(matchers []StoreMatchers) ([][]*labels.Matcher, error) { + if len(matchers) == 0 { + return nil, nil + } + + labelMatchers := make([][]*labels.Matcher, len(matchers)) + for i, storeMatcher := range matchers { + storeMatchers, err := storepb.MatchersToPromMatchers(storeMatcher.LabelMatchers...) + if err != nil { + return nil, err + } + labelMatchers[i] = storeMatchers + } + + return labelMatchers, nil +} diff --git a/pkg/store/storepb/prompb/samples.go b/pkg/store/storepb/prompb/samples.go index 4ae1da98f0..6ec77d58e6 100644 --- a/pkg/store/storepb/prompb/samples.go +++ b/pkg/store/storepb/prompb/samples.go @@ -3,7 +3,10 @@ package prompb -import "github.com/prometheus/common/model" +import ( + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/promql" +) // SamplesFromSamplePairs converts a slice of model.SamplePair // to a slice of Sample. @@ -18,3 +21,17 @@ func SamplesFromSamplePairs(samples []model.SamplePair) []Sample { return result } + +// SamplesFromPromqlPoints converts a slice of promql.Point +// to a slice of Sample. +func SamplesFromPromqlPoints(samples []promql.Point) []Sample { + result := make([]Sample, 0, len(samples)) + for _, s := range samples { + result = append(result, Sample{ + Value: s.V, + Timestamp: s.T, + }) + } + + return result +} diff --git a/scripts/genproto.sh b/scripts/genproto.sh index 3110114791..e662aefea7 100755 --- a/scripts/genproto.sh +++ b/scripts/genproto.sh @@ -25,7 +25,7 @@ PATH=${PATH}:/tmp/protobin GOGOPROTO_ROOT="$(GO111MODULE=on go list -modfile=.bingo/protoc-gen-gogofast.mod -f '{{ .Dir }}' -m github.com/gogo/protobuf)" GOGOPROTO_PATH="${GOGOPROTO_ROOT}:${GOGOPROTO_ROOT}/protobuf" -DIRS="store/storepb/ store/storepb/prompb/ store/labelpb rules/rulespb targets/targetspb store/hintspb queryfrontend metadata/metadatapb exemplars/exemplarspb info/infopb" +DIRS="store/storepb/ store/storepb/prompb/ store/labelpb rules/rulespb targets/targetspb store/hintspb queryfrontend metadata/metadatapb exemplars/exemplarspb info/infopb api/query/querypb" echo "generating code" pushd "pkg" for dir in ${DIRS}; do diff --git a/test/e2e/query_test.go b/test/e2e/query_test.go index 280b2cd9d1..49e85467e5 100644 --- a/test/e2e/query_test.go +++ b/test/e2e/query_test.go @@ -6,6 +6,7 @@ package e2e_test import ( "context" "fmt" + "io" "net/http" "net/http/httptest" "net/url" @@ -23,6 +24,9 @@ import ( config_util "github.com/prometheus/common/config" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage/remote" + "github.com/thanos-io/thanos/pkg/api/query/querypb" + prompb_copy "github.com/thanos-io/thanos/pkg/store/storepb/prompb" + "google.golang.org/grpc" "github.com/chromedp/cdproto/network" "github.com/chromedp/chromedp" @@ -1303,3 +1307,215 @@ func TestSidecarAlignmentPushdown(t *testing.T) { return nil }) } + +func TestGrpcInstantQuery(t *testing.T) { + t.Parallel() + + e, err := e2e.NewDockerEnvironment("e2e_test_query_grpc_api") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + promConfig := e2ethanos.DefaultPromConfig("p1", 0, "", "") + prom, sidecar, err := e2ethanos.NewPrometheusWithSidecar(e, "p1", promConfig, "", e2ethanos.DefaultPrometheusImage(), "", "remote-write-receiver") + testutil.Ok(t, err) + testutil.Ok(t, e2e.StartAndWaitReady(prom, sidecar)) + + endpoints := []string{ + sidecar.InternalEndpoint("grpc"), + } + querier, err := e2ethanos. + NewQuerierBuilder(e, "1", endpoints...). + Build() + testutil.Ok(t, err) + testutil.Ok(t, e2e.StartAndWaitReady(querier)) + + now := time.Now() + samples := []fakeMetricSample{ + { + label: "test", + value: 1, + timestampUnixNano: now.UnixNano(), + }, + { + label: "test", + value: 2, + timestampUnixNano: now.Add(time.Hour).UnixNano(), + }, + } + ctx := context.Background() + testutil.Ok(t, synthesizeSamples(ctx, prom, samples)) + + grpcConn, err := grpc.Dial(querier.Endpoint("grpc"), grpc.WithInsecure()) + testutil.Ok(t, err) + queryClient := querypb.NewQueryClient(grpcConn) + + queries := []struct { + time time.Time + expectedResult float64 + }{ + { + time: now, + expectedResult: 1, + }, + { + time: now.Add(time.Hour), + expectedResult: 2, + }, + } + + for _, query := range queries { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + err = runutil.Retry(5*time.Second, ctx.Done(), func() error { + result, err := queryClient.Query(ctx, &querypb.QueryRequest{ + Query: "my_fake_metric", + TimeSeconds: query.time.Unix(), + }) + + if err != nil { + return err + } + + var warnings string + var series []*prompb_copy.TimeSeries + for { + msg, err := result.Recv() + if err == io.EOF { + break + } + + s := msg.GetTimeseries() + if s != nil { + series = append(series, s) + } + w := msg.GetWarnings() + if w != "" { + warnings = w + } + } + + if warnings != "" { + return fmt.Errorf("got warnings, expected none") + } + + if len(series) != 1 { + return fmt.Errorf("got empty result from querier") + } + + if len(series[0].Samples) != 1 { + return fmt.Errorf("got empty timeseries from querier") + } + + if series[0].Samples[0].Value != query.expectedResult { + return fmt.Errorf("got invalid result from querier") + } + + return nil + }) + testutil.Ok(t, err) + cancel() + } +} + +func TestGrpcQueryRange(t *testing.T) { + t.Parallel() + + e, err := e2e.NewDockerEnvironment("e2e_test_query_grpc_api") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + promConfig := e2ethanos.DefaultPromConfig("p1", 0, "", "") + prom, sidecar, err := e2ethanos.NewPrometheusWithSidecar(e, "p1", promConfig, "", e2ethanos.DefaultPrometheusImage(), "", "remote-write-receiver") + testutil.Ok(t, err) + testutil.Ok(t, e2e.StartAndWaitReady(prom, sidecar)) + + endpoints := []string{ + sidecar.InternalEndpoint("grpc"), + } + querier, err := e2ethanos. + NewQuerierBuilder(e, "1", endpoints...). + Build() + testutil.Ok(t, err) + testutil.Ok(t, e2e.StartAndWaitReady(querier)) + + now := time.Now() + samples := []fakeMetricSample{ + { + label: "test", + value: 1, + timestampUnixNano: now.UnixNano(), + }, + { + label: "test", + value: 2, + timestampUnixNano: now.Add(time.Second * 15).UnixNano(), + }, + { + label: "test", + value: 3, + timestampUnixNano: now.Add(time.Second * 30).UnixNano(), + }, + { + label: "test", + value: 4, + timestampUnixNano: now.Add(time.Second * 45).UnixNano(), + }, + { + label: "test", + value: 5, + timestampUnixNano: now.Add(time.Minute).UnixNano(), + }, + } + ctx := context.Background() + testutil.Ok(t, synthesizeSamples(ctx, prom, samples)) + + grpcConn, err := grpc.Dial(querier.Endpoint("grpc"), grpc.WithInsecure()) + testutil.Ok(t, err) + queryClient := querypb.NewQueryClient(grpcConn) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + defer cancel() + err = runutil.Retry(5*time.Second, ctx.Done(), func() error { + result, err := queryClient.QueryRange(ctx, &querypb.QueryRangeRequest{ + Query: "my_fake_metric", + StartTimeSeconds: now.Unix(), + EndTimeSeconds: now.Add(time.Minute).Unix(), + IntervalSeconds: 15, + }) + + if err != nil { + return err + } + + var warnings string + var series []*prompb_copy.TimeSeries + for { + msg, err := result.Recv() + if err == io.EOF { + break + } + + s := msg.GetTimeseries() + if s != nil { + series = append(series, s) + } + w := msg.GetWarnings() + if w != "" { + warnings = w + } + } + if warnings != "" { + return fmt.Errorf("got warnings, expected none") + } + + if len(series) != 1 { + return fmt.Errorf("got empty result from querier") + } + + if len(series[0].Samples) != 5 { + return fmt.Errorf("got empty timeseries from querier") + } + + return nil + }) + testutil.Ok(t, err) +}