Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add debug GRPC interceptors that print conversations in JSON #11085

Merged
merged 1 commit into from Nov 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog/pending/20221102--cli--pulumi-debug-grpc.yaml
@@ -0,0 +1,4 @@
changes:
- type: feat
scope: cli
description: "Enables debug tracing of Pulumi gRPC internals: `PULUMI_DEBUG_GRPC=$PWD/grpc.json pulumi up`"
18 changes: 18 additions & 0 deletions pkg/engine/deployment.go
Expand Up @@ -18,12 +18,15 @@ import (
"context"
"errors"
"fmt"
"os"
"time"

"github.com/opentracing/opentracing-go"
"google.golang.org/grpc"

"github.com/pulumi/pulumi/pkg/v3/resource/deploy"
"github.com/pulumi/pulumi/pkg/v3/resource/deploy/providers"
interceptors "github.com/pulumi/pulumi/pkg/v3/util/rpcdebug"
"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
"github.com/pulumi/pulumi/sdk/v3/go/common/display"
"github.com/pulumi/pulumi/sdk/v3/go/common/resource"
Expand Down Expand Up @@ -57,6 +60,21 @@ func ProjectInfoContext(projinfo *Projinfo, host plugin.Host,
return "", "", nil, err
}

if logFile := os.Getenv("PULUMI_DEBUG_GRPC"); logFile != "" {
di, err := interceptors.NewDebugInterceptor(interceptors.DebugInterceptorOptions{
LogFile: logFile,
Mutex: ctx.DebugTraceMutex,
})
if err != nil {
return "", "", nil, err
}
ctx.DialOptions = func(metadata interface{}) []grpc.DialOption {
return di.DialOptions(interceptors.LogOptions{
Metadata: metadata,
})
}
}

// If the project wants to connect to an existing language runtime, do so now.
if projinfo.Proj.Runtime.Name() == clientRuntimeName {
addressValue, ok := projinfo.Proj.Runtime.Options()["address"]
Expand Down
28 changes: 23 additions & 5 deletions pkg/engine/plugin_host.go
@@ -1,4 +1,4 @@
// Copyright 2016-2020, Pulumi Corporation.
// Copyright 2016-2022, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,10 +32,7 @@ type clientLanguageRuntimeHost struct {

func connectToLanguageRuntime(ctx *plugin.Context, address string) (plugin.Host, error) {
// Dial the language runtime.
conn, err := grpc.Dial(address, grpc.WithInsecure(),
grpc.WithUnaryInterceptor(rpcutil.OpenTracingClientInterceptor()),
grpc.WithStreamInterceptor(rpcutil.OpenTracingStreamClientInterceptor()),
rpcutil.GrpcChannelOptions())
conn, err := grpc.Dial(address, langRuntimePluginDialOptions(ctx, address)...)
if err != nil {
return nil, fmt.Errorf("could not connect to language host: %w", err)
}
Expand All @@ -50,3 +47,24 @@ func connectToLanguageRuntime(ctx *plugin.Context, address string) (plugin.Host,
func (host *clientLanguageRuntimeHost) LanguageRuntime(runtime string) (plugin.LanguageRuntime, error) {
return host.languageRuntime, nil
}

func langRuntimePluginDialOptions(ctx *plugin.Context, address string) []grpc.DialOption {
dialOpts := append(
rpcutil.OpenTracingInterceptorDialOptions(),
grpc.WithInsecure(),
rpcutil.GrpcChannelOptions(),
)

if ctx.DialOptions != nil {
metadata := map[string]interface{}{
"mode": "client",
"kind": "language",
}
if address != "" {
metadata["address"] = address
}
dialOpts = append(dialOpts, ctx.DialOptions(metadata)...)
}

return dialOpts
}
20 changes: 12 additions & 8 deletions pkg/resource/deploy/deploytest/pluginhost.go
Expand Up @@ -156,17 +156,19 @@ func (w *grpcWrapper) Close() error {

func wrapProviderWithGrpc(provider plugin.Provider) (plugin.Provider, io.Closer, error) {
wrapper := &grpcWrapper{stop: make(chan bool)}
port, _, err := rpcutil.Serve(0, wrapper.stop, []func(*grpc.Server) error{
func(srv *grpc.Server) error {
handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{
Cancel: wrapper.stop,
Init: func(srv *grpc.Server) error {
pulumirpc.RegisterResourceProviderServer(srv, plugin.NewProviderServer(provider))
return nil
},
}, nil)
Options: rpcutil.OpenTracingServerInterceptorOptions(nil),
})
if err != nil {
return nil, nil, fmt.Errorf("could not start resource provider service: %w", err)
}
conn, err := grpc.Dial(
fmt.Sprintf("127.0.0.1:%v", port),
fmt.Sprintf("127.0.0.1:%v", handle.Port),
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(rpcutil.OpenTracingClientInterceptor()),
grpc.WithStreamInterceptor(rpcutil.OpenTracingStreamClientInterceptor()),
Expand Down Expand Up @@ -242,16 +244,18 @@ func NewPluginHost(sink, statusSink diag.Sink, languageRuntime plugin.LanguageRu
statusSink: statusSink,
stop: make(chan bool),
}
port, _, err := rpcutil.Serve(0, engine.stop, []func(*grpc.Server) error{
func(srv *grpc.Server) error {
handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{
Cancel: engine.stop,
Init: func(srv *grpc.Server) error {
pulumirpc.RegisterEngineServer(srv, engine)
return nil
},
}, nil)
Options: rpcutil.OpenTracingServerInterceptorOptions(nil),
})
if err != nil {
panic(fmt.Errorf("could not start engine service: %v", err))
}
engine.address = fmt.Sprintf("127.0.0.1:%v", port)
engine.address = fmt.Sprintf("127.0.0.1:%v", handle.Port)

return &pluginHost{
pluginLoaders: pluginLoaders,
Expand Down
40 changes: 34 additions & 6 deletions pkg/resource/deploy/source_eval.go
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"errors"
"fmt"
"os"
"strings"
"sync"
"time"
Expand All @@ -32,6 +33,7 @@ import (
"google.golang.org/grpc/codes"

"github.com/pulumi/pulumi/pkg/v3/resource/deploy/providers"
interceptors "github.com/pulumi/pulumi/pkg/v3/util/rpcdebug"
"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
"github.com/pulumi/pulumi/sdk/v3/go/common/resource"
"github.com/pulumi/pulumi/sdk/v3/go/common/resource/config"
Expand Down Expand Up @@ -488,7 +490,7 @@ type resmon struct {
regOutChan chan *registerResourceOutputsEvent // the channel to send resource output registrations to.
regReadChan chan *readResourceEvent // the channel to send resource reads to.
cancel chan bool // a channel that can cancel the server.
done chan error // a channel that resolves when the server completes.
done <-chan error // a channel that resolves when the server completes.
disableResourceReferences bool // true if resource references are disabled.
disableOutputValues bool // true if output values are disabled.
}
Expand Down Expand Up @@ -528,12 +530,14 @@ func newResourceMonitor(src *evalSource, provs ProviderSource, regChan chan *reg
}

// Fire up a gRPC server and start listening for incomings.
port, done, err := rpcutil.Serve(0, resmon.cancel, []func(*grpc.Server) error{
func(srv *grpc.Server) error {
handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{
Cancel: resmon.cancel,
Init: func(srv *grpc.Server) error {
pulumirpc.RegisterResourceMonitorServer(srv, resmon)
return nil
},
}, tracingSpan, otgrpc.SpanDecorator(decorateResourceSpans))
Options: sourceEvalServeOptions(src.plugctx, tracingSpan),
})
if err != nil {
return nil, err
}
Expand All @@ -545,9 +549,9 @@ func newResourceMonitor(src *evalSource, provs ProviderSource, regChan chan *reg
ConfigSecretKeys: configSecretKeys,
DryRun: src.dryRun,
Parallel: opts.Parallel,
MonitorAddress: fmt.Sprintf("127.0.0.1:%d", port),
MonitorAddress: fmt.Sprintf("127.0.0.1:%d", handle.Port),
}
resmon.done = done
resmon.done = handle.Done

go d.serve()

Expand All @@ -565,6 +569,30 @@ func (rm *resmon) Cancel() error {
return <-rm.done
}

func sourceEvalServeOptions(ctx *plugin.Context, tracingSpan opentracing.Span) []grpc.ServerOption {
serveOpts := rpcutil.OpenTracingServerInterceptorOptions(
tracingSpan,
otgrpc.SpanDecorator(decorateResourceSpans),
)
if logFile := os.Getenv("PULUMI_DEBUG_GRPC"); logFile != "" {
di, err := interceptors.NewDebugInterceptor(interceptors.DebugInterceptorOptions{
LogFile: logFile,
Mutex: ctx.DebugTraceMutex,
})
if err != nil {
// ignoring
return nil
}
metadata := map[string]interface{}{
"mode": "server",
}
serveOpts = append(serveOpts, di.ServerOptions(interceptors.LogOptions{
Metadata: metadata,
})...)
}
return serveOpts
}

// getProviderReference fetches the provider reference for a resource, read, or invoke from the given package with the
// given unparsed provider reference. If the unparsed provider reference is empty, this function returns a reference
// to the default provider for the indicated package.
Expand Down
14 changes: 8 additions & 6 deletions pkg/resource/deploy/source_query.go
Expand Up @@ -249,17 +249,19 @@ func newQueryResourceMonitor(
}

// Fire up a gRPC server and start listening for incomings.
port, done, err := rpcutil.Serve(0, queryResmon.cancel, []func(*grpc.Server) error{
func(srv *grpc.Server) error {
handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{
Cancel: queryResmon.cancel,
Init: func(srv *grpc.Server) error {
pulumirpc.RegisterResourceMonitorServer(srv, queryResmon)
return nil
},
}, tracingSpan)
Options: rpcutil.OpenTracingServerInterceptorOptions(tracingSpan),
})
if err != nil {
return nil, err
}

monitorAddress := fmt.Sprintf("127.0.0.1:%d", port)
monitorAddress := fmt.Sprintf("127.0.0.1:%d", handle.Port)

var config map[config.Key]string
if runinfo.Target != nil {
Expand All @@ -283,7 +285,7 @@ func newQueryResourceMonitor(
MonitorAddress: monitorAddress,
}
queryResmon.addr = monitorAddress
queryResmon.done = done
queryResmon.done = handle.Done

go d.serve()

Expand All @@ -303,7 +305,7 @@ type queryResmon struct {
defaultProviders *defaultProviders // the default provider manager.
addr string // the address the host is listening on.
cancel chan bool // a channel that can cancel the server.
done chan error // a channel that resolves when the server completes.
done <-chan error // a channel that resolves when the server completes.
reg *providers.Registry // registry for resource providers.
callInfo plugin.CallInfo // information for call calls.
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/resource/provider/main.go
Expand Up @@ -73,25 +73,27 @@ func Main(name string, provMaker func(*HostClient) (pulumirpc.ResourceProviderSe
}

// Fire up a gRPC server, letting the kernel choose a free port for us.
port, done, err := rpcutil.Serve(0, cancelChannel, []func(*grpc.Server) error{
func(srv *grpc.Server) error {
handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{
Cancel: cancelChannel,
Init: func(srv *grpc.Server) error {
prov, proverr := provMaker(host)
if proverr != nil {
return fmt.Errorf("failed to create resource provider: %v", proverr)
}
pulumirpc.RegisterResourceProviderServer(srv, prov)
return nil
},
}, nil)
Options: rpcutil.OpenTracingServerInterceptorOptions(nil),
})
if err != nil {
return fmt.Errorf("fatal: %v", err)
}

// The resource provider protocol requires that we now write out the port we have chosen to listen on.
fmt.Printf("%d\n", port)
fmt.Printf("%d\n", handle.Port)

// Finally, wait for the server to stop serving.
if err := <-done; err != nil {
if err := <-handle.Done; err != nil {
return fmt.Errorf("fatal: %v", err)
}

Expand Down