Skip to content

Commit

Permalink
Merge #11085
Browse files Browse the repository at this point in the history
11085: Add debug GRPC interceptors that print conversations in JSON r=t0yv0 a=t0yv0

<!--- 
Thanks so much for your contribution! If this is your first time contributing, please ensure that you have read the [CONTRIBUTING](https://github.com/pulumi/pulumi/blob/master/CONTRIBUTING.md) documentation.
-->

# Description

<!--- Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. -->

Fixes #10232

`PULUMI_DEBUG_GRPC=./debug.json pulumi preview` now works.

The debug APIs are probably unstable so I moved them out to their own repo rather than hosting under sdk/.

Debug interceptors are no-op unless the process initializes them in main. Currently only `pulumi` process initializes them if the env var is set. This way we observe conversations only once, as a server for ResourceMonitor etc, and as a client for plugin conversations (language and resource).

I can now see provider conversations logged which was of interest to me.


## Checklist

<!--- Please provide details if the checkbox below is to be left unchecked. -->
- [ ] I have added tests that prove my fix is effective or that my feature works
<!--- 
User-facing changes require a CHANGELOG entry.
-->
- [ ] I have run `make changelog` and committed the `changelog/pending/<file>` documenting my change
<!--
If the change(s) in this PR is a modification of an existing call to the Pulumi Service,
then the service should honor older versions of the CLI where this change would not exist.
You must then bump the API version in /pkg/backend/httpstate/client/api.go, as well as add
it to the service.
-->
- [ ] Yes, there are changes in this PR that warrants bumping the Pulumi Service API version
  <!-- `@Pulumi` employees: If yes, you must submit corresponding changes in the service repo. -->


Co-authored-by: Anton Tayanovskyy <anton.tayanovskyy@gmail.com>
Co-authored-by: Anton Tayanovskyy <anton@pulumi.com>
  • Loading branch information
3 people committed Nov 8, 2022
2 parents 83c9dfc + f31c5fd commit aee8e5c
Show file tree
Hide file tree
Showing 23 changed files with 794 additions and 128 deletions.
4 changes: 4 additions & 0 deletions changelog/pending/20221102--cli--pulumi-debug-grpc.yaml
@@ -0,0 +1,4 @@
changes:
- type: feat
scope: cli
description: "Enables debug tracing of Pulumi gRPC internals: `PULUMI_DEBUG_GRPC=$PWD/grpc.json pulumi up`"
18 changes: 18 additions & 0 deletions pkg/engine/deployment.go
Expand Up @@ -18,12 +18,15 @@ import (
"context"
"errors"
"fmt"
"os"
"time"

"github.com/opentracing/opentracing-go"
"google.golang.org/grpc"

"github.com/pulumi/pulumi/pkg/v3/resource/deploy"
"github.com/pulumi/pulumi/pkg/v3/resource/deploy/providers"
interceptors "github.com/pulumi/pulumi/pkg/v3/util/rpcdebug"
"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
"github.com/pulumi/pulumi/sdk/v3/go/common/display"
"github.com/pulumi/pulumi/sdk/v3/go/common/resource"
Expand Down Expand Up @@ -57,6 +60,21 @@ func ProjectInfoContext(projinfo *Projinfo, host plugin.Host,
return "", "", nil, err
}

if logFile := os.Getenv("PULUMI_DEBUG_GRPC"); logFile != "" {
di, err := interceptors.NewDebugInterceptor(interceptors.DebugInterceptorOptions{
LogFile: logFile,
Mutex: ctx.DebugTraceMutex,
})
if err != nil {
return "", "", nil, err
}
ctx.DialOptions = func(metadata interface{}) []grpc.DialOption {
return di.DialOptions(interceptors.LogOptions{
Metadata: metadata,
})
}
}

// If the project wants to connect to an existing language runtime, do so now.
if projinfo.Proj.Runtime.Name() == clientRuntimeName {
addressValue, ok := projinfo.Proj.Runtime.Options()["address"]
Expand Down
28 changes: 23 additions & 5 deletions pkg/engine/plugin_host.go
@@ -1,4 +1,4 @@
// Copyright 2016-2020, Pulumi Corporation.
// Copyright 2016-2022, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,10 +32,7 @@ type clientLanguageRuntimeHost struct {

func connectToLanguageRuntime(ctx *plugin.Context, address string) (plugin.Host, error) {
// Dial the language runtime.
conn, err := grpc.Dial(address, grpc.WithInsecure(),
grpc.WithUnaryInterceptor(rpcutil.OpenTracingClientInterceptor()),
grpc.WithStreamInterceptor(rpcutil.OpenTracingStreamClientInterceptor()),
rpcutil.GrpcChannelOptions())
conn, err := grpc.Dial(address, langRuntimePluginDialOptions(ctx, address)...)
if err != nil {
return nil, fmt.Errorf("could not connect to language host: %w", err)
}
Expand All @@ -50,3 +47,24 @@ func connectToLanguageRuntime(ctx *plugin.Context, address string) (plugin.Host,
func (host *clientLanguageRuntimeHost) LanguageRuntime(runtime string) (plugin.LanguageRuntime, error) {
return host.languageRuntime, nil
}

func langRuntimePluginDialOptions(ctx *plugin.Context, address string) []grpc.DialOption {
dialOpts := append(
rpcutil.OpenTracingInterceptorDialOptions(),
grpc.WithInsecure(),
rpcutil.GrpcChannelOptions(),
)

if ctx.DialOptions != nil {
metadata := map[string]interface{}{
"mode": "client",
"kind": "language",
}
if address != "" {
metadata["address"] = address
}
dialOpts = append(dialOpts, ctx.DialOptions(metadata)...)
}

return dialOpts
}
20 changes: 12 additions & 8 deletions pkg/resource/deploy/deploytest/pluginhost.go
Expand Up @@ -156,17 +156,19 @@ func (w *grpcWrapper) Close() error {

func wrapProviderWithGrpc(provider plugin.Provider) (plugin.Provider, io.Closer, error) {
wrapper := &grpcWrapper{stop: make(chan bool)}
port, _, err := rpcutil.Serve(0, wrapper.stop, []func(*grpc.Server) error{
func(srv *grpc.Server) error {
handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{
Cancel: wrapper.stop,
Init: func(srv *grpc.Server) error {
pulumirpc.RegisterResourceProviderServer(srv, plugin.NewProviderServer(provider))
return nil
},
}, nil)
Options: rpcutil.OpenTracingServerInterceptorOptions(nil),
})
if err != nil {
return nil, nil, fmt.Errorf("could not start resource provider service: %w", err)
}
conn, err := grpc.Dial(
fmt.Sprintf("127.0.0.1:%v", port),
fmt.Sprintf("127.0.0.1:%v", handle.Port),
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(rpcutil.OpenTracingClientInterceptor()),
grpc.WithStreamInterceptor(rpcutil.OpenTracingStreamClientInterceptor()),
Expand Down Expand Up @@ -242,16 +244,18 @@ func NewPluginHost(sink, statusSink diag.Sink, languageRuntime plugin.LanguageRu
statusSink: statusSink,
stop: make(chan bool),
}
port, _, err := rpcutil.Serve(0, engine.stop, []func(*grpc.Server) error{
func(srv *grpc.Server) error {
handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{
Cancel: engine.stop,
Init: func(srv *grpc.Server) error {
pulumirpc.RegisterEngineServer(srv, engine)
return nil
},
}, nil)
Options: rpcutil.OpenTracingServerInterceptorOptions(nil),
})
if err != nil {
panic(fmt.Errorf("could not start engine service: %v", err))
}
engine.address = fmt.Sprintf("127.0.0.1:%v", port)
engine.address = fmt.Sprintf("127.0.0.1:%v", handle.Port)

return &pluginHost{
pluginLoaders: pluginLoaders,
Expand Down
40 changes: 34 additions & 6 deletions pkg/resource/deploy/source_eval.go
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"errors"
"fmt"
"os"
"strings"
"sync"
"time"
Expand All @@ -32,6 +33,7 @@ import (
"google.golang.org/grpc/codes"

"github.com/pulumi/pulumi/pkg/v3/resource/deploy/providers"
interceptors "github.com/pulumi/pulumi/pkg/v3/util/rpcdebug"
"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
"github.com/pulumi/pulumi/sdk/v3/go/common/resource"
"github.com/pulumi/pulumi/sdk/v3/go/common/resource/config"
Expand Down Expand Up @@ -488,7 +490,7 @@ type resmon struct {
regOutChan chan *registerResourceOutputsEvent // the channel to send resource output registrations to.
regReadChan chan *readResourceEvent // the channel to send resource reads to.
cancel chan bool // a channel that can cancel the server.
done chan error // a channel that resolves when the server completes.
done <-chan error // a channel that resolves when the server completes.
disableResourceReferences bool // true if resource references are disabled.
disableOutputValues bool // true if output values are disabled.
}
Expand Down Expand Up @@ -528,12 +530,14 @@ func newResourceMonitor(src *evalSource, provs ProviderSource, regChan chan *reg
}

// Fire up a gRPC server and start listening for incomings.
port, done, err := rpcutil.Serve(0, resmon.cancel, []func(*grpc.Server) error{
func(srv *grpc.Server) error {
handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{
Cancel: resmon.cancel,
Init: func(srv *grpc.Server) error {
pulumirpc.RegisterResourceMonitorServer(srv, resmon)
return nil
},
}, tracingSpan, otgrpc.SpanDecorator(decorateResourceSpans))
Options: sourceEvalServeOptions(src.plugctx, tracingSpan),
})
if err != nil {
return nil, err
}
Expand All @@ -545,9 +549,9 @@ func newResourceMonitor(src *evalSource, provs ProviderSource, regChan chan *reg
ConfigSecretKeys: configSecretKeys,
DryRun: src.dryRun,
Parallel: opts.Parallel,
MonitorAddress: fmt.Sprintf("127.0.0.1:%d", port),
MonitorAddress: fmt.Sprintf("127.0.0.1:%d", handle.Port),
}
resmon.done = done
resmon.done = handle.Done

go d.serve()

Expand All @@ -565,6 +569,30 @@ func (rm *resmon) Cancel() error {
return <-rm.done
}

func sourceEvalServeOptions(ctx *plugin.Context, tracingSpan opentracing.Span) []grpc.ServerOption {
serveOpts := rpcutil.OpenTracingServerInterceptorOptions(
tracingSpan,
otgrpc.SpanDecorator(decorateResourceSpans),
)
if logFile := os.Getenv("PULUMI_DEBUG_GRPC"); logFile != "" {
di, err := interceptors.NewDebugInterceptor(interceptors.DebugInterceptorOptions{
LogFile: logFile,
Mutex: ctx.DebugTraceMutex,
})
if err != nil {
// ignoring
return nil
}
metadata := map[string]interface{}{
"mode": "server",
}
serveOpts = append(serveOpts, di.ServerOptions(interceptors.LogOptions{
Metadata: metadata,
})...)
}
return serveOpts
}

// getProviderReference fetches the provider reference for a resource, read, or invoke from the given package with the
// given unparsed provider reference. If the unparsed provider reference is empty, this function returns a reference
// to the default provider for the indicated package.
Expand Down
14 changes: 8 additions & 6 deletions pkg/resource/deploy/source_query.go
Expand Up @@ -249,17 +249,19 @@ func newQueryResourceMonitor(
}

// Fire up a gRPC server and start listening for incomings.
port, done, err := rpcutil.Serve(0, queryResmon.cancel, []func(*grpc.Server) error{
func(srv *grpc.Server) error {
handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{
Cancel: queryResmon.cancel,
Init: func(srv *grpc.Server) error {
pulumirpc.RegisterResourceMonitorServer(srv, queryResmon)
return nil
},
}, tracingSpan)
Options: rpcutil.OpenTracingServerInterceptorOptions(tracingSpan),
})
if err != nil {
return nil, err
}

monitorAddress := fmt.Sprintf("127.0.0.1:%d", port)
monitorAddress := fmt.Sprintf("127.0.0.1:%d", handle.Port)

var config map[config.Key]string
if runinfo.Target != nil {
Expand All @@ -283,7 +285,7 @@ func newQueryResourceMonitor(
MonitorAddress: monitorAddress,
}
queryResmon.addr = monitorAddress
queryResmon.done = done
queryResmon.done = handle.Done

go d.serve()

Expand All @@ -303,7 +305,7 @@ type queryResmon struct {
defaultProviders *defaultProviders // the default provider manager.
addr string // the address the host is listening on.
cancel chan bool // a channel that can cancel the server.
done chan error // a channel that resolves when the server completes.
done <-chan error // a channel that resolves when the server completes.
reg *providers.Registry // registry for resource providers.
callInfo plugin.CallInfo // information for call calls.
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/resource/provider/main.go
Expand Up @@ -73,25 +73,27 @@ func Main(name string, provMaker func(*HostClient) (pulumirpc.ResourceProviderSe
}

// Fire up a gRPC server, letting the kernel choose a free port for us.
port, done, err := rpcutil.Serve(0, cancelChannel, []func(*grpc.Server) error{
func(srv *grpc.Server) error {
handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{
Cancel: cancelChannel,
Init: func(srv *grpc.Server) error {
prov, proverr := provMaker(host)
if proverr != nil {
return fmt.Errorf("failed to create resource provider: %v", proverr)
}
pulumirpc.RegisterResourceProviderServer(srv, prov)
return nil
},
}, nil)
Options: rpcutil.OpenTracingServerInterceptorOptions(nil),
})
if err != nil {
return fmt.Errorf("fatal: %v", err)
}

// The resource provider protocol requires that we now write out the port we have chosen to listen on.
fmt.Printf("%d\n", port)
fmt.Printf("%d\n", handle.Port)

// Finally, wait for the server to stop serving.
if err := <-done; err != nil {
if err := <-handle.Done; err != nil {
return fmt.Errorf("fatal: %v", err)
}

Expand Down

0 comments on commit aee8e5c

Please sign in to comment.