From ac76232df602367d664ace86a61f1a77e3d9417f Mon Sep 17 00:00:00 2001 From: fpetkovski Date: Thu, 21 Apr 2022 08:08:30 +0200 Subject: [PATCH] Implement Query API discovery A recent commit (#5250) added a GRPC API to Thanos Query which allows executing PromQL over GRPC. This API is currently not discoverable through endpointsets which makes it hard for other Thanos components to use this it. This commit extends endpointsets with a GetQueryAPIClients method which returns Query API clients to all components which support this API. --- cmd/thanos/query.go | 3 +- pkg/api/query/grpc.go | 21 ++- pkg/component/component.go | 3 + pkg/info/info.go | 18 +++ pkg/info/infopb/rpc.pb.go | 233 +++++++++++++++++++++++++++++----- pkg/info/infopb/rpc.proto | 19 ++- pkg/query/endpointset.go | 37 ++++++ pkg/query/endpointset_test.go | 33 ++++- 8 files changed, 321 insertions(+), 46 deletions(-) diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 7c65edf77d4..6e6483f512f 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -675,9 +675,10 @@ func runQuery( info.WithRulesInfoFunc(), info.WithMetricMetadataInfoFunc(), info.WithTargetsInfoFunc(), + info.WithQueryInfoFunc(), ) - grpcAPI := apiv1.NewGRPCAPI(time.Now, queryableCreator, engineCreator, instantDefaultMaxSourceResolution) + grpcAPI := apiv1.NewGRPCAPI(logger, time.Now, queryableCreator, engineCreator, instantDefaultMaxSourceResolution) s := grpcserver.New(logger, reg, tracer, grpcLogOpts, tagOpts, comp, grpcProbe, grpcserver.WithServer(apiv1.RegisterQueryServer(grpcAPI)), grpcserver.WithServer(store.RegisterStoreServer(proxy)), diff --git a/pkg/api/query/grpc.go b/pkg/api/query/grpc.go index b3882b5fe50..da04cbc95d7 100644 --- a/pkg/api/query/grpc.go +++ b/pkg/api/query/grpc.go @@ -7,6 +7,11 @@ import ( "context" "time" + "github.com/go-kit/log" + "github.com/thanos-io/thanos/pkg/component" + + "github.com/thanos-io/thanos/pkg/info/infopb" + "github.com/prometheus/prometheus/promql" "github.com/thanos-io/thanos/pkg/api/query/querypb" "github.com/thanos-io/thanos/pkg/query" @@ -16,14 +21,23 @@ import ( ) type GRPCAPI struct { + logger log.Logger now func() time.Time queryableCreate query.QueryableCreator queryEngine func(int64) *promql.Engine defaultMaxResolutionSeconds time.Duration } -func NewGRPCAPI(now func() time.Time, creator query.QueryableCreator, queryEngine func(int64) *promql.Engine, defaultMaxResolutionSeconds time.Duration) *GRPCAPI { +func (g *GRPCAPI) Info(ctx context.Context, request *infopb.InfoRequest) (*infopb.InfoResponse, error) { + return &infopb.InfoResponse{ + ComponentType: component.QueryAPI.String(), + Query: &infopb.QueryInfo{}, + }, nil +} + +func NewGRPCAPI(logger log.Logger, now func() time.Time, creator query.QueryableCreator, queryEngine func(int64) *promql.Engine, defaultMaxResolutionSeconds time.Duration) *GRPCAPI { return &GRPCAPI{ + logger: logger, now: now, queryableCreate: creator, queryEngine: queryEngine, @@ -64,6 +78,7 @@ func (g *GRPCAPI) Query(request *querypb.QueryRequest, server querypb.Query_Quer } qe := g.queryEngine(request.MaxResolutionSeconds) + queryable := g.queryableCreate( request.EnableDedup, request.ReplicaLabels, @@ -71,7 +86,7 @@ func (g *GRPCAPI) Query(request *querypb.QueryRequest, server querypb.Query_Quer maxResolution, request.EnablePartialResponse, request.EnableQueryPushdown, - false, + request.SkipChunks, ) qry, err := qe.NewInstantQuery(queryable, request.Query, ts) if err != nil { @@ -134,7 +149,7 @@ func (g *GRPCAPI) QueryRange(request *querypb.QueryRangeRequest, srv querypb.Que maxResolution, request.EnablePartialResponse, request.EnableQueryPushdown, - false, + request.SkipChunks, ) startTime := time.Unix(request.StartTimeSeconds, 0) diff --git a/pkg/component/component.go b/pkg/component/component.go index b648aca5585..d5bd193e40e 100644 --- a/pkg/component/component.go +++ b/pkg/component/component.go @@ -102,6 +102,8 @@ func FromString(storeType string) StoreAPI { return Receive case "debug": return Debug + case "queryAPI": + return QueryAPI default: return UnknownStoreAPI } @@ -124,4 +126,5 @@ var ( Store = storeAPI{component: component{name: "store"}} UnknownStoreAPI = storeAPI{component: component{name: "unknown-store-api"}} Query = storeAPI{component: component{name: "query"}} + QueryAPI = storeAPI{component: component{name: "queryAPI"}} ) diff --git a/pkg/info/info.go b/pkg/info/info.go index 40df172a68f..e3f3aa3f911 100644 --- a/pkg/info/info.go +++ b/pkg/info/info.go @@ -25,6 +25,7 @@ type InfoServer struct { getRulesInfo func() *infopb.RulesInfo getTargetsInfo func() *infopb.TargetsInfo getMetricMetadataInfo func() *infopb.MetricMetadataInfo + getQueryInfo func() *infopb.QueryInfo } // NewInfoServer creates a new server instance for given component @@ -42,6 +43,7 @@ func NewInfoServer( getRulesInfo: func() *infopb.RulesInfo { return nil }, getTargetsInfo: func() *infopb.TargetsInfo { return nil }, getMetricMetadataInfo: func() *infopb.MetricMetadataInfo { return nil }, + getQueryInfo: func() *infopb.QueryInfo { return nil }, } for _, o := range options { @@ -144,6 +146,21 @@ func WithMetricMetadataInfoFunc(getMetricMetadataInfo ...func() *infopb.MetricMe } } +// WithQueryInfoFunc determines the function that should be executed to obtain +// the query information. If no function is provided, the default empty +// query info is returned. Only the first function from the list is considered. +func WithQueryInfoFunc(queryInfo ...func() *infopb.QueryInfo) ServerOptionFunc { + if len(queryInfo) == 0 { + return func(s *InfoServer) { + s.getQueryInfo = func() *infopb.QueryInfo { return &infopb.QueryInfo{} } + } + } + + return func(s *InfoServer) { + s.getQueryInfo = queryInfo[0] + } +} + // RegisterInfoServer registers the info server. func RegisterInfoServer(infoSrv infopb.InfoServer) func(*grpc.Server) { return func(s *grpc.Server) { @@ -161,5 +178,6 @@ func (srv *InfoServer) Info(ctx context.Context, req *infopb.InfoRequest) (*info Rules: srv.getRulesInfo(), Targets: srv.getTargetsInfo(), MetricMetadata: srv.getMetricMetadataInfo(), + Query: srv.getQueryInfo(), }, nil } diff --git a/pkg/info/infopb/rpc.pb.go b/pkg/info/infopb/rpc.pb.go index 3da712b3e59..ff934a17682 100644 --- a/pkg/info/infopb/rpc.pb.go +++ b/pkg/info/infopb/rpc.pb.go @@ -78,6 +78,8 @@ type InfoResponse struct { Targets *TargetsInfo `protobuf:"bytes,6,opt,name=targets,proto3" json:"targets,omitempty"` // ExemplarsInfo holds the metadata related to Exemplars API if exposed by the component otherwise it will be null. Exemplars *ExemplarsInfo `protobuf:"bytes,7,opt,name=exemplars,proto3" json:"exemplars,omitempty"` + // QueryInfo holds the metadata related to Query API if exposed by the component, otherwise it will be null. + Query *QueryInfo `protobuf:"bytes,8,opt,name=query,proto3" json:"query,omitempty"` } func (m *InfoResponse) Reset() { *m = InfoResponse{} } @@ -302,6 +304,43 @@ func (m *ExemplarsInfo) XXX_DiscardUnknown() { var xxx_messageInfo_ExemplarsInfo proto.InternalMessageInfo +// QueryInfo holds the metadata related to Query API exposed by the component. +type QueryInfo struct { +} + +func (m *QueryInfo) Reset() { *m = QueryInfo{} } +func (m *QueryInfo) String() string { return proto.CompactTextString(m) } +func (*QueryInfo) ProtoMessage() {} +func (*QueryInfo) Descriptor() ([]byte, []int) { + return fileDescriptor_a1214ec45d2bf952, []int{7} +} +func (m *QueryInfo) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *QueryInfo) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_QueryInfo.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *QueryInfo) XXX_Merge(src proto.Message) { + xxx_messageInfo_QueryInfo.Merge(m, src) +} +func (m *QueryInfo) XXX_Size() int { + return m.Size() +} +func (m *QueryInfo) XXX_DiscardUnknown() { + xxx_messageInfo_QueryInfo.DiscardUnknown(m) +} + +var xxx_messageInfo_QueryInfo proto.InternalMessageInfo + func init() { proto.RegisterType((*InfoRequest)(nil), "thanos.info.InfoRequest") proto.RegisterType((*InfoResponse)(nil), "thanos.info.InfoResponse") @@ -310,40 +349,42 @@ func init() { proto.RegisterType((*MetricMetadataInfo)(nil), "thanos.info.MetricMetadataInfo") proto.RegisterType((*TargetsInfo)(nil), "thanos.info.TargetsInfo") proto.RegisterType((*ExemplarsInfo)(nil), "thanos.info.ExemplarsInfo") + proto.RegisterType((*QueryInfo)(nil), "thanos.info.QueryInfo") } func init() { proto.RegisterFile("info/infopb/rpc.proto", fileDescriptor_a1214ec45d2bf952) } var fileDescriptor_a1214ec45d2bf952 = []byte{ - // 437 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x93, 0xcf, 0x6e, 0xd3, 0x40, - 0x10, 0xc6, 0xed, 0x26, 0x4d, 0xf0, 0x98, 0x80, 0x58, 0x15, 0xb4, 0xc9, 0xc1, 0x8d, 0xac, 0x1e, - 0x72, 0x40, 0xb6, 0x14, 0x24, 0x84, 0xc4, 0x89, 0x56, 0x95, 0x40, 0xa2, 0x17, 0x37, 0xa7, 0x5e, - 0xa2, 0x4d, 0x99, 0x06, 0x4b, 0xde, 0x3f, 0x78, 0xb7, 0x52, 0x7a, 0xe3, 0x11, 0x78, 0xac, 0x1c, - 0x7b, 0xe4, 0x84, 0x20, 0x79, 0x11, 0xb4, 0xbb, 0x6e, 0x89, 0x45, 0x4f, 0xbd, 0xd8, 0xbb, 0xfb, - 0xfb, 0xbe, 0xd9, 0x99, 0xf1, 0x18, 0x5e, 0x96, 0xe2, 0x4a, 0xe6, 0xf6, 0xa1, 0x16, 0x79, 0xad, - 0x2e, 0x33, 0x55, 0x4b, 0x23, 0x49, 0x6c, 0xbe, 0x32, 0x21, 0x75, 0x66, 0xc1, 0x68, 0xa8, 0x8d, - 0xac, 0x31, 0xaf, 0xd8, 0x02, 0x2b, 0xb5, 0xc8, 0xcd, 0x8d, 0x42, 0xed, 0x75, 0xa3, 0x83, 0xa5, - 0x5c, 0x4a, 0xb7, 0xcc, 0xed, 0xca, 0x9f, 0xa6, 0x03, 0x88, 0x3f, 0x89, 0x2b, 0x59, 0xe0, 0xb7, - 0x6b, 0xd4, 0x26, 0xfd, 0xde, 0x81, 0xa7, 0x7e, 0xaf, 0x95, 0x14, 0x1a, 0xc9, 0x5b, 0x00, 0x17, - 0x6c, 0xae, 0xd1, 0x68, 0x1a, 0x8e, 0x3b, 0x93, 0x78, 0xfa, 0x22, 0x6b, 0xae, 0xbc, 0xf8, 0x6c, - 0xd1, 0x39, 0x9a, 0xe3, 0xee, 0xfa, 0xd7, 0x61, 0x50, 0x44, 0x55, 0xb3, 0xd7, 0xe4, 0x08, 0x06, - 0x27, 0x92, 0x2b, 0x29, 0x50, 0x98, 0xd9, 0x8d, 0x42, 0xba, 0x37, 0x0e, 0x27, 0x51, 0xd1, 0x3e, - 0x24, 0xaf, 0x61, 0xdf, 0x25, 0x4c, 0x3b, 0xe3, 0x70, 0x12, 0x4f, 0x5f, 0x65, 0x3b, 0xb5, 0x64, - 0xe7, 0x96, 0xb8, 0x64, 0xbc, 0xc8, 0xaa, 0xeb, 0xeb, 0x0a, 0x35, 0xed, 0x3e, 0xa0, 0x2e, 0x2c, - 0xf1, 0x6a, 0x27, 0x22, 0x1f, 0xe1, 0x39, 0x47, 0x53, 0x97, 0x97, 0x73, 0x8e, 0x86, 0x7d, 0x61, - 0x86, 0xd1, 0x7d, 0xe7, 0x3b, 0x6c, 0xf9, 0xce, 0x9c, 0xe6, 0xac, 0x91, 0xb8, 0x00, 0xcf, 0x78, - 0xeb, 0x8c, 0x4c, 0xa1, 0x6f, 0x58, 0xbd, 0xb4, 0x0d, 0xe8, 0xb9, 0x08, 0xb4, 0x15, 0x61, 0xe6, - 0x99, 0xb3, 0xde, 0x09, 0xc9, 0x3b, 0x88, 0x70, 0x85, 0x5c, 0x55, 0xac, 0xd6, 0xb4, 0xef, 0x5c, - 0xa3, 0x96, 0xeb, 0xf4, 0x8e, 0x3a, 0xdf, 0x3f, 0x71, 0xfa, 0x01, 0xa2, 0xfb, 0xca, 0xc9, 0x10, - 0x9e, 0xf0, 0x52, 0xcc, 0x4d, 0xc9, 0x91, 0x86, 0xe3, 0x70, 0xd2, 0x29, 0xfa, 0xbc, 0x14, 0xb3, - 0x92, 0xa3, 0x43, 0x6c, 0xe5, 0xd1, 0x5e, 0x83, 0xd8, 0xca, 0xa2, 0x34, 0x86, 0xe8, 0xbe, 0x1d, - 0xe9, 0x01, 0x90, 0xff, 0x6b, 0xb4, 0xdf, 0x7d, 0x27, 0xef, 0xf4, 0x14, 0x06, 0xad, 0x84, 0x1e, - 0x77, 0xf1, 0xf4, 0x04, 0xba, 0xce, 0xfd, 0xbe, 0x79, 0xb7, 0x1b, 0xb5, 0x33, 0x68, 0xa3, 0xe1, - 0x03, 0xc4, 0x8f, 0xdc, 0xf1, 0xd1, 0xfa, 0x4f, 0x12, 0xac, 0x37, 0x49, 0x78, 0xbb, 0x49, 0xc2, - 0xdf, 0x9b, 0x24, 0xfc, 0xb1, 0x4d, 0x82, 0xdb, 0x6d, 0x12, 0xfc, 0xdc, 0x26, 0xc1, 0x45, 0xcf, - 0xff, 0x00, 0x8b, 0x9e, 0x9b, 0xdf, 0x37, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0xc9, 0x9c, 0xd8, - 0x20, 0x16, 0x03, 0x00, 0x00, + // 456 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x93, 0xc1, 0x6e, 0xd3, 0x30, + 0x18, 0xc7, 0x93, 0xb5, 0x6b, 0x17, 0x87, 0x82, 0xb0, 0x06, 0x72, 0x7b, 0xc8, 0xaa, 0x68, 0x87, + 0x1e, 0x50, 0x22, 0x15, 0x09, 0x21, 0x71, 0x62, 0xd3, 0x24, 0x90, 0xd8, 0x81, 0xac, 0xa7, 0x5d, + 0x2a, 0x67, 0x7c, 0x2b, 0x91, 0xe2, 0xd8, 0xb3, 0x5d, 0xa9, 0x7d, 0x0b, 0x9e, 0x84, 0xe7, 0xe8, + 0x71, 0x47, 0x4e, 0x08, 0xda, 0x17, 0x41, 0xb6, 0xd3, 0xd2, 0x88, 0x9e, 0xb8, 0xb4, 0xb6, 0x7f, + 0xff, 0xff, 0x67, 0xff, 0x3f, 0x7d, 0x41, 0x2f, 0x8a, 0xea, 0x9e, 0xa7, 0xe6, 0x47, 0xe4, 0xa9, + 0x14, 0x77, 0x89, 0x90, 0x5c, 0x73, 0x1c, 0xea, 0xaf, 0xb4, 0xe2, 0x2a, 0x31, 0x60, 0xd0, 0x57, + 0x9a, 0x4b, 0x48, 0x4b, 0x9a, 0x43, 0x29, 0xf2, 0x54, 0x2f, 0x05, 0x28, 0xa7, 0x1b, 0x9c, 0xce, + 0xf8, 0x8c, 0xdb, 0x65, 0x6a, 0x56, 0xee, 0x34, 0xee, 0xa1, 0xf0, 0x63, 0x75, 0xcf, 0x33, 0x78, + 0x98, 0x83, 0xd2, 0xf1, 0xf7, 0x16, 0x7a, 0xe2, 0xf6, 0x4a, 0xf0, 0x4a, 0x01, 0x7e, 0x83, 0x90, + 0x2d, 0x36, 0x55, 0xa0, 0x15, 0xf1, 0x87, 0xad, 0x51, 0x38, 0x7e, 0x9e, 0xd4, 0x57, 0xde, 0x7e, + 0x32, 0xe8, 0x06, 0xf4, 0x45, 0x7b, 0xf5, 0xf3, 0xcc, 0xcb, 0x82, 0xb2, 0xde, 0x2b, 0x7c, 0x8e, + 0x7a, 0x97, 0x9c, 0x09, 0x5e, 0x41, 0xa5, 0x27, 0x4b, 0x01, 0xe4, 0x68, 0xe8, 0x8f, 0x82, 0xac, + 0x79, 0x88, 0x5f, 0xa1, 0x63, 0xfb, 0x60, 0xd2, 0x1a, 0xfa, 0xa3, 0x70, 0xfc, 0x32, 0xd9, 0xcb, + 0x92, 0xdc, 0x18, 0x62, 0x1f, 0xe3, 0x44, 0x46, 0x2d, 0xe7, 0x25, 0x28, 0xd2, 0x3e, 0xa0, 0xce, + 0x0c, 0x71, 0x6a, 0x2b, 0xc2, 0x1f, 0xd0, 0x33, 0x06, 0x5a, 0x16, 0x77, 0x53, 0x06, 0x9a, 0x7e, + 0xa1, 0x9a, 0x92, 0x63, 0xeb, 0x3b, 0x6b, 0xf8, 0xae, 0xad, 0xe6, 0xba, 0x96, 0xd8, 0x02, 0x4f, + 0x59, 0xe3, 0x0c, 0x8f, 0x51, 0x57, 0x53, 0x39, 0x33, 0x0d, 0xe8, 0xd8, 0x0a, 0xa4, 0x51, 0x61, + 0xe2, 0x98, 0xb5, 0x6e, 0x85, 0xf8, 0x2d, 0x0a, 0x60, 0x01, 0x4c, 0x94, 0x54, 0x2a, 0xd2, 0xb5, + 0xae, 0x41, 0xc3, 0x75, 0xb5, 0xa5, 0xd6, 0xf7, 0x57, 0x6c, 0x52, 0x3e, 0xcc, 0x41, 0x2e, 0xc9, + 0xc9, 0x81, 0x94, 0x9f, 0x0d, 0x71, 0x29, 0xad, 0x28, 0x7e, 0x8f, 0x82, 0x5d, 0x9f, 0x70, 0x1f, + 0x9d, 0xb0, 0xa2, 0x9a, 0xea, 0x82, 0x01, 0xf1, 0x87, 0xfe, 0xa8, 0x95, 0x75, 0x59, 0x51, 0x4d, + 0x0a, 0x06, 0x16, 0xd1, 0x85, 0x43, 0x47, 0x35, 0xa2, 0x0b, 0x83, 0xe2, 0x10, 0x05, 0xbb, 0xe6, + 0xc5, 0xa7, 0x08, 0xff, 0xdb, 0x11, 0x33, 0x25, 0x7b, 0x29, 0xe3, 0x2b, 0xd4, 0x6b, 0x3c, 0xff, + 0xff, 0x2f, 0xde, 0xe5, 0x19, 0x5f, 0xa2, 0xb6, 0x2d, 0xf5, 0xae, 0xfe, 0x6f, 0xf6, 0x78, 0x6f, + 0x46, 0x07, 0xfd, 0x03, 0xc4, 0x4d, 0xeb, 0xc5, 0xf9, 0xea, 0x77, 0xe4, 0xad, 0xd6, 0x91, 0xff, + 0xb8, 0x8e, 0xfc, 0x5f, 0xeb, 0xc8, 0xff, 0xb6, 0x89, 0xbc, 0xc7, 0x4d, 0xe4, 0xfd, 0xd8, 0x44, + 0xde, 0x6d, 0xc7, 0x7d, 0x3b, 0x79, 0xc7, 0x8e, 0xfe, 0xeb, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, + 0x4c, 0x74, 0xac, 0xf7, 0x51, 0x03, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -471,6 +512,18 @@ func (m *InfoResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.Query != nil { + { + size, err := m.Query.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x42 + } if m.Exemplars != nil { { size, err := m.Exemplars.MarshalToSizedBuffer(dAtA[:i]) @@ -690,6 +743,29 @@ func (m *ExemplarsInfo) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *QueryInfo) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *QueryInfo) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *QueryInfo) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + return len(dAtA) - i, nil +} + func encodeVarintRpc(dAtA []byte, offset int, v uint64) int { offset -= sovRpc(v) base := offset @@ -746,6 +822,10 @@ func (m *InfoResponse) Size() (n int) { l = m.Exemplars.Size() n += 1 + l + sovRpc(uint64(l)) } + if m.Query != nil { + l = m.Query.Size() + n += 1 + l + sovRpc(uint64(l)) + } return n } @@ -806,6 +886,15 @@ func (m *ExemplarsInfo) Size() (n int) { return n } +func (m *QueryInfo) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + func sovRpc(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } @@ -1137,6 +1226,42 @@ func (m *InfoResponse) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 8: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Query", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Query == nil { + m.Query = &QueryInfo{} + } + if err := m.Query.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipRpc(dAtA[iNdEx:]) @@ -1484,6 +1609,56 @@ func (m *ExemplarsInfo) Unmarshal(dAtA []byte) error { } return nil } +func (m *QueryInfo) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: QueryInfo: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: QueryInfo: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipRpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipRpc(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/pkg/info/infopb/rpc.proto b/pkg/info/infopb/rpc.proto index 3d05168b31e..0b66e6b3223 100644 --- a/pkg/info/infopb/rpc.proto +++ b/pkg/info/infopb/rpc.proto @@ -31,21 +31,24 @@ message InfoRequest {} message InfoResponse { repeated ZLabelSet label_sets = 1 [(gogoproto.nullable) = false]; string ComponentType = 2; - + // StoreInfo holds the metadata related to Store API if exposed by the component otherwise it will be null. - StoreInfo store = 3; + StoreInfo store = 3; // RulesInfo holds the metadata related to Rules API if exposed by the component otherwise it will be null. RulesInfo rules = 4; - + // MetricMetadataInfo holds the metadata related to Metadata API if exposed by the component otherwise it will be null. MetricMetadataInfo metric_metadata = 5; - + // TargetsInfo holds the metadata related to Targets API if exposed by the component otherwise it will be null. TargetsInfo targets = 6; - + // ExemplarsInfo holds the metadata related to Exemplars API if exposed by the component otherwise it will be null. ExemplarsInfo exemplars = 7; + + // QueryInfo holds the metadata related to Query API if exposed by the component, otherwise it will be null. + QueryInfo query = 8; } // StoreInfo holds the metadata related to Store API exposed by the component. @@ -70,4 +73,8 @@ message TargetsInfo { message ExemplarsInfo { int64 min_time = 1; int64 max_time = 2; -} \ No newline at end of file +} + +// QueryInfo holds the metadata related to Query API exposed by the component. +message QueryInfo { +} diff --git a/pkg/query/endpointset.go b/pkg/query/endpointset.go index 36eca1e7c49..64d32ab124f 100644 --- a/pkg/query/endpointset.go +++ b/pkg/query/endpointset.go @@ -12,6 +12,8 @@ import ( "sync" "time" + "github.com/thanos-io/thanos/pkg/api/query/querypb" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/pkg/errors" @@ -145,6 +147,10 @@ func (es *GRPCEndpointSpec) fillExpectedAPIs(componentType component.Component, }, Rules: &infopb.RulesInfo{}, } + case component.QueryAPI: + return infopb.InfoResponse{ + Query: &infopb.QueryInfo{}, + } default: return infopb.InfoResponse{} } @@ -371,6 +377,20 @@ func (e *EndpointSet) GetStoreClients() []store.Client { return stores } +// GetQueryAPIClients returns a list of all active query API clients. +func (e *EndpointSet) GetQueryAPIClients() []querypb.QueryClient { + e.endpointsMtx.RLock() + defer e.endpointsMtx.RUnlock() + + stores := make([]querypb.QueryClient, 0, len(e.endpoints)) + for _, er := range e.endpoints { + if er.HasQueryAPI() { + stores = append(stores, er.clients.query) + } + } + return stores +} + // GetRulesClients returns a list of all active rules clients. func (e *EndpointSet) GetRulesClients() []rulespb.RulesClient { e.endpointsMtx.RLock() @@ -648,6 +668,10 @@ func (er *endpointRef) Update(metadata *endpointMetadata) { clients.exemplar = exemplarspb.NewExemplarsClient(er.cc) } + if metadata.Query != nil { + clients.query = querypb.NewQueryClient(er.cc) + } + er.clients = clients er.metadata = metadata } @@ -670,6 +694,13 @@ func (er *endpointRef) HasStoreAPI() bool { return er.clients != nil && er.clients.store != nil } +func (er *endpointRef) HasQueryAPI() bool { + er.mtx.RLock() + defer er.mtx.RUnlock() + + return er.clients != nil && er.clients.query != nil +} + func (er *endpointRef) HasRulesAPI() bool { er.mtx.RLock() defer er.mtx.RUnlock() @@ -768,6 +799,10 @@ func (er *endpointRef) apisPresent() []string { apisPresent = append(apisPresent, "MetricMetadataAPI") } + if er.HasQueryAPI() { + apisPresent = append(apisPresent, "QueryAPI") + } + return apisPresent } @@ -777,6 +812,7 @@ type endpointClients struct { metricMetadata metadatapb.MetadataClient exemplar exemplarspb.ExemplarsClient target targetspb.TargetsClient + query querypb.QueryClient info infopb.InfoClient } @@ -789,5 +825,6 @@ func newEndpointAPIStats() map[component.Component]map[string]int { for i := range storepb.StoreType_name { nodes[component.FromProto(storepb.StoreType(i))] = map[string]int{} } + nodes[component.QueryAPI] = map[string]int{} return nodes } diff --git a/pkg/query/endpointset_test.go b/pkg/query/endpointset_test.go index 08e59f858d4..6ac8d99a0df 100644 --- a/pkg/query/endpointset_test.go +++ b/pkg/query/endpointset_test.go @@ -75,6 +75,10 @@ var ( }, Exemplars: &infopb.ExemplarsInfo{}, } + queryAPIInfo = &infopb.InfoResponse{ + ComponentType: component.QueryAPI.String(), + Query: &infopb.QueryInfo{}, + } ) type mockedEndpoint struct { @@ -168,15 +172,13 @@ func startTestEndpoints(testEndpointMeta []testEndpointMeta) (*testEndpoints, er srv := grpc.NewServer() addr := listener.Addr().String() - storeSrv := &mockedStoreSrv{ - info: storepb.InfoResponse{ + storeSrv := &mockedStoreSrv{} + if meta.Store != nil { + storeSrv.infoDelay = meta.infoDelay + storeSrv.info = storepb.InfoResponse{ LabelSets: meta.extlsetFn(listener.Addr().String()), StoreType: componentTypeToStoreType(meta.ComponentType), - }, - infoDelay: meta.infoDelay, - } - - if meta.Store != nil { + } storeSrv.info.MinTime = meta.Store.MinTime storeSrv.info.MaxTime = meta.Store.MaxTime } @@ -190,6 +192,7 @@ func startTestEndpoints(testEndpointMeta []testEndpointMeta) (*testEndpoints, er Targets: meta.Targets, Exemplars: meta.Exemplars, ComponentType: meta.ComponentType, + Query: meta.Query, }, infoDelay: meta.infoDelay, } @@ -590,6 +593,19 @@ func TestEndpointSet_Update(t *testing.T) { } }, }, + { + InfoResponse: queryAPIInfo, + extlsetFn: func(addr string) []labelpb.ZLabelSet { + return nil + }, + }, + // Duplicate query API + { + InfoResponse: queryAPIInfo, + extlsetFn: func(addr string) []labelpb.ZLabelSet { + return nil + }, + }, }) testutil.Ok(t, err) defer endpoint2.Close() @@ -620,6 +636,9 @@ func TestEndpointSet_Update(t *testing.T) { expected[component.Receive] = map[string]int{ "{l1=\"v2\", l2=\"v3\"},{l3=\"v4\"}": 2, } + expected[component.QueryAPI] = map[string]int{ + "": 2, + } testutil.Equals(t, expected, endpointSet.endpointsMetric.storeNodes) // Close remaining endpoint from previous batch