diff --git a/changelog/pending/20221102--cli--pulumi-debug-grpc.yaml b/changelog/pending/20221102--cli--pulumi-debug-grpc.yaml new file mode 100644 index 000000000000..de1c8793c791 --- /dev/null +++ b/changelog/pending/20221102--cli--pulumi-debug-grpc.yaml @@ -0,0 +1,4 @@ +changes: +- type: feat + scope: cli + description: "Enables debug tracing of Pulumi gRPC internals: `PULUMI_DEBUG_GRPC=$PWD/grpc.json pulumi up`" diff --git a/pkg/engine/deployment.go b/pkg/engine/deployment.go index 8bb770094811..6173e53d063a 100644 --- a/pkg/engine/deployment.go +++ b/pkg/engine/deployment.go @@ -18,12 +18,15 @@ import ( "context" "errors" "fmt" + "os" "time" "github.com/opentracing/opentracing-go" + "google.golang.org/grpc" "github.com/pulumi/pulumi/pkg/v3/resource/deploy" "github.com/pulumi/pulumi/pkg/v3/resource/deploy/providers" + interceptors "github.com/pulumi/pulumi/pkg/v3/util/rpcdebug" "github.com/pulumi/pulumi/sdk/v3/go/common/diag" "github.com/pulumi/pulumi/sdk/v3/go/common/display" "github.com/pulumi/pulumi/sdk/v3/go/common/resource" @@ -57,6 +60,21 @@ func ProjectInfoContext(projinfo *Projinfo, host plugin.Host, return "", "", nil, err } + if logFile := os.Getenv("PULUMI_DEBUG_GRPC"); logFile != "" { + di, err := interceptors.NewDebugInterceptor(interceptors.DebugInterceptorOptions{ + LogFile: logFile, + Mutex: ctx.DebugTraceMutex, + }) + if err != nil { + return "", "", nil, err + } + ctx.DialOptions = func(metadata interface{}) []grpc.DialOption { + return di.DialOptions(interceptors.LogOptions{ + Metadata: metadata, + }) + } + } + // If the project wants to connect to an existing language runtime, do so now. if projinfo.Proj.Runtime.Name() == clientRuntimeName { addressValue, ok := projinfo.Proj.Runtime.Options()["address"] diff --git a/pkg/engine/plugin_host.go b/pkg/engine/plugin_host.go index 61ab5a8243a4..3b2ece612c24 100644 --- a/pkg/engine/plugin_host.go +++ b/pkg/engine/plugin_host.go @@ -1,4 +1,4 @@ -// Copyright 2016-2020, Pulumi Corporation. +// Copyright 2016-2022, Pulumi Corporation. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -32,10 +32,7 @@ type clientLanguageRuntimeHost struct { func connectToLanguageRuntime(ctx *plugin.Context, address string) (plugin.Host, error) { // Dial the language runtime. - conn, err := grpc.Dial(address, grpc.WithInsecure(), - grpc.WithUnaryInterceptor(rpcutil.OpenTracingClientInterceptor()), - grpc.WithStreamInterceptor(rpcutil.OpenTracingStreamClientInterceptor()), - rpcutil.GrpcChannelOptions()) + conn, err := grpc.Dial(address, langRuntimePluginDialOptions(ctx, address)...) if err != nil { return nil, fmt.Errorf("could not connect to language host: %w", err) } @@ -50,3 +47,24 @@ func connectToLanguageRuntime(ctx *plugin.Context, address string) (plugin.Host, func (host *clientLanguageRuntimeHost) LanguageRuntime(runtime string) (plugin.LanguageRuntime, error) { return host.languageRuntime, nil } + +func langRuntimePluginDialOptions(ctx *plugin.Context, address string) []grpc.DialOption { + dialOpts := append( + rpcutil.OpenTracingInterceptorDialOptions(), + grpc.WithInsecure(), + rpcutil.GrpcChannelOptions(), + ) + + if ctx.DialOptions != nil { + metadata := map[string]interface{}{ + "mode": "client", + "kind": "language", + } + if address != "" { + metadata["address"] = address + } + dialOpts = append(dialOpts, ctx.DialOptions(metadata)...) + } + + return dialOpts +} diff --git a/pkg/resource/deploy/deploytest/pluginhost.go b/pkg/resource/deploy/deploytest/pluginhost.go index 5b1f6d519f8e..bb02759d0109 100644 --- a/pkg/resource/deploy/deploytest/pluginhost.go +++ b/pkg/resource/deploy/deploytest/pluginhost.go @@ -156,17 +156,19 @@ func (w *grpcWrapper) Close() error { func wrapProviderWithGrpc(provider plugin.Provider) (plugin.Provider, io.Closer, error) { wrapper := &grpcWrapper{stop: make(chan bool)} - port, _, err := rpcutil.Serve(0, wrapper.stop, []func(*grpc.Server) error{ - func(srv *grpc.Server) error { + handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{ + Cancel: wrapper.stop, + Init: func(srv *grpc.Server) error { pulumirpc.RegisterResourceProviderServer(srv, plugin.NewProviderServer(provider)) return nil }, - }, nil) + Options: rpcutil.OpenTracingServerInterceptorOptions(nil), + }) if err != nil { return nil, nil, fmt.Errorf("could not start resource provider service: %w", err) } conn, err := grpc.Dial( - fmt.Sprintf("127.0.0.1:%v", port), + fmt.Sprintf("127.0.0.1:%v", handle.Port), grpc.WithInsecure(), grpc.WithUnaryInterceptor(rpcutil.OpenTracingClientInterceptor()), grpc.WithStreamInterceptor(rpcutil.OpenTracingStreamClientInterceptor()), @@ -242,16 +244,18 @@ func NewPluginHost(sink, statusSink diag.Sink, languageRuntime plugin.LanguageRu statusSink: statusSink, stop: make(chan bool), } - port, _, err := rpcutil.Serve(0, engine.stop, []func(*grpc.Server) error{ - func(srv *grpc.Server) error { + handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{ + Cancel: engine.stop, + Init: func(srv *grpc.Server) error { pulumirpc.RegisterEngineServer(srv, engine) return nil }, - }, nil) + Options: rpcutil.OpenTracingServerInterceptorOptions(nil), + }) if err != nil { panic(fmt.Errorf("could not start engine service: %v", err)) } - engine.address = fmt.Sprintf("127.0.0.1:%v", port) + engine.address = fmt.Sprintf("127.0.0.1:%v", handle.Port) return &pluginHost{ pluginLoaders: pluginLoaders, diff --git a/pkg/resource/deploy/source_eval.go b/pkg/resource/deploy/source_eval.go index 00b70e9e23a6..bcdad1fd5fc6 100644 --- a/pkg/resource/deploy/source_eval.go +++ b/pkg/resource/deploy/source_eval.go @@ -19,6 +19,7 @@ import ( "encoding/json" "errors" "fmt" + "os" "strings" "sync" "time" @@ -32,6 +33,7 @@ import ( "google.golang.org/grpc/codes" "github.com/pulumi/pulumi/pkg/v3/resource/deploy/providers" + interceptors "github.com/pulumi/pulumi/pkg/v3/util/rpcdebug" "github.com/pulumi/pulumi/sdk/v3/go/common/diag" "github.com/pulumi/pulumi/sdk/v3/go/common/resource" "github.com/pulumi/pulumi/sdk/v3/go/common/resource/config" @@ -488,7 +490,7 @@ type resmon struct { regOutChan chan *registerResourceOutputsEvent // the channel to send resource output registrations to. regReadChan chan *readResourceEvent // the channel to send resource reads to. cancel chan bool // a channel that can cancel the server. - done chan error // a channel that resolves when the server completes. + done <-chan error // a channel that resolves when the server completes. disableResourceReferences bool // true if resource references are disabled. disableOutputValues bool // true if output values are disabled. } @@ -528,12 +530,14 @@ func newResourceMonitor(src *evalSource, provs ProviderSource, regChan chan *reg } // Fire up a gRPC server and start listening for incomings. - port, done, err := rpcutil.Serve(0, resmon.cancel, []func(*grpc.Server) error{ - func(srv *grpc.Server) error { + handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{ + Cancel: resmon.cancel, + Init: func(srv *grpc.Server) error { pulumirpc.RegisterResourceMonitorServer(srv, resmon) return nil }, - }, tracingSpan, otgrpc.SpanDecorator(decorateResourceSpans)) + Options: sourceEvalServeOptions(src.plugctx, tracingSpan), + }) if err != nil { return nil, err } @@ -545,9 +549,9 @@ func newResourceMonitor(src *evalSource, provs ProviderSource, regChan chan *reg ConfigSecretKeys: configSecretKeys, DryRun: src.dryRun, Parallel: opts.Parallel, - MonitorAddress: fmt.Sprintf("127.0.0.1:%d", port), + MonitorAddress: fmt.Sprintf("127.0.0.1:%d", handle.Port), } - resmon.done = done + resmon.done = handle.Done go d.serve() @@ -565,6 +569,30 @@ func (rm *resmon) Cancel() error { return <-rm.done } +func sourceEvalServeOptions(ctx *plugin.Context, tracingSpan opentracing.Span) []grpc.ServerOption { + serveOpts := rpcutil.OpenTracingServerInterceptorOptions( + tracingSpan, + otgrpc.SpanDecorator(decorateResourceSpans), + ) + if logFile := os.Getenv("PULUMI_DEBUG_GRPC"); logFile != "" { + di, err := interceptors.NewDebugInterceptor(interceptors.DebugInterceptorOptions{ + LogFile: logFile, + Mutex: ctx.DebugTraceMutex, + }) + if err != nil { + // ignoring + return nil + } + metadata := map[string]interface{}{ + "mode": "server", + } + serveOpts = append(serveOpts, di.ServerOptions(interceptors.LogOptions{ + Metadata: metadata, + })...) + } + return serveOpts +} + // getProviderReference fetches the provider reference for a resource, read, or invoke from the given package with the // given unparsed provider reference. If the unparsed provider reference is empty, this function returns a reference // to the default provider for the indicated package. diff --git a/pkg/resource/deploy/source_query.go b/pkg/resource/deploy/source_query.go index ba81c3aaaf16..e708132b4638 100644 --- a/pkg/resource/deploy/source_query.go +++ b/pkg/resource/deploy/source_query.go @@ -249,17 +249,19 @@ func newQueryResourceMonitor( } // Fire up a gRPC server and start listening for incomings. - port, done, err := rpcutil.Serve(0, queryResmon.cancel, []func(*grpc.Server) error{ - func(srv *grpc.Server) error { + handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{ + Cancel: queryResmon.cancel, + Init: func(srv *grpc.Server) error { pulumirpc.RegisterResourceMonitorServer(srv, queryResmon) return nil }, - }, tracingSpan) + Options: rpcutil.OpenTracingServerInterceptorOptions(tracingSpan), + }) if err != nil { return nil, err } - monitorAddress := fmt.Sprintf("127.0.0.1:%d", port) + monitorAddress := fmt.Sprintf("127.0.0.1:%d", handle.Port) var config map[config.Key]string if runinfo.Target != nil { @@ -283,7 +285,7 @@ func newQueryResourceMonitor( MonitorAddress: monitorAddress, } queryResmon.addr = monitorAddress - queryResmon.done = done + queryResmon.done = handle.Done go d.serve() @@ -303,7 +305,7 @@ type queryResmon struct { defaultProviders *defaultProviders // the default provider manager. addr string // the address the host is listening on. cancel chan bool // a channel that can cancel the server. - done chan error // a channel that resolves when the server completes. + done <-chan error // a channel that resolves when the server completes. reg *providers.Registry // registry for resource providers. callInfo plugin.CallInfo // information for call calls. } diff --git a/pkg/resource/provider/main.go b/pkg/resource/provider/main.go index 5588f8a5cb43..9ec4d4ca47bd 100644 --- a/pkg/resource/provider/main.go +++ b/pkg/resource/provider/main.go @@ -73,8 +73,9 @@ func Main(name string, provMaker func(*HostClient) (pulumirpc.ResourceProviderSe } // Fire up a gRPC server, letting the kernel choose a free port for us. - port, done, err := rpcutil.Serve(0, cancelChannel, []func(*grpc.Server) error{ - func(srv *grpc.Server) error { + handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{ + Cancel: cancelChannel, + Init: func(srv *grpc.Server) error { prov, proverr := provMaker(host) if proverr != nil { return fmt.Errorf("failed to create resource provider: %v", proverr) @@ -82,16 +83,17 @@ func Main(name string, provMaker func(*HostClient) (pulumirpc.ResourceProviderSe pulumirpc.RegisterResourceProviderServer(srv, prov) return nil }, - }, nil) + Options: rpcutil.OpenTracingServerInterceptorOptions(nil), + }) if err != nil { return fmt.Errorf("fatal: %v", err) } // The resource provider protocol requires that we now write out the port we have chosen to listen on. - fmt.Printf("%d\n", port) + fmt.Printf("%d\n", handle.Port) // Finally, wait for the server to stop serving. - if err := <-done; err != nil { + if err := <-handle.Done; err != nil { return fmt.Errorf("fatal: %v", err) } diff --git a/pkg/util/rpcdebug/interceptors.go b/pkg/util/rpcdebug/interceptors.go new file mode 100644 index 000000000000..e96c3e835844 --- /dev/null +++ b/pkg/util/rpcdebug/interceptors.go @@ -0,0 +1,382 @@ +// Copyright 2016-2022, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rpcdebug + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "os" + "reflect" + "sync" + + "google.golang.org/grpc" + + "github.com/golang/protobuf/jsonpb" + "github.com/golang/protobuf/proto" + "google.golang.org/grpc/metadata" +) + +type DebugInterceptor struct { + logFile string + mutex *sync.Mutex +} + +type DebugInterceptorOptions struct { + LogFile string + Mutex *sync.Mutex +} + +type LogOptions struct { + Metadata interface{} +} + +// Each LogFile should have a unique instance of DebugInterceptor for proper locking. +func NewDebugInterceptor(opts DebugInterceptorOptions) (*DebugInterceptor, error) { + if opts.LogFile == "" { + return nil, fmt.Errorf("logFile cannot be empty") + } + i := &DebugInterceptor{logFile: opts.LogFile} + + if opts.Mutex != nil { + i.mutex = opts.Mutex + } else { + i.mutex = &sync.Mutex{} + } + + return i, nil +} + +func (i *DebugInterceptor) ServerOptions(opts LogOptions) []grpc.ServerOption { + return []grpc.ServerOption{ + grpc.ChainUnaryInterceptor(i.DebugServerInterceptor(opts)), + grpc.ChainStreamInterceptor(i.DebugStreamServerInterceptor(opts)), + } +} + +func (i *DebugInterceptor) DialOptions(opts LogOptions) []grpc.DialOption { + return []grpc.DialOption{ + grpc.WithChainUnaryInterceptor(i.DebugClientInterceptor(opts)), + grpc.WithChainStreamInterceptor(i.DebugStreamClientInterceptor(opts)), + } +} + +// Logs all gRPC converations in JSON format. +// +// To enable, call InitDebugInterceptors first in your process main to +// configure the location of the Go file. +func (i *DebugInterceptor) DebugServerInterceptor(opts LogOptions) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, + info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + log := debugInterceptorLogEntry{ + Method: info.FullMethod, + Metadata: opts.Metadata, + } + i.trackRequest(&log, req) + resp, err := handler(ctx, req) + i.trackResponse(&log, resp) + if e := i.record(log); e != nil { + return resp, e + } + return resp, err + } +} + +// Like debugServerInterceptor but for streaming calls. +func (i *DebugInterceptor) DebugStreamServerInterceptor(opts LogOptions) grpc.StreamServerInterceptor { + return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + ssWrapped := &debugServerStream{ + interceptor: i, + method: info.FullMethod, + innerServerStream: ss, + metadata: opts.Metadata, + } + err := handler(srv, ssWrapped) + return err + } +} + +// Like debugServerInterceptor but for GRPC client connections. +func (i *DebugInterceptor) DebugClientInterceptor(opts LogOptions) grpc.UnaryClientInterceptor { + return func(ctx context.Context, method string, req, reply interface{}, + cc *grpc.ClientConn, invoker grpc.UnaryInvoker, gopts ...grpc.CallOption) error { + // Ignoring weird entries with empty method and nil req and reply. + if method == "" { + return invoker(ctx, method, req, reply, cc, gopts...) + } + + log := debugInterceptorLogEntry{ + Method: method, + Metadata: opts.Metadata, + } + i.trackRequest(&log, req) + err := invoker(ctx, method, req, reply, cc, gopts...) + i.trackResponse(&log, reply) + if e := i.record(log); e != nil { + return e + } + return err + } +} + +// Like debugClientInterceptor but for streaming calls. +func (i *DebugInterceptor) DebugStreamClientInterceptor(opts LogOptions) grpc.StreamClientInterceptor { + return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, + streamer grpc.Streamer, gopts ...grpc.CallOption) (grpc.ClientStream, error) { + + stream, err := streamer(ctx, desc, cc, method, gopts...) + + wrappedStream := &debugClientStream{ + innerClientStream: stream, + interceptor: i, + method: method, + metadata: opts.Metadata, + } + + return wrappedStream, err + } +} + +func (i *DebugInterceptor) record(log debugInterceptorLogEntry) error { + i.mutex.Lock() + defer i.mutex.Unlock() + + f, err := os.OpenFile(i.logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600) + if err != nil { + return fmt.Errorf("Failed to append GRPC debug logs to file %s: %v", i.logFile, err) + } + defer f.Close() + + if err := json.NewEncoder(f).Encode(log); err != nil { + return fmt.Errorf("Failed to encode GRPC debug logs: %v", err) + } + return nil +} + +func (*DebugInterceptor) track(log *debugInterceptorLogEntry, err error) { + log.Errors = append(log.Errors, err.Error()) +} + +func (i *DebugInterceptor) trackRequest(log *debugInterceptorLogEntry, req interface{}) { + j, err := i.transcode(req) + if err != nil { + i.track(log, err) + } else { + log.Request = j + } +} + +func (i *DebugInterceptor) trackResponse(log *debugInterceptorLogEntry, resp interface{}) { + j, err := i.transcode(resp) + if err != nil { + i.track(log, err) + } else { + log.Response = j + } +} + +func (*DebugInterceptor) transcode(obj interface{}) (json.RawMessage, error) { + if obj == nil { + return json.RawMessage("null"), nil + } + + m, ok := obj.(proto.Message) + if !ok { + return json.RawMessage("null"), + fmt.Errorf("Failed to decode, expecting proto.Message, got %v", + reflect.TypeOf(obj)) + } + + jsonSer := jsonpb.Marshaler{} + buf := bytes.Buffer{} + if err := jsonSer.Marshal(&buf, m); err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +// Wraps grpc.ServerStream with interceptor hooks for SendMsg, RecvMsg. +type debugServerStream struct { + innerServerStream grpc.ServerStream + interceptor *DebugInterceptor + method string + metadata interface{} +} + +func (dss *debugServerStream) errorEntry(err error) debugInterceptorLogEntry { + return debugInterceptorLogEntry{ + Metadata: dss.metadata, + Method: dss.method, + Errors: []string{err.Error()}, + } +} + +func (dss *debugServerStream) SetHeader(md metadata.MD) error { + return dss.innerServerStream.SetHeader(md) +} + +func (dss *debugServerStream) SendHeader(md metadata.MD) error { + return dss.innerServerStream.SendHeader(md) +} + +func (dss *debugServerStream) SetTrailer(md metadata.MD) { + dss.innerServerStream.SetTrailer(md) +} + +func (dss *debugServerStream) Context() context.Context { + return dss.innerServerStream.Context() +} + +func (dss *debugServerStream) SendMsg(m interface{}) error { + err := dss.innerServerStream.SendMsg(m) + if err != nil { + if e := dss.interceptor.record(dss.errorEntry(err)); e != nil { + return e + } + } else { + req, err := dss.interceptor.transcode(m) + if err != nil { + if e := dss.interceptor.record(dss.errorEntry(err)); e != nil { + return e + } + } else { + if e := dss.interceptor.record(debugInterceptorLogEntry{ + Metadata: dss.metadata, + Method: dss.method, + Request: req, + }); e != nil { + return e + } + } + } + return err +} + +func (dss *debugServerStream) RecvMsg(m interface{}) error { + err := dss.innerServerStream.RecvMsg(m) + if err == io.EOF { + return err + } else if err != nil { + if e := dss.interceptor.record(dss.errorEntry(err)); e != nil { + return e + } + } else { + resp, err := dss.interceptor.transcode(m) + if err != nil { + if e := dss.interceptor.record(dss.errorEntry(err)); e != nil { + return e + } + } else { + if e := dss.interceptor.record(debugInterceptorLogEntry{ + Method: dss.method, + Metadata: dss.metadata, + Response: resp, + }); e != nil { + return e + } + } + } + return err +} + +var _ grpc.ServerStream = &debugServerStream{} + +// Wraps grpc.ClientStream with interceptor hooks for SendMsg, RecvMsg. +type debugClientStream struct { + innerClientStream grpc.ClientStream + interceptor *DebugInterceptor + method string + metadata interface{} +} + +func (d *debugClientStream) errorEntry(err error) debugInterceptorLogEntry { + return debugInterceptorLogEntry{ + Method: d.method, + Metadata: d.metadata, + Errors: []string{err.Error()}, + } +} + +func (d *debugClientStream) Header() (metadata.MD, error) { + return d.innerClientStream.Header() +} + +func (d *debugClientStream) Trailer() metadata.MD { + return d.innerClientStream.Trailer() +} + +func (d *debugClientStream) CloseSend() error { + return d.innerClientStream.CloseSend() +} + +func (d *debugClientStream) Context() context.Context { + return d.innerClientStream.Context() +} + +func (d *debugClientStream) SendMsg(m interface{}) error { + err := d.innerClientStream.SendMsg(m) + if err != nil { + if e := d.interceptor.record(d.errorEntry(err)); e != nil { + return e + } + } else { + req, err := d.interceptor.transcode(m) + if err != nil { + if e := d.interceptor.record(d.errorEntry(err)); e != nil { + return e + } + } else { + if e := d.interceptor.record(debugInterceptorLogEntry{ + Method: d.method, + Metadata: d.metadata, + Request: req, + }); e != nil { + return e + } + } + } + return err +} + +func (d *debugClientStream) RecvMsg(m interface{}) error { + err := d.innerClientStream.RecvMsg(m) + if err == io.EOF { + return err + } else if err != nil { + if e := d.interceptor.record(d.errorEntry(err)); e != nil { + return e + } + } else { + resp, err := d.interceptor.transcode(m) + if err != nil { + if e := d.interceptor.record(d.errorEntry(err)); e != nil { + return e + } + } else { + if e := d.interceptor.record(debugInterceptorLogEntry{ + Method: d.method, + Metadata: d.metadata, + Response: resp, + }); e != nil { + return e + } + } + } + return err +} + +var _ grpc.ClientStream = &debugClientStream{} diff --git a/pkg/util/rpcdebug/logformat.go b/pkg/util/rpcdebug/logformat.go new file mode 100644 index 000000000000..e4fb500c0f73 --- /dev/null +++ b/pkg/util/rpcdebug/logformat.go @@ -0,0 +1,30 @@ +// Copyright 2016-2022, Pulumi Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rpcdebug + +import ( + "encoding/json" +) + +// JSON format for tracking gRPC conversations. Normal methods have +// one entry for each req-resp conversation, streaming methods have +// one entry per each request or response over the stream. +type debugInterceptorLogEntry struct { + Method string `json:"method"` + Request json.RawMessage `json:"request,omitempty"` + Response json.RawMessage `json:"response,omitempty"` + Errors []string `json:"errors,omitempty"` + Metadata interface{} `json:"metadata,omitempty"` +} diff --git a/sdk/dotnet/cmd/pulumi-language-dotnet/main.go b/sdk/dotnet/cmd/pulumi-language-dotnet/main.go index 0eba102f29d5..8ceab51d034f 100644 --- a/sdk/dotnet/cmd/pulumi-language-dotnet/main.go +++ b/sdk/dotnet/cmd/pulumi-language-dotnet/main.go @@ -106,22 +106,24 @@ func main() { } // Fire up a gRPC server, letting the kernel choose a free port. - port, done, err := rpcutil.Serve(0, cancelChannel, []func(*grpc.Server) error{ - func(srv *grpc.Server) error { + handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{ + Cancel: cancelChannel, + Init: func(srv *grpc.Server) error { host := newLanguageHost(dotnetExec, engineAddress, tracing, binary) pulumirpc.RegisterLanguageRuntimeServer(srv, host) return nil }, - }, nil) + Options: rpcutil.OpenTracingServerInterceptorOptions(nil), + }) if err != nil { cmdutil.Exit(errors.Wrapf(err, "could not start language host RPC server")) } // Otherwise, print out the port so that the spawner knows how to reach us. - fmt.Printf("%d\n", port) + fmt.Printf("%d\n", handle.Port) // And finally wait for the server to stop serving. - if err := <-done; err != nil { + if err := <-handle.Done; err != nil { cmdutil.Exit(errors.Wrapf(err, "language host RPC stopped serving")) } } diff --git a/sdk/go/auto/remote_workspace_test.go b/sdk/go/auto/remote_workspace_test.go index 0929828edb88..55ee3192ba54 100644 --- a/sdk/go/auto/remote_workspace_test.go +++ b/sdk/go/auto/remote_workspace_test.go @@ -23,23 +23,24 @@ import ( func TestIsFullyQualifiedStackName(t *testing.T) { t.Parallel() - tests := map[string]struct { + tests := []struct { + name string input string expected bool }{ - "fully qualified": {input: "owner/project/stack", expected: true}, - "empty": {input: "", expected: false}, - "name": {input: "name", expected: false}, - "name & owner": {input: "owner/name", expected: false}, - "sep": {input: "/", expected: false}, - "two seps": {input: "//", expected: false}, - "three seps": {input: "///", expected: false}, - "invalid": {input: "owner/project/stack/wat", expected: false}, + {name: "fully qualified", input: "owner/project/stack", expected: true}, + {name: "empty", input: "", expected: false}, + {name: "name", input: "name", expected: false}, + {name: "name & owner", input: "owner/name", expected: false}, + {name: "sep", input: "/", expected: false}, + {name: "two seps", input: "//", expected: false}, + {name: "three seps", input: "///", expected: false}, + {name: "invalid", input: "owner/project/stack/wat", expected: false}, } - for name, tc := range tests { + for _, tc := range tests { tc := tc - t.Run(name, func(t *testing.T) { + t.Run(tc.name, func(t *testing.T) { t.Parallel() actual := isFullyQualifiedStackName(tc.input) diff --git a/sdk/go/auto/stack.go b/sdk/go/auto/stack.go index db9f951e3087..d8d758d5f987 100644 --- a/sdk/go/auto/stack.go +++ b/sdk/go/auto/stack.go @@ -1033,7 +1033,7 @@ type languageRuntimeServer struct { state int cancel chan bool - done chan error + done <-chan error } // isNestedInvocation returns true if pulumi.RunWithContext is on the stack. @@ -1066,16 +1066,18 @@ func startLanguageRuntimeServer(fn pulumi.RunFunc) (*languageRuntimeServer, erro } s.c = sync.NewCond(&s.m) - port, done, err := rpcutil.Serve(0, s.cancel, []func(*grpc.Server) error{ - func(srv *grpc.Server) error { + handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{ + Cancel: s.cancel, + Init: func(srv *grpc.Server) error { pulumirpc.RegisterLanguageRuntimeServer(srv, s) return nil }, - }, nil) + Options: rpcutil.OpenTracingServerInterceptorOptions(nil), + }) if err != nil { return nil, err } - s.address, s.done = fmt.Sprintf("127.0.0.1:%d", port), done + s.address, s.done = fmt.Sprintf("127.0.0.1:%d", handle.Port), handle.Done return s, nil } diff --git a/sdk/go/common/resource/plugin/analyzer_plugin.go b/sdk/go/common/resource/plugin/analyzer_plugin.go index 26d7409feb4f..b2eab3c6e3d3 100644 --- a/sdk/go/common/resource/plugin/analyzer_plugin.go +++ b/sdk/go/common/resource/plugin/analyzer_plugin.go @@ -27,6 +27,7 @@ import ( pbempty "github.com/golang/protobuf/ptypes/empty" structpb "github.com/golang/protobuf/ptypes/struct" "github.com/pkg/errors" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "github.com/pulumi/pulumi/sdk/v3/go/common/apitype" @@ -34,6 +35,7 @@ import ( "github.com/pulumi/pulumi/sdk/v3/go/common/tokens" "github.com/pulumi/pulumi/sdk/v3/go/common/util/contract" "github.com/pulumi/pulumi/sdk/v3/go/common/util/logging" + "github.com/pulumi/pulumi/sdk/v3/go/common/util/rpcutil" "github.com/pulumi/pulumi/sdk/v3/go/common/util/rpcutil/rpcerror" "github.com/pulumi/pulumi/sdk/v3/go/common/workspace" pulumirpc "github.com/pulumi/pulumi/sdk/v3/proto/go" @@ -62,8 +64,10 @@ func NewAnalyzer(host Host, ctx *Context, name tokens.QName) (Analyzer, error) { } contract.Assert(path != "") + dialOpts := rpcutil.OpenTracingInterceptorDialOptions() + plug, err := newPlugin(ctx, ctx.Pwd, path, fmt.Sprintf("%v (analyzer)", name), - []string{host.ServerAddr(), ctx.Pwd}, nil /*env*/) + []string{host.ServerAddr(), ctx.Pwd}, nil /*env*/, dialOpts) if err != nil { return nil, err } @@ -126,7 +130,9 @@ func NewPolicyAnalyzer( } } - plug, err := newPlugin(ctx, pwd, pluginPath, fmt.Sprintf("%v (analyzer)", name), args, env) + plug, err := newPlugin(ctx, pwd, pluginPath, fmt.Sprintf("%v (analyzer)", name), args, env, + analyzerPluginDialOptions(ctx, fmt.Sprintf("%v", name))) + if err != nil { // The original error might have been wrapped before being returned from newPlugin. So we look for // the root cause of the error. This won't work if we switch to Go 1.13's new approach to wrapping. @@ -414,6 +420,27 @@ func (a *analyzer) Close() error { return a.plug.Close() } +func analyzerPluginDialOptions(ctx *Context, name string) []grpc.DialOption { + dialOpts := append( + rpcutil.OpenTracingInterceptorDialOptions(), + grpc.WithInsecure(), + rpcutil.GrpcChannelOptions(), + ) + + if ctx.DialOptions != nil { + metadata := map[string]interface{}{ + "mode": "client", + "kind": "analyzer", + } + if name != "" { + metadata["name"] = name + } + dialOpts = append(dialOpts, ctx.DialOptions(metadata)...) + } + + return dialOpts +} + func marshalResourceOptions(opts AnalyzerResourceOptions) *pulumirpc.AnalyzerResourceOptions { secs := make([]string, len(opts.AdditionalSecretOutputs)) for idx := range opts.AdditionalSecretOutputs { diff --git a/sdk/go/common/resource/plugin/context.go b/sdk/go/common/resource/plugin/context.go index 7f5d671183af..472bfaec3493 100644 --- a/sdk/go/common/resource/plugin/context.go +++ b/sdk/go/common/resource/plugin/context.go @@ -17,8 +17,10 @@ package plugin import ( "context" "io" + "sync" "github.com/opentracing/opentracing-go" + "google.golang.org/grpc" "github.com/pulumi/pulumi/sdk/v3/go/common/diag" "github.com/pulumi/pulumi/sdk/v3/go/common/diag/colors" @@ -36,6 +38,12 @@ type Context struct { Pwd string // the working directory to spawn all plugins in. Root string // the root directory of the project. + // If non-nil, configures custom gRPC client options. Receives pluginInfo which is a JSON-serializable bit of + // metadata describing the plugin. + DialOptions func(pluginInfo interface{}) []grpc.DialOption + + DebugTraceMutex *sync.Mutex // used internally to syncronize debug tracing + tracingSpan opentracing.Span // the OpenTracing span to parent requests within. cancelFuncs []context.CancelFunc @@ -80,11 +88,12 @@ func NewContextWithRoot(d, statusD diag.Sink, host Host, } ctx := &Context{ - Diag: d, - StatusDiag: statusD, - Host: host, - Pwd: pwd, - tracingSpan: parentSpan, + Diag: d, + StatusDiag: statusD, + Host: host, + Pwd: pwd, + tracingSpan: parentSpan, + DebugTraceMutex: &sync.Mutex{}, } if host == nil { h, err := NewDefaultHost(ctx, runtimeOptions, disableProviderPreview, plugins) diff --git a/sdk/go/common/resource/plugin/host_server.go b/sdk/go/common/resource/plugin/host_server.go index b2b32bf783c9..c9ee2c7e1ce9 100644 --- a/sdk/go/common/resource/plugin/host_server.go +++ b/sdk/go/common/resource/plugin/host_server.go @@ -31,11 +31,11 @@ import ( // hostServer is the server side of the host RPC machinery. type hostServer struct { - host Host // the host for this RPC server. - ctx *Context // the associated plugin context. - addr string // the address the host is listening on. - cancel chan bool // a channel that can cancel the server. - done chan error // a channel that resolves when the server completes. + host Host // the host for this RPC server. + ctx *Context // the associated plugin context. + addr string // the address the host is listening on. + cancel chan bool // a channel that can cancel the server. + done <-chan error // a channel that resolves when the server completes. // hostServer contains little bits of state that can't be saved in the language host. rootUrn atomic.Value // a root resource URN that has been saved via SetRootResource @@ -51,16 +51,22 @@ func newHostServer(host Host, ctx *Context) (*hostServer, error) { } // Fire up a gRPC server and start listening for incomings. - port, done, err := rpcutil.Serve(0, engine.cancel, []func(*grpc.Server) error{ - func(srv *grpc.Server) error { + handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{ + Cancel: engine.cancel, + Init: func(srv *grpc.Server) error { lumirpc.RegisterEngineServer(srv, engine) return nil }, - }, ctx.tracingSpan) + Options: rpcutil.OpenTracingServerInterceptorOptions(ctx.tracingSpan), + }) + if err != nil { return nil, err } + port := handle.Port + done := handle.Done + engine.addr = fmt.Sprintf("127.0.0.1:%d", port) engine.done = done engine.rootUrn.Store("") diff --git a/sdk/go/common/resource/plugin/langruntime_plugin.go b/sdk/go/common/resource/plugin/langruntime_plugin.go index dab408047993..12d2e4cadcfb 100644 --- a/sdk/go/common/resource/plugin/langruntime_plugin.go +++ b/sdk/go/common/resource/plugin/langruntime_plugin.go @@ -24,6 +24,7 @@ import ( "github.com/blang/semver" pbempty "github.com/golang/protobuf/ptypes/empty" "github.com/pkg/errors" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "github.com/pulumi/pulumi/sdk/v3/go/common/diag/colors" @@ -31,6 +32,7 @@ import ( "github.com/pulumi/pulumi/sdk/v3/go/common/util/cmdutil" "github.com/pulumi/pulumi/sdk/v3/go/common/util/contract" "github.com/pulumi/pulumi/sdk/v3/go/common/util/logging" + "github.com/pulumi/pulumi/sdk/v3/go/common/util/rpcutil" "github.com/pulumi/pulumi/sdk/v3/go/common/util/rpcutil/rpcerror" "github.com/pulumi/pulumi/sdk/v3/go/common/workspace" pulumirpc "github.com/pulumi/pulumi/sdk/v3/proto/go" @@ -62,7 +64,7 @@ func NewLanguageRuntime(host Host, ctx *Context, runtime string, return nil, err } - plug, err := newPlugin(ctx, ctx.Pwd, path, runtime, args, nil /*env*/) + plug, err := newPlugin(ctx, ctx.Pwd, path, runtime, args, nil /*env*/, langRuntimePluginDialOptions(ctx, runtime)) if err != nil { return nil, err } @@ -76,6 +78,27 @@ func NewLanguageRuntime(host Host, ctx *Context, runtime string, }, nil } +func langRuntimePluginDialOptions(ctx *Context, runtime string) []grpc.DialOption { + dialOpts := append( + rpcutil.OpenTracingInterceptorDialOptions(), + grpc.WithInsecure(), + rpcutil.GrpcChannelOptions(), + ) + + if ctx.DialOptions != nil { + metadata := map[string]interface{}{ + "mode": "client", + "kind": "language", + } + if runtime != "" { + metadata["runtime"] = runtime + } + dialOpts = append(dialOpts, ctx.DialOptions(metadata)...) + } + + return dialOpts +} + func buildArgsForNewPlugin(host Host, ctx *Context, options map[string]interface{}) ([]string, error) { root, err := filepath.Abs(ctx.Root) if err != nil { diff --git a/sdk/go/common/resource/plugin/plugin.go b/sdk/go/common/resource/plugin/plugin.go index 8ce4e608acf6..41d887e5ca80 100644 --- a/sdk/go/common/resource/plugin/plugin.go +++ b/sdk/go/common/resource/plugin/plugin.go @@ -1,4 +1,4 @@ -// Copyright 2016-2021, Pulumi Corporation. +// Copyright 2016-2022, Pulumi Corporation. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package plugin import ( "bufio" "encoding/json" + "fmt" "io" "io/ioutil" "os" @@ -27,7 +28,6 @@ import ( "syscall" "time" - "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" multierror "github.com/hashicorp/go-multierror" "github.com/pkg/errors" "golang.org/x/net/context" @@ -40,7 +40,6 @@ import ( "github.com/pulumi/pulumi/sdk/v3/go/common/util/cmdutil" "github.com/pulumi/pulumi/sdk/v3/go/common/util/contract" "github.com/pulumi/pulumi/sdk/v3/go/common/util/logging" - "github.com/pulumi/pulumi/sdk/v3/go/common/util/rpcutil" ) // PulumiPluginJSON represents additional information about a package's associated Pulumi plugin. @@ -115,15 +114,11 @@ var errRunPolicyModuleNotFound = errors.New("pulumi SDK does not support policy // errPluginNotFound is returned when we try to execute a plugin but it is not found on disk. var errPluginNotFound = errors.New("plugin not found") -func dialPlugin(port, bin, prefix string) (*grpc.ClientConn, error) { +func dialPlugin(portNum int, bin, prefix string, dialOptions []grpc.DialOption) (*grpc.ClientConn, error) { + port := fmt.Sprintf("%d", portNum) + // Now that we have the port, go ahead and create a gRPC client connection to it. - conn, err := grpc.Dial( - "127.0.0.1:"+port, - grpc.WithInsecure(), - grpc.WithUnaryInterceptor(rpcutil.OpenTracingClientInterceptor()), - grpc.WithStreamInterceptor(rpcutil.OpenTracingStreamClientInterceptor()), - rpcutil.GrpcChannelOptions(), - ) + conn, err := grpc.Dial("127.0.0.1:"+port, dialOptions...) if err != nil { return nil, errors.Wrapf(err, "could not dial plugin [%v] over RPC", bin) } @@ -173,7 +168,8 @@ func dialPlugin(port, bin, prefix string) (*grpc.ClientConn, error) { return conn, nil } -func newPlugin(ctx *Context, pwd, bin, prefix string, args, env []string, options ...otgrpc.Option) (*plugin, error) { +func newPlugin(ctx *Context, pwd, bin, prefix string, args, env []string, + dialOptions []grpc.DialOption) (*plugin, error) { if logging.V(9) { var argstr string for i, arg := range args { @@ -245,7 +241,7 @@ func newPlugin(ctx *Context, pwd, bin, prefix string, args, env []string, option // Now that we have a process, we expect it to write a single line to STDOUT: the port it's listening on. We only // read a byte at a time so that STDOUT contains everything after the first newline. - var port string + var portString string b := make([]byte, 1) for { n, readerr := plug.Stdout.Read(b) @@ -259,19 +255,21 @@ func newPlugin(ctx *Context, pwd, bin, prefix string, args, env []string, option } // Fall back to a generic, opaque error. - if port == "" { + if portString == "" { return nil, errors.Wrapf(readerr, "could not read plugin [%v] stdout", bin) } - return nil, errors.Wrapf(readerr, "failure reading plugin [%v] stdout (read '%v')", bin, port) + return nil, errors.Wrapf(readerr, "failure reading plugin [%v] stdout (read '%v')", + bin, portString) } if n > 0 && b[0] == '\n' { break } - port += string(b[:n]) + portString += string(b[:n]) } // Parse the output line (minus the '\n') to ensure it's a numeric port. - if _, err = strconv.Atoi(port); err != nil { + var port int + if port, err = strconv.Atoi(portString); err != nil { killerr := plug.Kill() contract.IgnoreError(killerr) // ignoring the error because the existing one trumps it. return nil, errors.Wrapf( @@ -283,7 +281,7 @@ func newPlugin(ctx *Context, pwd, bin, prefix string, args, env []string, option plug.stdoutDone = stdoutDone go runtrace(plug.Stdout, false, stdoutDone) - conn, err := dialPlugin(port, bin, prefix) + conn, err := dialPlugin(port, bin, prefix, dialOptions) if err != nil { return nil, err } diff --git a/sdk/go/common/resource/plugin/provider_plugin.go b/sdk/go/common/resource/plugin/provider_plugin.go index 3f67cca0aa75..8c5ce5095086 100644 --- a/sdk/go/common/resource/plugin/provider_plugin.go +++ b/sdk/go/common/resource/plugin/provider_plugin.go @@ -20,6 +20,7 @@ import ( "fmt" "io" "os" + "strconv" "strings" "github.com/blang/semver" @@ -29,6 +30,7 @@ import ( multierror "github.com/hashicorp/go-multierror" "github.com/opentracing/opentracing-go" "github.com/pkg/errors" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "github.com/pulumi/pulumi/sdk/v3/go/common/resource" @@ -36,6 +38,7 @@ import ( "github.com/pulumi/pulumi/sdk/v3/go/common/util/cmdutil" "github.com/pulumi/pulumi/sdk/v3/go/common/util/contract" "github.com/pulumi/pulumi/sdk/v3/go/common/util/logging" + "github.com/pulumi/pulumi/sdk/v3/go/common/util/rpcutil" "github.com/pulumi/pulumi/sdk/v3/go/common/util/rpcutil/rpcerror" "github.com/pulumi/pulumi/sdk/v3/go/common/workspace" pulumirpc "github.com/pulumi/pulumi/sdk/v3/proto/go" @@ -94,7 +97,13 @@ func NewProvider(host Host, ctx *Context, pkg tokens.Package, version *semver.Ve prefix := fmt.Sprintf("%v (resource)", pkg) if optAttach != "" { - conn, err := dialPlugin(optAttach, pkg.String(), prefix) + port, err := strconv.Atoi(optAttach) + if err != nil { + return nil, fmt.Errorf("Expected a numeric port, got %s in PULUMI_DEBUG_PROVIDERS: %w", + optAttach, err) + } + + conn, err := dialPlugin(port, pkg.String(), prefix, providerPluginDialOptions(ctx, pkg, "")) if err != nil { return nil, err } @@ -123,7 +132,7 @@ func NewProvider(host Host, ctx *Context, pkg tokens.Package, version *semver.Ve } plug, err = newPlugin(ctx, ctx.Pwd, path, prefix, - []string{host.ServerAddr()}, env, otgrpc.SpanDecorator(decorateProviderSpans)) + []string{host.ServerAddr()}, env, providerPluginDialOptions(ctx, pkg, "")) if err != nil { return nil, err } @@ -154,11 +163,36 @@ func NewProvider(host Host, ctx *Context, pkg tokens.Package, version *semver.Ve return p, nil } +func providerPluginDialOptions(ctx *Context, pkg tokens.Package, path string) []grpc.DialOption { + dialOpts := append( + rpcutil.OpenTracingInterceptorDialOptions(otgrpc.SpanDecorator(decorateProviderSpans)), + grpc.WithInsecure(), + rpcutil.GrpcChannelOptions(), + ) + + if ctx.DialOptions != nil { + metadata := map[string]interface{}{ + "mode": "client", + "kind": "resource", + } + if pkg != "" { + metadata["name"] = pkg.String() + } + if path != "" { + metadata["path"] = path + } + dialOpts = append(dialOpts, ctx.DialOptions(metadata)...) + } + + return dialOpts +} + // NewProviderFromPath creates a new provider by loading the plugin binary located at `path`. func NewProviderFromPath(host Host, ctx *Context, path string) (Provider, error) { env := os.Environ() + plug, err := newPlugin(ctx, ctx.Pwd, path, "", - []string{host.ServerAddr()}, env, otgrpc.SpanDecorator(decorateProviderSpans)) + []string{host.ServerAddr()}, env, providerPluginDialOptions(ctx, "", path)) if err != nil { return nil, err } diff --git a/sdk/go/common/util/rpcutil/interceptor.go b/sdk/go/common/util/rpcutil/interceptor.go index 48b20e037229..9bbd8996bde0 100644 --- a/sdk/go/common/util/rpcutil/interceptor.go +++ b/sdk/go/common/util/rpcutil/interceptor.go @@ -23,6 +23,15 @@ import ( "google.golang.org/grpc" ) +// Configures interceptors to propagate OpenTracing metadata through headers. If parentSpan is non-nil, it becomes the +// default parent for orphan spans. +func OpenTracingServerInterceptorOptions(parentSpan opentracing.Span, options ...otgrpc.Option) []grpc.ServerOption { + return []grpc.ServerOption{ + grpc.ChainUnaryInterceptor(OpenTracingServerInterceptor(parentSpan, options...)), + grpc.ChainStreamInterceptor(OpenTracingStreamServerInterceptor(parentSpan, options...)), + } +} + // OpenTracingServerInterceptor provides a default gRPC server // interceptor for emitting tracing to the global OpenTracing tracer. func OpenTracingServerInterceptor(parentSpan opentracing.Span, options ...otgrpc.Option) grpc.UnaryServerInterceptor { @@ -71,6 +80,14 @@ func OpenTracingStreamClientInterceptor(options ...otgrpc.Option) grpc.StreamCli return otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer(), options...) } +// Configures gRPC clients with OpenTracing interceptors. +func OpenTracingInterceptorDialOptions(opts ...otgrpc.Option) []grpc.DialOption { + return []grpc.DialOption{ + grpc.WithChainUnaryInterceptor(OpenTracingClientInterceptor(opts...)), + grpc.WithChainStreamInterceptor(OpenTracingStreamClientInterceptor(opts...)), + } +} + // Wraps an opentracing.Tracer to reparent orphan traces with a given // default parent span. type reparentingTracer struct { diff --git a/sdk/go/common/util/rpcutil/serve.go b/sdk/go/common/util/rpcutil/serve.go index d8468b411e44..435f7461b734 100644 --- a/sdk/go/common/util/rpcutil/serve.go +++ b/sdk/go/common/util/rpcutil/serve.go @@ -1,4 +1,4 @@ -// Copyright 2016-2018, Pulumi Corporation. +// Copyright 2016-2022, Pulumi Corporation. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -44,33 +44,58 @@ func IsBenignCloseErr(err error) bool { strings.HasSuffix(msg, "grpc: the server has been stopped") } -// Serve creates a new gRPC server, calls out to the supplied registration functions to bind interfaces, and then -// listens on the supplied TCP port. If the caller wishes for the kernel to choose a free port automatically, pass 0 as -// the port number. The return values are: the chosen port (the same as supplied if non-0), a channel that may -// eventually return an error, and an error, in case something went wrong. The channel is non-nil and waits until -// the server is finished, in the case of a successful launch of the RPC server. -func Serve(port int, cancel chan bool, registers []func(*grpc.Server) error, - parentSpan opentracing.Span, options ...otgrpc.Option) (int, chan error, error) { +type ServeOptions struct { + // Port to listen on. Passing 0 makes the system choose a port automatically. + Port int + + // Initializer for the server. A typical Init registers handlers. + Init func(*grpc.Server) error + + // If non-nil, Serve will gracefully terminate the server when Cancel is closed or receives true. + Cancel chan bool + + // Options for serving gRPC. + Options []grpc.ServerOption +} + +type ServeHandle struct { + // Port the server is listening on. + Port int + + // The channel is non-nil and is closed when the server stops serving. The server will pass a non-nil error on + // this channel if something went wrong in the background and it did not terminate gracefully. + Done <-chan error +} + +// ServeWithOptions creates a new gRPC server, calls opts.Init and listens on a TCP port. +func ServeWithOptions(opts ServeOptions) (ServeHandle, error) { + h, _, err := serveWithOptions(opts) + return h, err +} + +func serveWithOptions(opts ServeOptions) (ServeHandle, chan error, error) { + port := opts.Port // Listen on a TCP port, but let the kernel choose a free port for us. lis, err := net.Listen("tcp", "127.0.0.1:"+strconv.Itoa(port)) if err != nil { - return port, nil, errors.Errorf("failed to listen on TCP port ':%v': %v", port, err) + return ServeHandle{Port: port}, nil, + errors.Errorf("failed to listen on TCP port ':%v': %v", port, err) } health := health.NewServer() // Now new up a gRPC server and register any RPC interfaces the caller wants. - srv := grpc.NewServer( - grpc.UnaryInterceptor(OpenTracingServerInterceptor(parentSpan, options...)), - grpc.StreamInterceptor(OpenTracingStreamServerInterceptor(parentSpan, options...)), - grpc.MaxRecvMsgSize(maxRPCMessageSize), - ) - for _, register := range registers { - if err := register(srv); err != nil { - return port, nil, errors.Errorf("failed to register RPC handler: %v", err) + + srv := grpc.NewServer(append(opts.Options, grpc.MaxRecvMsgSize(maxRPCMessageSize))...) + + if opts.Init != nil { + if err := opts.Init(srv); err != nil { + return ServeHandle{Port: port}, nil, + errors.Errorf("failed to Init GRPC to register RPC handlers: %v", err) } } + healthgrpc.RegisterHealthServer(srv, health) // enable health checks reflection.Register(srv) // enable reflection. @@ -87,8 +112,7 @@ func Serve(port int, cancel chan bool, registers []func(*grpc.Server) error, port = tcpa.Port } - // If the caller provided a cancellation channel, start a goroutine that will gracefully terminate the gRPC server when - // that channel is closed or receives a `true` value. + cancel := opts.Cancel if cancel != nil { go func() { for v, ok := <-cancel; !v && ok; v, ok = <-cancel { @@ -109,5 +133,31 @@ func Serve(port int, cancel chan bool, registers []func(*grpc.Server) error, close(done) }() - return port, done, nil + return ServeHandle{Port: port, Done: done}, done, nil +} + +// Deprecated. Please use ServeWithOptions and OpenTracingServerInterceptorOptions. +func Serve(port int, cancel chan bool, registers []func(*grpc.Server) error, + parentSpan opentracing.Span, options ...otgrpc.Option) (int, chan error, error) { + + opts := ServeOptions{ + Port: port, + Cancel: cancel, + Init: func(s *grpc.Server) error { + for _, r := range registers { + if err := r(s); err != nil { + return err + } + } + return nil + }, + Options: OpenTracingServerInterceptorOptions(parentSpan, options...), + } + + handle, done, err := serveWithOptions(opts) + if err != nil { + return 0, nil, err + } + + return handle.Port, done, nil } diff --git a/sdk/go/pulumi-language-go/main.go b/sdk/go/pulumi-language-go/main.go index 7ea60660873a..d7ccb1b34b2a 100644 --- a/sdk/go/pulumi-language-go/main.go +++ b/sdk/go/pulumi-language-go/main.go @@ -127,22 +127,24 @@ func main() { } // Fire up a gRPC server, letting the kernel choose a free port. - port, done, err := rpcutil.Serve(0, cancelChannel, []func(*grpc.Server) error{ - func(srv *grpc.Server) error { + handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{ + Cancel: cancelChannel, + Init: func(srv *grpc.Server) error { host := newLanguageHost(engineAddress, tracing, binary, buildTarget) pulumirpc.RegisterLanguageRuntimeServer(srv, host) return nil }, - }, nil) + Options: rpcutil.OpenTracingServerInterceptorOptions(nil), + }) if err != nil { cmdutil.Exit(errors.Wrapf(err, "could not start language host RPC server")) } // Otherwise, print out the port so that the spawner knows how to reach us. - fmt.Printf("%d\n", port) + fmt.Printf("%d\n", handle.Port) // And finally wait for the server to stop serving. - if err := <-done; err != nil { + if err := <-handle.Done; err != nil { cmdutil.Exit(errors.Wrapf(err, "language host RPC stopped serving")) } } diff --git a/sdk/nodejs/cmd/pulumi-language-nodejs/main.go b/sdk/nodejs/cmd/pulumi-language-nodejs/main.go index be36a7cb3eef..9d49825240df 100644 --- a/sdk/nodejs/cmd/pulumi-language-nodejs/main.go +++ b/sdk/nodejs/cmd/pulumi-language-nodejs/main.go @@ -119,22 +119,24 @@ func main() { } // Fire up a gRPC server, letting the kernel choose a free port. - port, done, err := rpcutil.Serve(0, cancelChannel, []func(*grpc.Server) error{ - func(srv *grpc.Server) error { + handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{ + Cancel: cancelChannel, + Init: func(srv *grpc.Server) error { host := newLanguageHost(engineAddress, tracing, typescript, tsconfigpath, nodeargs) pulumirpc.RegisterLanguageRuntimeServer(srv, host) return nil }, - }, nil) + Options: rpcutil.OpenTracingServerInterceptorOptions(nil), + }) if err != nil { cmdutil.Exit(errors.Wrapf(err, "could not start language host RPC server")) } // Otherwise, print out the port so that the spawner knows how to reach us. - fmt.Printf("%d\n", port) + fmt.Printf("%d\n", handle.Port) // And finally wait for the server to stop serving. - if err := <-done; err != nil { + if err := <-handle.Done; err != nil { cmdutil.Exit(errors.Wrapf(err, "language host RPC stopped serving")) } } @@ -460,12 +462,14 @@ func (host *nodeLanguageHost) Run(ctx context.Context, req *pulumirpc.RunRequest }() // Launch the rpc server giving it the real monitor to forward messages to. - port, serverDone, err := rpcutil.Serve(0, serverCancel, []func(*grpc.Server) error{ - func(srv *grpc.Server) error { + handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{ + Cancel: serverCancel, + Init: func(srv *grpc.Server) error { pulumirpc.RegisterResourceMonitorServer(srv, &monitorProxy{target}) return nil }, - }, tracingSpan) + Options: rpcutil.OpenTracingServerInterceptorOptions(tracingSpan), + }) if err != nil { return nil, err } @@ -484,7 +488,7 @@ func (host *nodeLanguageHost) Run(ctx context.Context, req *pulumirpc.RunRequest // Forward any rpc server or pipe errors to our output channel. go func() { - err := <-serverDone + err := <-handle.Done if err != nil { responseChannel <- &pulumirpc.RunResponse{Error: err.Error()} } @@ -513,7 +517,8 @@ func (host *nodeLanguageHost) Run(ctx context.Context, req *pulumirpc.RunRequest } // now, launch the nodejs process and actually run the user code in it. - go host.execNodejs(ctx, responseChannel, req, nodeBin, runPath, fmt.Sprintf("127.0.0.1:%d", port), pipes.directory()) + go host.execNodejs(ctx, responseChannel, req, nodeBin, runPath, + fmt.Sprintf("127.0.0.1:%d", handle.Port), pipes.directory()) // Wait for one of our launched goroutines to signal that we're done. This might be our proxy // (in the case of errors), or the launched nodejs completing (either successfully, or with diff --git a/sdk/python/cmd/pulumi-language-python/main.go b/sdk/python/cmd/pulumi-language-python/main.go index d25b8ef1b254..0e8a8841ba07 100644 --- a/sdk/python/cmd/pulumi-language-python/main.go +++ b/sdk/python/cmd/pulumi-language-python/main.go @@ -149,22 +149,24 @@ func main() { virtualenvPath := resolveVirtualEnvironmentPath(root, virtualenv) // Fire up a gRPC server, letting the kernel choose a free port. - port, done, err := rpcutil.Serve(0, cancelChannel, []func(*grpc.Server) error{ - func(srv *grpc.Server) error { + handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{ + Cancel: cancelChannel, + Init: func(srv *grpc.Server) error { host := newLanguageHost(pythonExec, engineAddress, tracing, cwd, virtualenv, virtualenvPath) pulumirpc.RegisterLanguageRuntimeServer(srv, host) return nil }, - }, nil) + Options: rpcutil.OpenTracingServerInterceptorOptions(nil), + }) if err != nil { cmdutil.Exit(errors.Wrapf(err, "could not start language host RPC server")) } // Otherwise, print out the port so that the spawner knows how to reach us. - fmt.Printf("%d\n", port) + fmt.Printf("%d\n", handle.Port) // And finally wait for the server to stop serving. - if err := <-done; err != nil { + if err := <-handle.Done; err != nil { cmdutil.Exit(errors.Wrapf(err, "language host RPC stopped serving")) } }