From 71c2a1174ca7d98fcb01cab573feac9ae675fb5a Mon Sep 17 00:00:00 2001 From: Josh Humphries Date: Wed, 5 Oct 2022 22:09:05 -0500 Subject: [PATCH] support v1 of reflection service --- grpcreflect/client.go | 173 ++++++++++--- grpcreflect/client_test.go | 137 ++++++++++- .../grpc_reflection_v1/reflection_grpc.pb.go | 141 +++++++++++ .../internal/grpc_reflection_v1/svc_impl.go | 232 ++++++++++++++++++ internal/testprotos/make_protos.sh | 4 +- 5 files changed, 645 insertions(+), 42 deletions(-) create mode 100644 grpcreflect/internal/grpc_reflection_v1/reflection_grpc.pb.go create mode 100644 grpcreflect/internal/grpc_reflection_v1/svc_impl.go diff --git a/grpcreflect/client.go b/grpcreflect/client.go index 70dc6ad1..02ad1760 100644 --- a/grpcreflect/client.go +++ b/grpcreflect/client.go @@ -8,17 +8,28 @@ import ( "reflect" "runtime" "sync" + "time" "github.com/golang/protobuf/proto" dpb "github.com/golang/protobuf/protoc-gen-go/descriptor" + "google.golang.org/grpc" "google.golang.org/grpc/codes" - rpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha" + refv1alpha "google.golang.org/grpc/reflection/grpc_reflection_v1alpha" "google.golang.org/grpc/status" "github.com/jhump/protoreflect/desc" + refv1 "github.com/jhump/protoreflect/grpcreflect/internal/grpc_reflection_v1" "github.com/jhump/protoreflect/internal" ) +// If we try the v1 reflection API and get back "not implemented", we'll wait +// this long before trying v1 again. This allows a long-lived client to +// dynamically switch from v1alpha to v1 if the underlying server is updated +// to support it. But it also prevents every stream request from always trying +// v1 first: if we try it and see it fail, we shouldn't continually retry it +// if we expect it will fail again. +const durationBetweenV1Attempts = time.Hour + // elementNotFoundError is the error returned by reflective operations where the // server does not recognize a given file name, symbol name, or extension. type elementNotFoundError struct { @@ -108,12 +119,16 @@ type extDesc struct { // Client is a client connection to a server for performing reflection calls // and resolving remote symbols. type Client struct { - ctx context.Context - stub rpb.ServerReflectionClient + ctx context.Context + now func() time.Time + stubV1 refv1.ServerReflectionClient + stubV1Alpha refv1alpha.ServerReflectionClient - connMu sync.Mutex - cancel context.CancelFunc - stream rpb.ServerReflection_ServerReflectionInfoClient + connMu sync.Mutex + cancel context.CancelFunc + stream refv1alpha.ServerReflection_ServerReflectionInfoClient + useV1Alpha bool + lastTriedV1 time.Time cacheMu sync.RWMutex protosByName map[string]*dpb.FileDescriptorProto @@ -124,10 +139,27 @@ type Client struct { // NewClient creates a new Client with the given root context and using the // given RPC stub for talking to the server. -func NewClient(ctx context.Context, stub rpb.ServerReflectionClient) *Client { +// +// Deprecated: Use NewClientV1Alpha if you are intentionally pinning the +// v1alpha version of the reflection service. Otherwise, use NewClientAuto +// instead. +func NewClient(ctx context.Context, stub refv1alpha.ServerReflectionClient) *Client { + return NewClientV1Alpha(ctx, stub) +} + +// NewClientV1Alpha creates a new Client using the v1alpha version of reflection +// with the given root context and using the given RPC stub for talking to the +// server. +func NewClientV1Alpha(ctx context.Context, stub refv1alpha.ServerReflectionClient) *Client { + return newClient(ctx, nil, stub) +} + +func newClient(ctx context.Context, stubv1 refv1.ServerReflectionClient, stubv1alpha refv1alpha.ServerReflectionClient) *Client { cr := &Client{ ctx: ctx, - stub: stub, + now: time.Now, + stubV1: stubv1, + stubV1Alpha: stubv1alpha, protosByName: map[string]*dpb.FileDescriptorProto{}, filesByName: map[string]*desc.FileDescriptor{}, filesBySymbol: map[string]*desc.FileDescriptor{}, @@ -138,6 +170,26 @@ func NewClient(ctx context.Context, stub rpb.ServerReflectionClient) *Client { return cr } +// NewClientAuto creates a new Client that will use either v1 or v1alpha version +// of reflection (based on what the server supports) with the given root context +// and using the given client connection. +// +// It will first the v1 version of the reflection service. If it gets back an +// "Unimplemented" error, it will fall back to using the v1alpha version. It +// will remember which version the server supports for any subsequent operations +// that need to re-invoke the streaming RPC. But, if it's a very long-lived +// client, it will periodically retry the v1 version (in case the server is +// updated to support it also). The period for these retries is every hour. +func NewClientAuto(ctx context.Context, cc grpc.ClientConnInterface) *Client { + stubv1 := refv1.NewServerReflectionClient(cc) + stubv1alpha := refv1alpha.NewServerReflectionClient(cc) + return newClient(ctx, stubv1, stubv1alpha) +} + +// TODO: We should also have a NewClientV1. However that should not refer to internal +// generated code. So it will have to wait until the grpc-go team fixes this issue: +// https://github.com/grpc/grpc-go/issues/5684 + // FileByFilename asks the server for a file descriptor for the proto file with // the given name. func (cr *Client) FileByFilename(filename string) (*desc.FileDescriptor, error) { @@ -154,8 +206,8 @@ func (cr *Client) FileByFilename(filename string) (*desc.FileDescriptor, error) return cr.descriptorFromProto(fdp) } - req := &rpb.ServerReflectionRequest{ - MessageRequest: &rpb.ServerReflectionRequest_FileByFilename{ + req := &refv1alpha.ServerReflectionRequest{ + MessageRequest: &refv1alpha.ServerReflectionRequest_FileByFilename{ FileByFilename: filename, }, } @@ -167,8 +219,8 @@ func (cr *Client) FileByFilename(filename string) (*desc.FileDescriptor, error) if isNotFound(err) { // file not found? see if we can look up via alternate name if alternate, ok := internal.StdFileAliases[filename]; ok { - req := &rpb.ServerReflectionRequest{ - MessageRequest: &rpb.ServerReflectionRequest_FileByFilename{ + req := &refv1alpha.ServerReflectionRequest{ + MessageRequest: &refv1alpha.ServerReflectionRequest_FileByFilename{ FileByFilename: alternate, }, } @@ -196,8 +248,8 @@ func (cr *Client) FileContainingSymbol(symbol string) (*desc.FileDescriptor, err return fd, nil } - req := &rpb.ServerReflectionRequest{ - MessageRequest: &rpb.ServerReflectionRequest_FileContainingSymbol{ + req := &refv1alpha.ServerReflectionRequest{ + MessageRequest: &refv1alpha.ServerReflectionRequest_FileContainingSymbol{ FileContainingSymbol: symbol, }, } @@ -225,9 +277,9 @@ func (cr *Client) FileContainingExtension(extendedMessageName string, extensionN return fd, nil } - req := &rpb.ServerReflectionRequest{ - MessageRequest: &rpb.ServerReflectionRequest_FileContainingExtension{ - FileContainingExtension: &rpb.ExtensionRequest{ + req := &refv1alpha.ServerReflectionRequest{ + MessageRequest: &refv1alpha.ServerReflectionRequest_FileContainingExtension{ + FileContainingExtension: &refv1alpha.ExtensionRequest{ ContainingType: extendedMessageName, ExtensionNumber: extensionNumber, }, @@ -245,7 +297,7 @@ func (cr *Client) FileContainingExtension(extendedMessageName string, extensionN return fd, err } -func (cr *Client) getAndCacheFileDescriptors(req *rpb.ServerReflectionRequest, expectedName, alias string, accept func(*desc.FileDescriptor) bool) (*desc.FileDescriptor, error) { +func (cr *Client) getAndCacheFileDescriptors(req *refv1alpha.ServerReflectionRequest, expectedName, alias string, accept func(*desc.FileDescriptor) bool) (*desc.FileDescriptor, error) { resp, err := cr.send(req) if err != nil { return nil, err @@ -379,8 +431,8 @@ func (cr *Client) cacheMessageLocked(fd *desc.FileDescriptor, md *desc.MessageDe // AllExtensionNumbersForType asks the server for all known extension numbers // for the given fully-qualified message name. func (cr *Client) AllExtensionNumbersForType(extendedMessageName string) ([]int32, error) { - req := &rpb.ServerReflectionRequest{ - MessageRequest: &rpb.ServerReflectionRequest_AllExtensionNumbersOfType{ + req := &refv1alpha.ServerReflectionRequest{ + MessageRequest: &refv1alpha.ServerReflectionRequest_AllExtensionNumbersOfType{ AllExtensionNumbersOfType: extendedMessageName, }, } @@ -402,8 +454,8 @@ func (cr *Client) AllExtensionNumbersForType(extendedMessageName string) ([]int3 // ListServices asks the server for the fully-qualified names of all exposed // services. func (cr *Client) ListServices() ([]string, error) { - req := &rpb.ServerReflectionRequest{ - MessageRequest: &rpb.ServerReflectionRequest_ListServices{ + req := &refv1alpha.ServerReflectionRequest{ + MessageRequest: &refv1alpha.ServerReflectionRequest_ListServices{ // proto doesn't indicate any purpose for this value and server impl // doesn't actually use it... ListServices: "*", @@ -425,10 +477,10 @@ func (cr *Client) ListServices() ([]string, error) { return serviceNames, nil } -func (cr *Client) send(req *rpb.ServerReflectionRequest) (*rpb.ServerReflectionResponse, error) { +func (cr *Client) send(req *refv1alpha.ServerReflectionRequest) (*refv1alpha.ServerReflectionResponse, error) { // we allow one immediate retry, in case we have a stale stream // (e.g. closed by server) - resp, err := cr.doSend(true, req) + resp, err := cr.doSend(req) if err != nil { return nil, err } @@ -450,16 +502,25 @@ func isNotFound(err error) bool { return ok && s.Code() == codes.NotFound } -func (cr *Client) doSend(retry bool, req *rpb.ServerReflectionRequest) (*rpb.ServerReflectionResponse, error) { +func (cr *Client) doSend(req *refv1alpha.ServerReflectionRequest) (*refv1alpha.ServerReflectionResponse, error) { // TODO: Streams are thread-safe, so we shouldn't need to lock. But without locking, we'll need more machinery // (goroutines and channels) to ensure that responses are correctly correlated with their requests and thus // delivered in correct oder. cr.connMu.Lock() defer cr.connMu.Unlock() - return cr.doSendLocked(retry, req) + return cr.doSendLocked(0, nil, req) } -func (cr *Client) doSendLocked(retry bool, req *rpb.ServerReflectionRequest) (*rpb.ServerReflectionResponse, error) { +func (cr *Client) doSendLocked(attemptCount int, prevErr error, req *refv1alpha.ServerReflectionRequest) (*refv1alpha.ServerReflectionResponse, error) { + if attemptCount >= 3 && prevErr != nil { + return nil, prevErr + } + if status.Code(prevErr) == codes.Unimplemented && cr.useV1() { + cr.useV1Alpha = true + cr.lastTriedV1 = cr.now() + } + attemptCount++ + if err := cr.initStreamLocked(); err != nil { return nil, err } @@ -470,21 +531,15 @@ func (cr *Client) doSendLocked(retry bool, req *rpb.ServerReflectionRequest) (*r _, err = cr.stream.Recv() } cr.resetLocked() - if retry { - return cr.doSendLocked(false, req) - } - return nil, err + return cr.doSendLocked(attemptCount, err, req) } - if resp, err := cr.stream.Recv(); err != nil { + resp, err := cr.stream.Recv() + if err != nil { cr.resetLocked() - if retry { - return cr.doSendLocked(false, req) - } - return nil, err - } else { - return resp, nil + return cr.doSendLocked(attemptCount, err, req) } + return resp, nil } func (cr *Client) initStreamLocked() error { @@ -493,11 +548,34 @@ func (cr *Client) initStreamLocked() error { } var newCtx context.Context newCtx, cr.cancel = context.WithCancel(cr.ctx) + if cr.useV1Alpha == true && cr.now().Sub(cr.lastTriedV1) > durationBetweenV1Attempts { + // we're due for periodic retry of v1 + cr.useV1Alpha = false + } + if cr.useV1() { + // try the v1 API + streamv1, err := cr.stubV1.ServerReflectionInfo(newCtx) + if err == nil { + cr.stream = adaptStreamFromV1{streamv1} + return nil + } + if status.Code(err) != codes.Unimplemented { + return err + } + // oh well, fall through below to try v1alpha and update state + // so we skip straight to v1alpha next time + cr.useV1Alpha = true + cr.lastTriedV1 = cr.now() + } var err error - cr.stream, err = cr.stub.ServerReflectionInfo(newCtx) + cr.stream, err = cr.stubV1Alpha.ServerReflectionInfo(newCtx) return err } +func (cr *Client) useV1() bool { + return !cr.useV1Alpha && cr.stubV1 != nil +} + // Reset ensures that any active stream with the server is closed, releasing any // resources. func (cr *Client) Reset() { @@ -674,3 +752,20 @@ func (mde msgDescriptorExtensions) nestedScopes() []extensionScope { } return scopes } + +type adaptStreamFromV1 struct { + refv1.ServerReflection_ServerReflectionInfoClient +} + +func (a adaptStreamFromV1) Send(request *refv1alpha.ServerReflectionRequest) error { + v1req := refv1.ToV1Request(request) + return a.ServerReflection_ServerReflectionInfoClient.Send(v1req) +} + +func (a adaptStreamFromV1) Recv() (*refv1alpha.ServerReflectionResponse, error) { + v1resp, err := a.ServerReflection_ServerReflectionInfoClient.Recv() + if err != nil { + return nil, err + } + return refv1.ToV1AlphaResponse(v1resp), nil +} diff --git a/grpcreflect/client_test.go b/grpcreflect/client_test.go index 3e62018f..ba3df72f 100644 --- a/grpcreflect/client_test.go +++ b/grpcreflect/client_test.go @@ -8,6 +8,7 @@ import ( "net" "os" "sort" + "sync" "sync/atomic" "testing" "time" @@ -22,8 +23,10 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/reflection" rpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha" + "google.golang.org/grpc/status" "github.com/jhump/protoreflect/desc" + "github.com/jhump/protoreflect/grpcreflect/internal/grpc_reflection_v1" "github.com/jhump/protoreflect/internal" testprotosgrpc "github.com/jhump/protoreflect/internal/testprotos/grpc" "github.com/jhump/protoreflect/internal/testutil" @@ -60,7 +63,7 @@ func TestMain(m *testing.M) { defer cconn.Close() stub := rpb.NewServerReflectionClient(cconn) - client = NewClient(context.Background(), stub) + client = NewClientV1Alpha(context.Background(), stub) code = m.Run() } @@ -271,7 +274,7 @@ func TestMultipleFiles(t *testing.T) { testutil.Ok(t, err, "failed ot dial %v", l.Addr().String()) cl := rpb.NewServerReflectionClient(cc) - client := NewClient(ctx, cl) + client := NewClientV1Alpha(ctx, cl) defer client.Reset() svcs, err := client.ListServices() testutil.Ok(t, err, "failed to list services") @@ -367,3 +370,133 @@ func msgResponseForFiles(files ...string) *rpb.ServerReflectionResponse_FileDesc }, } } + +func TestAutoVersion(t *testing.T) { + t.Run("v1", func(t *testing.T) { + testClientAuto(t, + func(s *grpc.Server) { + grpc_reflection_v1.Register(s) + }, + []string{ + "grpc.reflection.v1.ServerReflection", + }, + []string{ + "/grpc.reflection.v1.ServerReflection/ServerReflectionInfo", + "/grpc.reflection.v1.ServerReflection/ServerReflectionInfo", + "/grpc.reflection.v1.ServerReflection/ServerReflectionInfo", + "/grpc.reflection.v1.ServerReflection/ServerReflectionInfo", + }) + }) + + t.Run("v1alpha", func(t *testing.T) { + testClientAuto(t, + func(s *grpc.Server) { + reflection.Register(s) + }, + []string{ + "grpc.reflection.v1alpha.ServerReflection", + }, + []string{ + // first one fails, so falls back to v1alpha + "/grpc.reflection.v1.ServerReflection/ServerReflectionInfo", + "/grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo", + // next two use v1alpha + "/grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo", + "/grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo", + // final one retries v1 + "/grpc.reflection.v1.ServerReflection/ServerReflectionInfo", + "/grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo", + }) + }) + + t.Run("both", func(t *testing.T) { + testClientAuto(t, + func(s *grpc.Server) { + grpc_reflection_v1.Register(s) + reflection.Register(s) + }, + []string{ + "grpc.reflection.v1.ServerReflection", + "grpc.reflection.v1alpha.ServerReflection", + }, + []string{ + // never uses v1alpha since v1 works + "/grpc.reflection.v1.ServerReflection/ServerReflectionInfo", + "/grpc.reflection.v1.ServerReflection/ServerReflectionInfo", + "/grpc.reflection.v1.ServerReflection/ServerReflectionInfo", + "/grpc.reflection.v1.ServerReflection/ServerReflectionInfo", + }) + }) +} + +func testClientAuto(t *testing.T, register func(*grpc.Server), expectedServices []string, expectedLog []string) { + var cap captureStreamNames + svr := grpc.NewServer(grpc.StreamInterceptor(cap.intercept), grpc.UnknownServiceHandler(cap.handleUnknown)) + register(svr) + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + panic(fmt.Sprintf("Failed to open server socket: %s", err.Error())) + } + go svr.Serve(l) + defer svr.Stop() + + cconn, err := grpc.Dial(l.Addr().String(), grpc.WithInsecure()) + if err != nil { + panic(fmt.Sprintf("Failed to create grpc client: %s", err.Error())) + } + defer cconn.Close() + client := NewClientAuto(context.Background(), cconn) + now := time.Now() + client.now = func() time.Time { + return now + } + + svcs, err := client.ListServices() + testutil.Ok(t, err) + sort.Strings(svcs) + testutil.Eq(t, expectedServices, svcs) + client.Reset() + + _, err = client.FileContainingSymbol(svcs[0]) + testutil.Ok(t, err) + client.Reset() + + // at the threshold, but not quite enough to retry + now = now.Add(time.Hour) + _, err = client.ListServices() + testutil.Ok(t, err) + client.Reset() + + // 1 ns more, and we've crossed threshold and will retry + now = now.Add(1) + _, err = client.ListServices() + testutil.Ok(t, err) + client.Reset() + + actualLog := cap.names() + testutil.Eq(t, expectedLog, actualLog) +} + +type captureStreamNames struct { + mu sync.Mutex + log []string +} + +func (c *captureStreamNames) names() []string { + c.mu.Lock() + defer c.mu.Unlock() + ret := make([]string, len(c.log)) + copy(ret, c.log) + return ret +} + +func (c *captureStreamNames) intercept(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + c.mu.Lock() + c.log = append(c.log, info.FullMethod) + c.mu.Unlock() + return handler(srv, ss) +} + +func (c *captureStreamNames) handleUnknown(_ interface{}, _ grpc.ServerStream) error { + return status.Errorf(codes.Unimplemented, "WTF?") +} diff --git a/grpcreflect/internal/grpc_reflection_v1/reflection_grpc.pb.go b/grpcreflect/internal/grpc_reflection_v1/reflection_grpc.pb.go new file mode 100644 index 00000000..6a8ac71a --- /dev/null +++ b/grpcreflect/internal/grpc_reflection_v1/reflection_grpc.pb.go @@ -0,0 +1,141 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.21.7 +// source: reflection.proto + +package grpc_reflection_v1 + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// ServerReflectionClient is the client API for ServerReflection service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type ServerReflectionClient interface { + // The reflection service is structured as a bidirectional stream, ensuring + // all related requests go to a single server. + ServerReflectionInfo(ctx context.Context, opts ...grpc.CallOption) (ServerReflection_ServerReflectionInfoClient, error) +} + +type serverReflectionClient struct { + cc grpc.ClientConnInterface +} + +func NewServerReflectionClient(cc grpc.ClientConnInterface) ServerReflectionClient { + return &serverReflectionClient{cc} +} + +func (c *serverReflectionClient) ServerReflectionInfo(ctx context.Context, opts ...grpc.CallOption) (ServerReflection_ServerReflectionInfoClient, error) { + stream, err := c.cc.NewStream(ctx, &ServerReflection_ServiceDesc.Streams[0], "/grpc.reflection.v1.ServerReflection/ServerReflectionInfo", opts...) + if err != nil { + return nil, err + } + x := &serverReflectionServerReflectionInfoClient{stream} + return x, nil +} + +type ServerReflection_ServerReflectionInfoClient interface { + Send(*ServerReflectionRequest) error + Recv() (*ServerReflectionResponse, error) + grpc.ClientStream +} + +type serverReflectionServerReflectionInfoClient struct { + grpc.ClientStream +} + +func (x *serverReflectionServerReflectionInfoClient) Send(m *ServerReflectionRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *serverReflectionServerReflectionInfoClient) Recv() (*ServerReflectionResponse, error) { + m := new(ServerReflectionResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// ServerReflectionServer is the server API for ServerReflection service. +// All implementations must embed UnimplementedServerReflectionServer +// for forward compatibility +type ServerReflectionServer interface { + // The reflection service is structured as a bidirectional stream, ensuring + // all related requests go to a single server. + ServerReflectionInfo(ServerReflection_ServerReflectionInfoServer) error + mustEmbedUnimplementedServerReflectionServer() +} + +// UnimplementedServerReflectionServer must be embedded to have forward compatible implementations. +type UnimplementedServerReflectionServer struct { +} + +func (UnimplementedServerReflectionServer) ServerReflectionInfo(ServerReflection_ServerReflectionInfoServer) error { + return status.Errorf(codes.Unimplemented, "method ServerReflectionInfo not implemented") +} +func (UnimplementedServerReflectionServer) mustEmbedUnimplementedServerReflectionServer() {} + +// UnsafeServerReflectionServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to ServerReflectionServer will +// result in compilation errors. +type UnsafeServerReflectionServer interface { + mustEmbedUnimplementedServerReflectionServer() +} + +func RegisterServerReflectionServer(s grpc.ServiceRegistrar, srv ServerReflectionServer) { + s.RegisterService(&ServerReflection_ServiceDesc, srv) +} + +func _ServerReflection_ServerReflectionInfo_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(ServerReflectionServer).ServerReflectionInfo(&serverReflectionServerReflectionInfoServer{stream}) +} + +type ServerReflection_ServerReflectionInfoServer interface { + Send(*ServerReflectionResponse) error + Recv() (*ServerReflectionRequest, error) + grpc.ServerStream +} + +type serverReflectionServerReflectionInfoServer struct { + grpc.ServerStream +} + +func (x *serverReflectionServerReflectionInfoServer) Send(m *ServerReflectionResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *serverReflectionServerReflectionInfoServer) Recv() (*ServerReflectionRequest, error) { + m := new(ServerReflectionRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// ServerReflection_ServiceDesc is the grpc.ServiceDesc for ServerReflection service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var ServerReflection_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "grpc.reflection.v1.ServerReflection", + HandlerType: (*ServerReflectionServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "ServerReflectionInfo", + Handler: _ServerReflection_ServerReflectionInfo_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "reflection.proto", +} diff --git a/grpcreflect/internal/grpc_reflection_v1/svc_impl.go b/grpcreflect/internal/grpc_reflection_v1/svc_impl.go new file mode 100644 index 00000000..d29e4409 --- /dev/null +++ b/grpcreflect/internal/grpc_reflection_v1/svc_impl.go @@ -0,0 +1,232 @@ +package grpc_reflection_v1 + +import ( + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" + "google.golang.org/grpc/reflection/grpc_reflection_v1alpha" +) + +func Register(svr reflection.GRPCServer) { + reflection.Register(registrarInterceptor{svr}) +} + +type registrarInterceptor struct { + svr reflection.GRPCServer +} + +func (r registrarInterceptor) RegisterService(desc *grpc.ServiceDesc, impl interface{}) { + r.svr.RegisterService(&ServerReflection_ServiceDesc, reflectImpl{svr: impl.(grpc_reflection_v1alpha.ServerReflectionServer)}) +} + +func (r registrarInterceptor) GetServiceInfo() map[string]grpc.ServiceInfo { + return r.svr.GetServiceInfo() +} + +type reflectImpl struct { + svr grpc_reflection_v1alpha.ServerReflectionServer + UnimplementedServerReflectionServer +} + +func (r reflectImpl) ServerReflectionInfo(stream ServerReflection_ServerReflectionInfoServer) error { + return r.svr.ServerReflectionInfo(streamImpl{stream}) +} + +type streamImpl struct { + ServerReflection_ServerReflectionInfoServer +} + +func (s streamImpl) Send(response *grpc_reflection_v1alpha.ServerReflectionResponse) error { + return s.ServerReflection_ServerReflectionInfoServer.Send(ToV1Response(response)) +} + +func (s streamImpl) Recv() (*grpc_reflection_v1alpha.ServerReflectionRequest, error) { + resp, err := s.ServerReflection_ServerReflectionInfoServer.Recv() + if err != nil { + return nil, err + } + return ToV1AlphaRequest(resp), nil +} + +func ToV1Request(v1alpha *grpc_reflection_v1alpha.ServerReflectionRequest) *ServerReflectionRequest { + var v1 ServerReflectionRequest + v1.Host = v1alpha.Host + switch mr := v1alpha.MessageRequest.(type) { + case *grpc_reflection_v1alpha.ServerReflectionRequest_FileByFilename: + v1.MessageRequest = &ServerReflectionRequest_FileByFilename{ + FileByFilename: mr.FileByFilename, + } + case *grpc_reflection_v1alpha.ServerReflectionRequest_FileContainingSymbol: + v1.MessageRequest = &ServerReflectionRequest_FileContainingSymbol{ + FileContainingSymbol: mr.FileContainingSymbol, + } + case *grpc_reflection_v1alpha.ServerReflectionRequest_FileContainingExtension: + if mr.FileContainingExtension != nil { + v1.MessageRequest = &ServerReflectionRequest_FileContainingExtension{ + FileContainingExtension: &ExtensionRequest{ + ContainingType: mr.FileContainingExtension.GetContainingType(), + ExtensionNumber: mr.FileContainingExtension.GetExtensionNumber(), + }, + } + } + case *grpc_reflection_v1alpha.ServerReflectionRequest_AllExtensionNumbersOfType: + v1.MessageRequest = &ServerReflectionRequest_AllExtensionNumbersOfType{ + AllExtensionNumbersOfType: mr.AllExtensionNumbersOfType, + } + case *grpc_reflection_v1alpha.ServerReflectionRequest_ListServices: + v1.MessageRequest = &ServerReflectionRequest_ListServices{ + ListServices: mr.ListServices, + } + default: + // no value set + } + return &v1 +} + +func ToV1AlphaRequest(v1 *ServerReflectionRequest) *grpc_reflection_v1alpha.ServerReflectionRequest { + var v1alpha grpc_reflection_v1alpha.ServerReflectionRequest + v1alpha.Host = v1.Host + switch mr := v1.MessageRequest.(type) { + case *ServerReflectionRequest_FileByFilename: + if mr != nil { + v1alpha.MessageRequest = &grpc_reflection_v1alpha.ServerReflectionRequest_FileByFilename{ + FileByFilename: mr.FileByFilename, + } + } + case *ServerReflectionRequest_FileContainingSymbol: + if mr != nil { + v1alpha.MessageRequest = &grpc_reflection_v1alpha.ServerReflectionRequest_FileContainingSymbol{ + FileContainingSymbol: mr.FileContainingSymbol, + } + } + case *ServerReflectionRequest_FileContainingExtension: + if mr != nil { + v1alpha.MessageRequest = &grpc_reflection_v1alpha.ServerReflectionRequest_FileContainingExtension{ + FileContainingExtension: &grpc_reflection_v1alpha.ExtensionRequest{ + ContainingType: mr.FileContainingExtension.GetContainingType(), + ExtensionNumber: mr.FileContainingExtension.GetExtensionNumber(), + }, + } + } + case *ServerReflectionRequest_AllExtensionNumbersOfType: + if mr != nil { + v1alpha.MessageRequest = &grpc_reflection_v1alpha.ServerReflectionRequest_AllExtensionNumbersOfType{ + AllExtensionNumbersOfType: mr.AllExtensionNumbersOfType, + } + } + case *ServerReflectionRequest_ListServices: + if mr != nil { + v1alpha.MessageRequest = &grpc_reflection_v1alpha.ServerReflectionRequest_ListServices{ + ListServices: mr.ListServices, + } + } + default: + // no value set + } + return &v1alpha +} + +func ToV1Response(v1alpha *grpc_reflection_v1alpha.ServerReflectionResponse) *ServerReflectionResponse { + var v1 ServerReflectionResponse + v1.ValidHost = v1alpha.ValidHost + if v1alpha.OriginalRequest != nil { + v1.OriginalRequest = ToV1Request(v1alpha.OriginalRequest) + } + switch mr := v1alpha.MessageResponse.(type) { + case *grpc_reflection_v1alpha.ServerReflectionResponse_FileDescriptorResponse: + if mr != nil { + v1.MessageResponse = &ServerReflectionResponse_FileDescriptorResponse{ + FileDescriptorResponse: &FileDescriptorResponse{ + FileDescriptorProto: mr.FileDescriptorResponse.GetFileDescriptorProto(), + }, + } + } + case *grpc_reflection_v1alpha.ServerReflectionResponse_AllExtensionNumbersResponse: + if mr != nil { + v1.MessageResponse = &ServerReflectionResponse_AllExtensionNumbersResponse{ + AllExtensionNumbersResponse: &ExtensionNumberResponse{ + BaseTypeName: mr.AllExtensionNumbersResponse.GetBaseTypeName(), + ExtensionNumber: mr.AllExtensionNumbersResponse.GetExtensionNumber(), + }, + } + } + case *grpc_reflection_v1alpha.ServerReflectionResponse_ListServicesResponse: + if mr != nil { + svcs := make([]*ServiceResponse, len(mr.ListServicesResponse.GetService())) + for i, svc := range mr.ListServicesResponse.GetService() { + svcs[i] = &ServiceResponse{ + Name: svc.GetName(), + } + } + v1.MessageResponse = &ServerReflectionResponse_ListServicesResponse{ + ListServicesResponse: &ListServiceResponse{ + Service: svcs, + }, + } + } + case *grpc_reflection_v1alpha.ServerReflectionResponse_ErrorResponse: + if mr != nil { + v1.MessageResponse = &ServerReflectionResponse_ErrorResponse{ + ErrorResponse: &ErrorResponse{ + ErrorCode: mr.ErrorResponse.GetErrorCode(), + ErrorMessage: mr.ErrorResponse.GetErrorMessage(), + }, + } + } + default: + // no value set + } + return &v1 +} + +func ToV1AlphaResponse(v1 *ServerReflectionResponse) *grpc_reflection_v1alpha.ServerReflectionResponse { + var v1alpha grpc_reflection_v1alpha.ServerReflectionResponse + v1alpha.ValidHost = v1.ValidHost + if v1.OriginalRequest != nil { + v1alpha.OriginalRequest = ToV1AlphaRequest(v1.OriginalRequest) + } + switch mr := v1.MessageResponse.(type) { + case *ServerReflectionResponse_FileDescriptorResponse: + if mr != nil { + v1alpha.MessageResponse = &grpc_reflection_v1alpha.ServerReflectionResponse_FileDescriptorResponse{ + FileDescriptorResponse: &grpc_reflection_v1alpha.FileDescriptorResponse{ + FileDescriptorProto: mr.FileDescriptorResponse.GetFileDescriptorProto(), + }, + } + } + case *ServerReflectionResponse_AllExtensionNumbersResponse: + if mr != nil { + v1alpha.MessageResponse = &grpc_reflection_v1alpha.ServerReflectionResponse_AllExtensionNumbersResponse{ + AllExtensionNumbersResponse: &grpc_reflection_v1alpha.ExtensionNumberResponse{ + BaseTypeName: mr.AllExtensionNumbersResponse.GetBaseTypeName(), + ExtensionNumber: mr.AllExtensionNumbersResponse.GetExtensionNumber(), + }, + } + } + case *ServerReflectionResponse_ListServicesResponse: + if mr != nil { + svcs := make([]*grpc_reflection_v1alpha.ServiceResponse, len(mr.ListServicesResponse.GetService())) + for i, svc := range mr.ListServicesResponse.GetService() { + svcs[i] = &grpc_reflection_v1alpha.ServiceResponse{ + Name: svc.GetName(), + } + } + v1alpha.MessageResponse = &grpc_reflection_v1alpha.ServerReflectionResponse_ListServicesResponse{ + ListServicesResponse: &grpc_reflection_v1alpha.ListServiceResponse{ + Service: svcs, + }, + } + } + case *ServerReflectionResponse_ErrorResponse: + if mr != nil { + v1alpha.MessageResponse = &grpc_reflection_v1alpha.ServerReflectionResponse_ErrorResponse{ + ErrorResponse: &grpc_reflection_v1alpha.ErrorResponse{ + ErrorCode: mr.ErrorResponse.GetErrorCode(), + ErrorMessage: mr.ErrorResponse.GetErrorMessage(), + }, + } + } + default: + // no value set + } + return &v1alpha +} diff --git a/internal/testprotos/make_protos.sh b/internal/testprotos/make_protos.sh index 671dece8..132f7b29 100755 --- a/internal/testprotos/make_protos.sh +++ b/internal/testprotos/make_protos.sh @@ -55,4 +55,6 @@ ${PROTOC} --descriptor_set_out=./proto3_optional/desc_test_proto3_optional.proto # https://github.com/grpc/grpc-go/issues/5684 # So, for now, we generate a copy into an internal package so that our client can support that version. cd ../../grpcreflect/internal/grpc_reflection_v1 -${PROTOC} reflection.proto --go_out=Mreflection.proto=github.com/jhump/protoreflect/grpcreflect/internal/grpc_reflection_v1,paths=source_relative:. +${PROTOC} --go_out=Mreflection.proto=github.com/jhump/protoreflect/grpcreflect/internal/grpc_reflection_v1,paths=source_relative:. \ + --go-grpc_out=Mreflection.proto=github.com/jhump/protoreflect/grpcreflect/internal/grpc_reflection_v1,paths=source_relative:. \ + reflection.proto