Skip to content

Commit

Permalink
support v1 of reflection service
Browse files Browse the repository at this point in the history
  • Loading branch information
jhump committed Oct 6, 2022
1 parent 05f5a76 commit 71c2a11
Show file tree
Hide file tree
Showing 5 changed files with 645 additions and 42 deletions.
173 changes: 134 additions & 39 deletions grpcreflect/client.go
Expand Up @@ -8,17 +8,28 @@ import (
"reflect"
"runtime"
"sync"
"time"

"github.com/golang/protobuf/proto"
dpb "github.com/golang/protobuf/protoc-gen-go/descriptor"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
rpb "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
refv1alpha "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
"google.golang.org/grpc/status"

"github.com/jhump/protoreflect/desc"
refv1 "github.com/jhump/protoreflect/grpcreflect/internal/grpc_reflection_v1"
"github.com/jhump/protoreflect/internal"
)

// If we try the v1 reflection API and get back "not implemented", we'll wait
// this long before trying v1 again. This allows a long-lived client to
// dynamically switch from v1alpha to v1 if the underlying server is updated
// to support it. But it also prevents every stream request from always trying
// v1 first: if we try it and see it fail, we shouldn't continually retry it
// if we expect it will fail again.
const durationBetweenV1Attempts = time.Hour

