From 20c1e54e860c74465cb1716170f9bd529b58867c Mon Sep 17 00:00:00 2001 From: fpetkovski Date: Thu, 21 Apr 2022 08:08:30 +0200 Subject: [PATCH] Implement Query API discovery A recent commit (#5250) added a GRPC API to Thanos Query which allows executing PromQL over GRPC. This API is currently not discoverable through endpointsets which makes it hard for other Thanos components to use it. This commit extends endpointsets with a GetQueryAPIClients method which returns Query API clients to all components which support this API. Signed-off-by: fpetkovski --- cmd/thanos/query.go | 1 + pkg/info/info.go | 18 +++ pkg/info/infopb/rpc.pb.go | 234 +++++++++++++++++++++++++++++----- pkg/info/infopb/rpc.proto | 19 ++- pkg/query/endpointset.go | 33 +++++ pkg/query/endpointset_test.go | 15 +++ 6 files changed, 285 insertions(+), 35 deletions(-) diff --git a/cmd/thanos/query.go b/cmd/thanos/query.go index 7c65edf77d..ce110edbd7 100644 --- a/cmd/thanos/query.go +++ b/cmd/thanos/query.go @@ -675,6 +675,7 @@ func runQuery( info.WithRulesInfoFunc(), info.WithMetricMetadataInfoFunc(), info.WithTargetsInfoFunc(), + info.WithQueryAPIInfoFunc(), ) grpcAPI := apiv1.NewGRPCAPI(time.Now, queryableCreator, engineCreator, instantDefaultMaxSourceResolution) diff --git a/pkg/info/info.go b/pkg/info/info.go index 40df172a68..f61fdef187 100644 --- a/pkg/info/info.go +++ b/pkg/info/info.go @@ -25,6 +25,7 @@ type InfoServer struct { getRulesInfo func() *infopb.RulesInfo getTargetsInfo func() *infopb.TargetsInfo getMetricMetadataInfo func() *infopb.MetricMetadataInfo + getQueryAPIInfo func() *infopb.QueryAPIInfo } // NewInfoServer creates a new server instance for given component @@ -42,6 +43,7 @@ func NewInfoServer( getRulesInfo: func() *infopb.RulesInfo { return nil }, getTargetsInfo: func() *infopb.TargetsInfo { return nil }, getMetricMetadataInfo: func() *infopb.MetricMetadataInfo { return nil }, + getQueryAPIInfo: func() *infopb.QueryAPIInfo { return nil }, } for _, o := range options { @@ -144,6 +146,21 @@ func WithMetricMetadataInfoFunc(getMetricMetadataInfo ...func() *infopb.MetricMe } } +// WithQueryAPIInfoFunc determines the function that should be executed to obtain +// the query information. If no function is provided, the default empty +// query info is returned. Only the first function from the list is considered. +func WithQueryAPIInfoFunc(queryInfo ...func() *infopb.QueryAPIInfo) ServerOptionFunc { + if len(queryInfo) == 0 { + return func(s *InfoServer) { + s.getQueryAPIInfo = func() *infopb.QueryAPIInfo { return &infopb.QueryAPIInfo{} } + } + } + + return func(s *InfoServer) { + s.getQueryAPIInfo = queryInfo[0] + } +} + // RegisterInfoServer registers the info server. func RegisterInfoServer(infoSrv infopb.InfoServer) func(*grpc.Server) { return func(s *grpc.Server) { @@ -161,5 +178,6 @@ func (srv *InfoServer) Info(ctx context.Context, req *infopb.InfoRequest) (*info Rules: srv.getRulesInfo(), Targets: srv.getTargetsInfo(), MetricMetadata: srv.getMetricMetadataInfo(), + Query: srv.getQueryAPIInfo(), }, nil } diff --git a/pkg/info/infopb/rpc.pb.go b/pkg/info/infopb/rpc.pb.go index 3da712b3e5..703fa1ff80 100644 --- a/pkg/info/infopb/rpc.pb.go +++ b/pkg/info/infopb/rpc.pb.go @@ -78,6 +78,8 @@ type InfoResponse struct { Targets *TargetsInfo `protobuf:"bytes,6,opt,name=targets,proto3" json:"targets,omitempty"` // ExemplarsInfo holds the metadata related to Exemplars API if exposed by the component otherwise it will be null. Exemplars *ExemplarsInfo `protobuf:"bytes,7,opt,name=exemplars,proto3" json:"exemplars,omitempty"` + // QueryAPIInfo holds the metadata related to Query API if exposed by the component, otherwise it will be null. + Query *QueryAPIInfo `protobuf:"bytes,8,opt,name=query,proto3" json:"query,omitempty"` } func (m *InfoResponse) Reset() { *m = InfoResponse{} } @@ -302,6 +304,43 @@ func (m *ExemplarsInfo) XXX_DiscardUnknown() { var xxx_messageInfo_ExemplarsInfo proto.InternalMessageInfo +// QueryInfo holds the metadata related to Query API exposed by the component. +type QueryAPIInfo struct { +} + +func (m *QueryAPIInfo) Reset() { *m = QueryAPIInfo{} } +func (m *QueryAPIInfo) String() string { return proto.CompactTextString(m) } +func (*QueryAPIInfo) ProtoMessage() {} +func (*QueryAPIInfo) Descriptor() ([]byte, []int) { + return fileDescriptor_a1214ec45d2bf952, []int{7} +} +func (m *QueryAPIInfo) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *QueryAPIInfo) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_QueryAPIInfo.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *QueryAPIInfo) XXX_Merge(src proto.Message) { + xxx_messageInfo_QueryAPIInfo.Merge(m, src) +} +func (m *QueryAPIInfo) XXX_Size() int { + return m.Size() +} +func (m *QueryAPIInfo) XXX_DiscardUnknown() { + xxx_messageInfo_QueryAPIInfo.DiscardUnknown(m) +} + +var xxx_messageInfo_QueryAPIInfo proto.InternalMessageInfo + func init() { proto.RegisterType((*InfoRequest)(nil), "thanos.info.InfoRequest") proto.RegisterType((*InfoResponse)(nil), "thanos.info.InfoResponse") @@ -310,40 +349,43 @@ func init() { proto.RegisterType((*MetricMetadataInfo)(nil), "thanos.info.MetricMetadataInfo") proto.RegisterType((*TargetsInfo)(nil), "thanos.info.TargetsInfo") proto.RegisterType((*ExemplarsInfo)(nil), "thanos.info.ExemplarsInfo") + proto.RegisterType((*QueryAPIInfo)(nil), "thanos.info.QueryAPIInfo") } func init() { proto.RegisterFile("info/infopb/rpc.proto", fileDescriptor_a1214ec45d2bf952) } var fileDescriptor_a1214ec45d2bf952 = []byte{ - // 437 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x93, 0xcf, 0x6e, 0xd3, 0x40, - 0x10, 0xc6, 0xed, 0x26, 0x4d, 0xf0, 0x98, 0x80, 0x58, 0x15, 0xb4, 0xc9, 0xc1, 0x8d, 0xac, 0x1e, - 0x72, 0x40, 0xb6, 0x14, 0x24, 0x84, 0xc4, 0x89, 0x56, 0x95, 0x40, 0xa2, 0x17, 0x37, 0xa7, 0x5e, - 0xa2, 0x4d, 0x99, 0x06, 0x4b, 0xde, 0x3f, 0x78, 0xb7, 0x52, 0x7a, 0xe3, 0x11, 0x78, 0xac, 0x1c, - 0x7b, 0xe4, 0x84, 0x20, 0x79, 0x11, 0xb4, 0xbb, 0x6e, 0x89, 0x45, 0x4f, 0xbd, 0xd8, 0xbb, 0xfb, - 0xfb, 0xbe, 0xd9, 0x99, 0xf1, 0x18, 0x5e, 0x96, 0xe2, 0x4a, 0xe6, 0xf6, 0xa1, 0x16, 0x79, 0xad, - 0x2e, 0x33, 0x55, 0x4b, 0x23, 0x49, 0x6c, 0xbe, 0x32, 0x21, 0x75, 0x66, 0xc1, 0x68, 0xa8, 0x8d, - 0xac, 0x31, 0xaf, 0xd8, 0x02, 0x2b, 0xb5, 0xc8, 0xcd, 0x8d, 0x42, 0xed, 0x75, 0xa3, 0x83, 0xa5, - 0x5c, 0x4a, 0xb7, 0xcc, 0xed, 0xca, 0x9f, 0xa6, 0x03, 0x88, 0x3f, 0x89, 0x2b, 0x59, 0xe0, 0xb7, - 0x6b, 0xd4, 0x26, 0xfd, 0xde, 0x81, 0xa7, 0x7e, 0xaf, 0x95, 0x14, 0x1a, 0xc9, 0x5b, 0x00, 0x17, - 0x6c, 0xae, 0xd1, 0x68, 0x1a, 0x8e, 0x3b, 0x93, 0x78, 0xfa, 0x22, 0x6b, 0xae, 0xbc, 0xf8, 0x6c, - 0xd1, 0x39, 0x9a, 0xe3, 0xee, 0xfa, 0xd7, 0x61, 0x50, 0x44, 0x55, 0xb3, 0xd7, 0xe4, 0x08, 0x06, - 0x27, 0x92, 0x2b, 0x29, 0x50, 0x98, 0xd9, 0x8d, 0x42, 0xba, 0x37, 0x0e, 0x27, 0x51, 0xd1, 0x3e, - 0x24, 0xaf, 0x61, 0xdf, 0x25, 0x4c, 0x3b, 0xe3, 0x70, 0x12, 0x4f, 0x5f, 0x65, 0x3b, 0xb5, 0x64, - 0xe7, 0x96, 0xb8, 0x64, 0xbc, 0xc8, 0xaa, 0xeb, 0xeb, 0x0a, 0x35, 0xed, 0x3e, 0xa0, 0x2e, 0x2c, - 0xf1, 0x6a, 0x27, 0x22, 0x1f, 0xe1, 0x39, 0x47, 0x53, 0x97, 0x97, 0x73, 0x8e, 0x86, 0x7d, 0x61, - 0x86, 0xd1, 0x7d, 0xe7, 0x3b, 0x6c, 0xf9, 0xce, 0x9c, 0xe6, 0xac, 0x91, 0xb8, 0x00, 0xcf, 0x78, - 0xeb, 0x8c, 0x4c, 0xa1, 0x6f, 0x58, 0xbd, 0xb4, 0x0d, 0xe8, 0xb9, 0x08, 0xb4, 0x15, 0x61, 0xe6, - 0x99, 0xb3, 0xde, 0x09, 0xc9, 0x3b, 0x88, 0x70, 0x85, 0x5c, 0x55, 0xac, 0xd6, 0xb4, 0xef, 0x5c, - 0xa3, 0x96, 0xeb, 0xf4, 0x8e, 0x3a, 0xdf, 0x3f, 0x71, 0xfa, 0x01, 0xa2, 0xfb, 0xca, 0xc9, 0x10, - 0x9e, 0xf0, 0x52, 0xcc, 0x4d, 0xc9, 0x91, 0x86, 0xe3, 0x70, 0xd2, 0x29, 0xfa, 0xbc, 0x14, 0xb3, - 0x92, 0xa3, 0x43, 0x6c, 0xe5, 0xd1, 0x5e, 0x83, 0xd8, 0xca, 0xa2, 0x34, 0x86, 0xe8, 0xbe, 0x1d, - 0xe9, 0x01, 0x90, 0xff, 0x6b, 0xb4, 0xdf, 0x7d, 0x27, 0xef, 0xf4, 0x14, 0x06, 0xad, 0x84, 0x1e, - 0x77, 0xf1, 0xf4, 0x04, 0xba, 0xce, 0xfd, 0xbe, 0x79, 0xb7, 0x1b, 0xb5, 0x33, 0x68, 0xa3, 0xe1, - 0x03, 0xc4, 0x8f, 0xdc, 0xf1, 0xd1, 0xfa, 0x4f, 0x12, 0xac, 0x37, 0x49, 0x78, 0xbb, 0x49, 0xc2, - 0xdf, 0x9b, 0x24, 0xfc, 0xb1, 0x4d, 0x82, 0xdb, 0x6d, 0x12, 0xfc, 0xdc, 0x26, 0xc1, 0x45, 0xcf, - 0xff, 0x00, 0x8b, 0x9e, 0x9b, 0xdf, 0x37, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0xc9, 0x9c, 0xd8, - 0x20, 0x16, 0x03, 0x00, 0x00, + // 465 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x93, 0xcf, 0x6a, 0xdb, 0x40, + 0x10, 0xc6, 0xa5, 0xf8, 0x5f, 0x34, 0x8a, 0x53, 0xba, 0xa4, 0x45, 0xf6, 0x41, 0x31, 0x22, 0x07, + 0x1f, 0x8a, 0x04, 0x2e, 0x94, 0x42, 0x4f, 0x49, 0x08, 0x34, 0xd0, 0x40, 0xab, 0xf8, 0x94, 0x8b, + 0x59, 0xa7, 0x13, 0x57, 0xa0, 0xd5, 0x6e, 0xb4, 0x6b, 0xb0, 0xdf, 0xa2, 0xaf, 0xd2, 0xb7, 0xf0, + 0x31, 0xc7, 0x9e, 0x4a, 0x6b, 0xbf, 0x48, 0xd9, 0x5d, 0x25, 0xb5, 0xa8, 0x4f, 0xb9, 0x48, 0xbb, + 0xf3, 0xfb, 0xbe, 0xd9, 0x9d, 0x61, 0x16, 0x5e, 0x65, 0xc5, 0x1d, 0x4f, 0xf4, 0x47, 0x4c, 0x93, + 0x52, 0xdc, 0xc6, 0xa2, 0xe4, 0x8a, 0x13, 0x5f, 0x7d, 0xa3, 0x05, 0x97, 0xb1, 0x06, 0xfd, 0x9e, + 0x54, 0xbc, 0xc4, 0x24, 0xa7, 0x53, 0xcc, 0xc5, 0x34, 0x51, 0x4b, 0x81, 0xd2, 0xea, 0xfa, 0x47, + 0x33, 0x3e, 0xe3, 0x66, 0x99, 0xe8, 0x95, 0x8d, 0x46, 0x5d, 0xf0, 0x2f, 0x8b, 0x3b, 0x9e, 0xe2, + 0xfd, 0x1c, 0xa5, 0x8a, 0x7e, 0x34, 0xe0, 0xc0, 0xee, 0xa5, 0xe0, 0x85, 0x44, 0xf2, 0x0e, 0xc0, + 0x24, 0x9b, 0x48, 0x54, 0x32, 0x70, 0x07, 0x8d, 0xa1, 0x3f, 0x7a, 0x19, 0x57, 0x47, 0xde, 0x7c, + 0xd2, 0xe8, 0x1a, 0xd5, 0x59, 0x73, 0xf5, 0xeb, 0xd8, 0x49, 0xbd, 0xbc, 0xda, 0x4b, 0x72, 0x02, + 0xdd, 0x73, 0xce, 0x04, 0x2f, 0xb0, 0x50, 0xe3, 0xa5, 0xc0, 0x60, 0x6f, 0xe0, 0x0e, 0xbd, 0xb4, + 0x1e, 0x24, 0x6f, 0xa0, 0x65, 0x2e, 0x1c, 0x34, 0x06, 0xee, 0xd0, 0x1f, 0xbd, 0x8e, 0xb7, 0x6a, + 0x89, 0xaf, 0x35, 0x31, 0x97, 0xb1, 0x22, 0xad, 0x2e, 0xe7, 0x39, 0xca, 0xa0, 0xb9, 0x43, 0x9d, + 0x6a, 0x62, 0xd5, 0x46, 0x44, 0x3e, 0xc2, 0x0b, 0x86, 0xaa, 0xcc, 0x6e, 0x27, 0x0c, 0x15, 0xfd, + 0x4a, 0x15, 0x0d, 0x5a, 0xc6, 0x77, 0x5c, 0xf3, 0x5d, 0x19, 0xcd, 0x55, 0x25, 0x31, 0x09, 0x0e, + 0x59, 0x2d, 0x46, 0x46, 0xd0, 0x51, 0xb4, 0x9c, 0xe9, 0x06, 0xb4, 0x4d, 0x86, 0xa0, 0x96, 0x61, + 0x6c, 0x99, 0xb1, 0x3e, 0x0a, 0xc9, 0x7b, 0xf0, 0x70, 0x81, 0x4c, 0xe4, 0xb4, 0x94, 0x41, 0xc7, + 0xb8, 0xfa, 0x35, 0xd7, 0xc5, 0x23, 0x35, 0xbe, 0x7f, 0x62, 0x92, 0x40, 0xeb, 0x7e, 0x8e, 0xe5, + 0x32, 0xd8, 0x37, 0xae, 0x5e, 0xcd, 0xf5, 0x45, 0x93, 0xd3, 0xcf, 0x97, 0xb6, 0x50, 0xa3, 0x8b, + 0x4e, 0xc1, 0x7b, 0x6a, 0x15, 0xe9, 0xc1, 0x3e, 0xcb, 0x8a, 0x89, 0xca, 0x18, 0x06, 0xee, 0xc0, + 0x1d, 0x36, 0xd2, 0x0e, 0xcb, 0x8a, 0x71, 0xc6, 0xd0, 0x20, 0xba, 0xb0, 0x68, 0xaf, 0x42, 0x74, + 0xa1, 0x51, 0xe4, 0x83, 0xf7, 0xd4, 0xbf, 0xe8, 0x08, 0xc8, 0xff, 0x4d, 0xd1, 0x83, 0xb2, 0x55, + 0x68, 0x74, 0x01, 0xdd, 0x5a, 0x05, 0xcf, 0x3c, 0xf8, 0x10, 0x0e, 0xb6, 0x4b, 0x1a, 0x9d, 0x43, + 0xd3, 0x64, 0xfb, 0x50, 0xfd, 0xeb, 0x9d, 0xde, 0x9a, 0xd4, 0x7e, 0x6f, 0x07, 0xb1, 0x33, 0x7b, + 0x76, 0xb2, 0xfa, 0x13, 0x3a, 0xab, 0x75, 0xe8, 0x3e, 0xac, 0x43, 0xf7, 0xf7, 0x3a, 0x74, 0xbf, + 0x6f, 0x42, 0xe7, 0x61, 0x13, 0x3a, 0x3f, 0x37, 0xa1, 0x73, 0xd3, 0xb6, 0x2f, 0x68, 0xda, 0x36, + 0x0f, 0xe0, 0xed, 0xdf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x0b, 0x22, 0x37, 0x8b, 0x57, 0x03, 0x00, + 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -471,6 +513,18 @@ func (m *InfoResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.Query != nil { + { + size, err := m.Query.MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintRpc(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0x42 + } if m.Exemplars != nil { { size, err := m.Exemplars.MarshalToSizedBuffer(dAtA[:i]) @@ -690,6 +744,29 @@ func (m *ExemplarsInfo) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *QueryAPIInfo) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *QueryAPIInfo) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *QueryAPIInfo) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + return len(dAtA) - i, nil +} + func encodeVarintRpc(dAtA []byte, offset int, v uint64) int { offset -= sovRpc(v) base := offset @@ -746,6 +823,10 @@ func (m *InfoResponse) Size() (n int) { l = m.Exemplars.Size() n += 1 + l + sovRpc(uint64(l)) } + if m.Query != nil { + l = m.Query.Size() + n += 1 + l + sovRpc(uint64(l)) + } return n } @@ -806,6 +887,15 @@ func (m *ExemplarsInfo) Size() (n int) { return n } +func (m *QueryAPIInfo) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + func sovRpc(x uint64) (n int) { return (math_bits.Len64(x|1) + 6) / 7 } @@ -1137,6 +1227,42 @@ func (m *InfoResponse) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 8: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Query", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthRpc + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthRpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Query == nil { + m.Query = &QueryAPIInfo{} + } + if err := m.Query.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipRpc(dAtA[iNdEx:]) @@ -1484,6 +1610,56 @@ func (m *ExemplarsInfo) Unmarshal(dAtA []byte) error { } return nil } +func (m *QueryAPIInfo) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: QueryAPIInfo: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: QueryAPIInfo: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipRpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthRpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func skipRpc(dAtA []byte) (n int, err error) { l := len(dAtA) iNdEx := 0 diff --git a/pkg/info/infopb/rpc.proto b/pkg/info/infopb/rpc.proto index 3d05168b31..66a182a3e5 100644 --- a/pkg/info/infopb/rpc.proto +++ b/pkg/info/infopb/rpc.proto @@ -31,21 +31,24 @@ message InfoRequest {} message InfoResponse { repeated ZLabelSet label_sets = 1 [(gogoproto.nullable) = false]; string ComponentType = 2; - + // StoreInfo holds the metadata related to Store API if exposed by the component otherwise it will be null. - StoreInfo store = 3; + StoreInfo store = 3; // RulesInfo holds the metadata related to Rules API if exposed by the component otherwise it will be null. RulesInfo rules = 4; - + // MetricMetadataInfo holds the metadata related to Metadata API if exposed by the component otherwise it will be null. MetricMetadataInfo metric_metadata = 5; - + // TargetsInfo holds the metadata related to Targets API if exposed by the component otherwise it will be null. TargetsInfo targets = 6; - + // ExemplarsInfo holds the metadata related to Exemplars API if exposed by the component otherwise it will be null. ExemplarsInfo exemplars = 7; + + // QueryAPIInfo holds the metadata related to Query API if exposed by the component, otherwise it will be null. + QueryAPIInfo query = 8; } // StoreInfo holds the metadata related to Store API exposed by the component. @@ -70,4 +73,8 @@ message TargetsInfo { message ExemplarsInfo { int64 min_time = 1; int64 max_time = 2; -} \ No newline at end of file +} + +// QueryInfo holds the metadata related to Query API exposed by the component. +message QueryAPIInfo { +} diff --git a/pkg/query/endpointset.go b/pkg/query/endpointset.go index 36eca1e7c4..048c7c1436 100644 --- a/pkg/query/endpointset.go +++ b/pkg/query/endpointset.go @@ -12,6 +12,8 @@ import ( "sync" "time" + "github.com/thanos-io/thanos/pkg/api/query/querypb" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/pkg/errors" @@ -118,6 +120,7 @@ func (es *GRPCEndpointSpec) fillExpectedAPIs(componentType component.Component, Targets: &infopb.TargetsInfo{}, MetricMetadata: &infopb.MetricMetadataInfo{}, Exemplars: &infopb.ExemplarsInfo{}, + Query: &infopb.QueryAPIInfo{}, } } case component.Receive: @@ -371,6 +374,20 @@ func (e *EndpointSet) GetStoreClients() []store.Client { return stores } +// GetQueryAPIClients returns a list of all active query API clients. +func (e *EndpointSet) GetQueryAPIClients() []querypb.QueryClient { + e.endpointsMtx.RLock() + defer e.endpointsMtx.RUnlock() + + stores := make([]querypb.QueryClient, 0, len(e.endpoints)) + for _, er := range e.endpoints { + if er.HasQueryAPI() { + stores = append(stores, er.clients.query) + } + } + return stores +} + // GetRulesClients returns a list of all active rules clients. func (e *EndpointSet) GetRulesClients() []rulespb.RulesClient { e.endpointsMtx.RLock() @@ -648,6 +665,10 @@ func (er *endpointRef) Update(metadata *endpointMetadata) { clients.exemplar = exemplarspb.NewExemplarsClient(er.cc) } + if metadata.Query != nil { + clients.query = querypb.NewQueryClient(er.cc) + } + er.clients = clients er.metadata = metadata } @@ -670,6 +691,13 @@ func (er *endpointRef) HasStoreAPI() bool { return er.clients != nil && er.clients.store != nil } +func (er *endpointRef) HasQueryAPI() bool { + er.mtx.RLock() + defer er.mtx.RUnlock() + + return er.clients != nil && er.clients.query != nil +} + func (er *endpointRef) HasRulesAPI() bool { er.mtx.RLock() defer er.mtx.RUnlock() @@ -768,6 +796,10 @@ func (er *endpointRef) apisPresent() []string { apisPresent = append(apisPresent, "MetricMetadataAPI") } + if er.HasQueryAPI() { + apisPresent = append(apisPresent, "QueryAPI") + } + return apisPresent } @@ -777,6 +809,7 @@ type endpointClients struct { metricMetadata metadatapb.MetadataClient exemplar exemplarspb.ExemplarsClient target targetspb.TargetsClient + query querypb.QueryClient info infopb.InfoClient } diff --git a/pkg/query/endpointset_test.go b/pkg/query/endpointset_test.go index 08e59f858d..3f841a657b 100644 --- a/pkg/query/endpointset_test.go +++ b/pkg/query/endpointset_test.go @@ -51,6 +51,7 @@ var ( Rules: &infopb.RulesInfo{}, MetricMetadata: &infopb.MetricMetadataInfo{}, Targets: &infopb.TargetsInfo{}, + Query: &infopb.QueryAPIInfo{}, } ruleInfo = &infopb.InfoResponse{ ComponentType: component.Rule.String(), @@ -189,6 +190,7 @@ func startTestEndpoints(testEndpointMeta []testEndpointMeta) (*testEndpoints, er Rules: meta.Rules, Targets: meta.Targets, Exemplars: meta.Exemplars, + Query: meta.Query, ComponentType: meta.ComponentType, }, infoDelay: meta.infoDelay, @@ -880,6 +882,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { expectedTarget int expectedMetricMetadata int expectedExemplars int + expectedQueryAPIs int } for _, tc := range []struct { @@ -907,6 +910,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { expectedTarget: 2, // sidecar + querier expectedMetricMetadata: 2, // sidecar + querier expectedExemplars: 3, // sidecar + querier + receiver + expectedQueryAPIs: 1, // querier }, }, }, @@ -980,6 +984,7 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { gotTarget := 0 gotExemplars := 0 gotMetricMetadata := 0 + gotQueryAPIs := 0 for _, er := range endpointSet.endpoints { if er.HasStoreAPI() { @@ -997,6 +1002,9 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { if er.HasMetricMetadataAPI() { gotMetricMetadata += 1 } + if er.HasQueryAPI() { + gotQueryAPIs += 1 + } } testutil.Equals( t, @@ -1031,6 +1039,13 @@ func TestEndpointSet_APIs_Discovery(t *testing.T) { "unexepected discovered ExemplarsAPIs in state %q", tc.states[currentState].name, ) + testutil.Equals( + t, + tc.states[currentState].expectedQueryAPIs, + gotQueryAPIs, + "unexepected discovered QueryAPIs in state %q", + tc.states[currentState].name, + ) currentState = currentState + 1 if len(tc.states) == currentState {