// elementNotFoundError is the error returned by reflective operations where the
// server does not recognize a given file name, symbol name, or extension.
type elementNotFoundError struct {
Expand Down Expand Up @@ -108,12 +119,16 @@ type extDesc struct {
// Client is a client connection to a server for performing reflection calls
// and resolving remote symbols.
type Client struct {
ctx context.Context
stub rpb.ServerReflectionClient
ctx context.Context
now func() time.Time
stubV1 refv1.ServerReflectionClient
stubV1Alpha refv1alpha.ServerReflectionClient

connMu sync.Mutex
cancel context.CancelFunc
stream rpb.ServerReflection_ServerReflectionInfoClient
connMu sync.Mutex
cancel context.CancelFunc
stream refv1alpha.ServerReflection_ServerReflectionInfoClient
useV1Alpha bool
lastTriedV1 time.Time

cacheMu sync.RWMutex
protosByName map[string]*dpb.FileDescriptorProto
Expand All @@ -124,10 +139,27 @@ type Client struct {

// NewClient creates a new Client with the given root context and using the
// given RPC stub for talking to the server.
func NewClient(ctx context.Context, stub rpb.ServerReflectionClient) *Client {
//
// Deprecated: Use NewClientV1Alpha if you are intentionally pinning the
// v1alpha version of the reflection service. Otherwise, use NewClientAuto
// instead.
func NewClient(ctx context.Context, stub refv1alpha.ServerReflectionClient) *Client {
return NewClientV1Alpha(ctx, stub)
}

// NewClientV1Alpha creates a new Client using the v1alpha version of reflection
// with the given root context and using the given RPC stub for talking to the
// server.
func NewClientV1Alpha(ctx context.Context, stub refv1alpha.ServerReflectionClient) *Client {
return newClient(ctx, nil, stub)
}

func newClient(ctx context.Context, stubv1 refv1.ServerReflectionClient, stubv1alpha refv1alpha.ServerReflectionClient) *Client {
cr := &Client{
ctx: ctx,
stub: stub,
now: time.Now,
stubV1: stubv1,
stubV1Alpha: stubv1alpha,
protosByName: map[string]*dpb.FileDescriptorProto{},
filesByName: map[string]*desc.FileDescriptor{},
filesBySymbol: map[string]*desc.FileDescriptor{},
Expand All @@ -138,6 +170,26 @@ func NewClient(ctx context.Context, stub rpb.ServerReflectionClient) *Client {
return cr
}

// NewClientAuto creates a new Client that will use either v1 or v1alpha version
// of reflection (based on what the server supports) with the given root context
// and using the given client connection.
//
// It will first the v1 version of the reflection service. If it gets back an
// "Unimplemented" error, it will fall back to using the v1alpha version. It
// will remember which version the server supports for any subsequent operations
// that need to re-invoke the streaming RPC. But, if it's a very long-lived
// client, it will periodically retry the v1 version (in case the server is
// updated to support it also). The period for these retries is every hour.
func NewClientAuto(ctx context.Context, cc grpc.ClientConnInterface) *Client {
stubv1 := refv1.NewServerReflectionClient(cc)
stubv1alpha := refv1alpha.NewServerReflectionClient(cc)
return newClient(ctx, stubv1, stubv1alpha)
}

// TODO: We should also have a NewClientV1. However that should not refer to internal
// generated code. So it will have to wait until the grpc-go team fixes this issue:
// https://github.com/grpc/grpc-go/issues/5684

// FileByFilename asks the server for a file descriptor for the proto file with
// the given name.
func (cr *Client) FileByFilename(filename string) (*desc.FileDescriptor, error) {
Expand All @@ -154,8 +206,8 @@ func (cr *Client) FileByFilename(filename string) (*desc.FileDescriptor, error)
return cr.descriptorFromProto(fdp)
}

req := &rpb.ServerReflectionRequest{
MessageRequest: &rpb.ServerReflectionRequest_FileByFilename{
req := &refv1alpha.ServerReflectionRequest{
MessageRequest: &refv1alpha.ServerReflectionRequest_FileByFilename{
FileByFilename: filename,
},
}
Expand All @@ -167,8 +219,8 @@ func (cr *Client) FileByFilename(filename string) (*desc.FileDescriptor, error)
if isNotFound(err) {
// file not found? see if we can look up via alternate name
if alternate, ok := internal.StdFileAliases[filename]; ok {
req := &rpb.ServerReflectionRequest{
MessageRequest: &rpb.ServerReflectionRequest_FileByFilename{
req := &refv1alpha.ServerReflectionRequest{
MessageRequest: &refv1alpha.ServerReflectionRequest_FileByFilename{
FileByFilename: alternate,
},
}
Expand Down Expand Up @@ -196,8 +248,8 @@ func (cr *Client) FileContainingSymbol(symbol string) (*desc.FileDescriptor, err
return fd, nil
}

req := &rpb.ServerReflectionRequest{
MessageRequest: &rpb.ServerReflectionRequest_FileContainingSymbol{
req := &refv1alpha.ServerReflectionRequest{
MessageRequest: &refv1alpha.ServerReflectionRequest_FileContainingSymbol{
FileContainingSymbol: symbol,
},
}
Expand Down Expand Up @@ -225,9 +277,9 @@ func (cr *Client) FileContainingExtension(extendedMessageName string, extensionN
return fd, nil
}

req := &rpb.ServerReflectionRequest{
MessageRequest: &rpb.ServerReflectionRequest_FileContainingExtension{
FileContainingExtension: &rpb.ExtensionRequest{
req := &refv1alpha.ServerReflectionRequest{
MessageRequest: &refv1alpha.ServerReflectionRequest_FileContainingExtension{
FileContainingExtension: &refv1alpha.ExtensionRequest{
ContainingType: extendedMessageName,
ExtensionNumber: extensionNumber,
},
Expand All @@ -245,7 +297,7 @@ func (cr *Client) FileContainingExtension(extendedMessageName string, extensionN
return fd, err
}

func (cr *Client) getAndCacheFileDescriptors(req *rpb.ServerReflectionRequest, expectedName, alias string, accept func(*desc.FileDescriptor) bool) (*desc.FileDescriptor, error) {
func (cr *Client) getAndCacheFileDescriptors(req *refv1alpha.ServerReflectionRequest, expectedName, alias string, accept func(*desc.FileDescriptor) bool) (*desc.FileDescriptor, error) {
resp, err := cr.send(req)
if err != nil {
return nil, err
Expand Down Expand Up @@ -379,8 +431,8 @@ func (cr *Client) cacheMessageLocked(fd *desc.FileDescriptor, md *desc.MessageDe
// AllExtensionNumbersForType asks the server for all known extension numbers
// for the given fully-qualified message name.
func (cr *Client) AllExtensionNumbersForType(extendedMessageName string) ([]int32, error) {
req := &rpb.ServerReflectionRequest{
MessageRequest: &rpb.ServerReflectionRequest_AllExtensionNumbersOfType{
req := &refv1alpha.ServerReflectionRequest{
MessageRequest: &refv1alpha.ServerReflectionRequest_AllExtensionNumbersOfType{
AllExtensionNumbersOfType: extendedMessageName,
},
}
Expand All @@ -402,8 +454,8 @@ func (cr *Client) AllExtensionNumbersForType(extendedMessageName string) ([]int3
// ListServices asks the server for the fully-qualified names of all exposed
// services.
func (cr *Client) ListServices() ([]string, error) {
req := &rpb.ServerReflectionRequest{
MessageRequest: &rpb.ServerReflectionRequest_ListServices{
req := &refv1alpha.ServerReflectionRequest{
MessageRequest: &refv1alpha.ServerReflectionRequest_ListServices{
// proto doesn't indicate any purpose for this value and server impl
// doesn't actually use it...
ListServices: "*",
Expand All @@ -425,10 +477,10 @@ func (cr *Client) ListServices() ([]string, error) {
return serviceNames, nil
}

func (cr *Client) send(req *rpb.ServerReflectionRequest) (*rpb.ServerReflectionResponse, error) {
func (cr *Client) send(req *refv1alpha.ServerReflectionRequest) (*refv1alpha.ServerReflectionResponse, error) {
// we allow one immediate retry, in case we have a stale stream
// (e.g. closed by server)
resp, err := cr.doSend(true, req)
resp, err := cr.doSend(req)
if err != nil {
return nil, err
}
Expand All @@ -450,16 +502,25 @@ func isNotFound(err error) bool {
return ok && s.Code() == codes.NotFound
}

func (cr *Client) doSend(retry bool, req *rpb.ServerReflectionRequest) (*rpb.ServerReflectionResponse, error) {
func (cr *Client) doSend(req *refv1alpha.ServerReflectionRequest) (*refv1alpha.ServerReflectionResponse, error) {
// TODO: Streams are thread-safe, so we shouldn't need to lock. But without locking, we'll need more machinery
// (goroutines and channels) to ensure that responses are correctly correlated with their requests and thus
// delivered in correct oder.
cr.connMu.Lock()
defer cr.connMu.Unlock()
return cr.doSendLocked(retry, req)
return cr.doSendLocked(0, nil, req)
}

func (cr *Client) doSendLocked(retry bool, req *rpb.ServerReflectionRequest) (*rpb.ServerReflectionResponse, error) {
func (cr *Client) doSendLocked(attemptCount int, prevErr error, req *refv1alpha.ServerReflectionRequest) (*refv1alpha.ServerReflectionResponse, error) {
if attemptCount >= 3 && prevErr != nil {
return nil, prevErr
}
if status.Code(prevErr) == codes.Unimplemented && cr.useV1() {
cr.useV1Alpha = true
cr.lastTriedV1 = cr.now()
}
attemptCount++

if err := cr.initStreamLocked(); err != nil {
return nil, err
}
Expand All @@ -470,21 +531,15 @@ func (cr *Client) doSendLocked(retry bool, req *rpb.ServerReflectionRequest) (*r
_, err = cr.stream.Recv()
}
cr.resetLocked()
if retry {
return cr.doSendLocked(false, req)
}
return nil, err
return cr.doSendLocked(attemptCount, err, req)
}

if resp, err := cr.stream.Recv(); err != nil {
resp, err := cr.stream.Recv()
if err != nil {
cr.resetLocked()
if retry {
return cr.doSendLocked(false, req)
}
return nil, err
} else {
return resp, nil
return cr.doSendLocked(attemptCount, err, req)
}
return resp, nil
}

func (cr *Client) initStreamLocked() error {
Expand All @@ -493,11 +548,34 @@ func (cr *Client) initStreamLocked() error {
}
var newCtx context.Context
newCtx, cr.cancel = context.WithCancel(cr.ctx)
if cr.useV1Alpha == true && cr.now().Sub(cr.lastTriedV1) > durationBetweenV1Attempts {
// we're due for periodic retry of v1
cr.useV1Alpha = false
}
if cr.useV1() {
// try the v1 API
streamv1, err := cr.stubV1.ServerReflectionInfo(newCtx)
if err == nil {
cr.stream = adaptStreamFromV1{streamv1}
return nil
}
if status.Code(err) != codes.Unimplemented {
return err
}
// oh well, fall through below to try v1alpha and update state
// so we skip straight to v1alpha next time
cr.useV1Alpha = true
cr.lastTriedV1 = cr.now()
}
var err error
cr.stream, err = cr.stub.ServerReflectionInfo(newCtx)
cr.stream, err = cr.stubV1Alpha.ServerReflectionInfo(newCtx)
return err
}

func (cr *Client) useV1() bool {
return !cr.useV1Alpha && cr.stubV1 != nil
}

// Reset ensures that any active stream with the server is closed, releasing any
// resources.
func (cr *Client) Reset() {
Expand Down Expand Up @@ -674,3 +752,20 @@ func (mde msgDescriptorExtensions) nestedScopes() []extensionScope {
}
return scopes
}

type adaptStreamFromV1 struct {
refv1.ServerReflection_ServerReflectionInfoClient
}

func (a adaptStreamFromV1) Send(request *refv1alpha.ServerReflectionRequest) error {
v1req := refv1.ToV1Request(request)
return a.ServerReflection_ServerReflectionInfoClient.Send(v1req)
}

func (a adaptStreamFromV1) Recv() (*refv1alpha.ServerReflectionResponse, error) {
v1resp, err := a.ServerReflection_ServerReflectionInfoClient.Recv()
if err != nil {
return nil, err
}
return refv1.ToV1AlphaResponse(v1resp), nil
}

0 comments on commit 71c2a11

Please sign in to comment.