Skip to content

Commit

Permalink
Introduce PULUMI_DEBUG_GRPC support
Browse files Browse the repository at this point in the history
  • Loading branch information
t0yv0 committed Nov 8, 2022
1 parent 4910d9c commit 9276574
Show file tree
Hide file tree
Showing 23 changed files with 794 additions and 128 deletions.
4 changes: 4 additions & 0 deletions changelog/pending/20221102--cli--pulumi-debug-grpc.yaml
@@ -0,0 +1,4 @@
changes:
- type: feat
scope: cli
description: "Enables debug tracing of Pulumi gRPC internals: `PULUMI_DEBUG_GRPC=$PWD/grpc.json pulumi up`"
18 changes: 18 additions & 0 deletions pkg/engine/deployment.go
Expand Up @@ -18,12 +18,15 @@ import (
"context"
"errors"
"fmt"
"os"
"time"

"github.com/opentracing/opentracing-go"
"google.golang.org/grpc"

"github.com/pulumi/pulumi/pkg/v3/resource/deploy"
"github.com/pulumi/pulumi/pkg/v3/resource/deploy/providers"
interceptors "github.com/pulumi/pulumi/pkg/v3/util/rpcdebug"
"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
"github.com/pulumi/pulumi/sdk/v3/go/common/display"
"github.com/pulumi/pulumi/sdk/v3/go/common/resource"
Expand Down Expand Up @@ -57,6 +60,21 @@ func ProjectInfoContext(projinfo *Projinfo, host plugin.Host,
return "", "", nil, err
}

if logFile := os.Getenv("PULUMI_DEBUG_GRPC"); logFile != "" {
di, err := interceptors.NewDebugInterceptor(interceptors.DebugInterceptorOptions{
LogFile: logFile,
Mutex: ctx.DebugTraceMutex,
})
if err != nil {
return "", "", nil, err
}
ctx.DialOptions = func(metadata interface{}) []grpc.DialOption {
return di.DialOptions(interceptors.LogOptions{
Metadata: metadata,
})
}
}

// If the project wants to connect to an existing language runtime, do so now.
if projinfo.Proj.Runtime.Name() == clientRuntimeName {
addressValue, ok := projinfo.Proj.Runtime.Options()["address"]
Expand Down
28 changes: 23 additions & 5 deletions pkg/engine/plugin_host.go
@@ -1,4 +1,4 @@
// Copyright 2016-2020, Pulumi Corporation.
// Copyright 2016-2022, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,10 +32,7 @@ type clientLanguageRuntimeHost struct {

func connectToLanguageRuntime(ctx *plugin.Context, address string) (plugin.Host, error) {
// Dial the language runtime.
conn, err := grpc.Dial(address, grpc.WithInsecure(),
grpc.WithUnaryInterceptor(rpcutil.OpenTracingClientInterceptor()),
grpc.WithStreamInterceptor(rpcutil.OpenTracingStreamClientInterceptor()),
rpcutil.GrpcChannelOptions())
conn, err := grpc.Dial(address, langRuntimePluginDialOptions(ctx, address)...)
if err != nil {
return nil, fmt.Errorf("could not connect to language host: %w", err)
}
Expand All @@ -50,3 +47,24 @@ func connectToLanguageRuntime(ctx *plugin.Context, address string) (plugin.Host,
func (host *clientLanguageRuntimeHost) LanguageRuntime(runtime string) (plugin.LanguageRuntime, error) {
return host.languageRuntime, nil
}

func langRuntimePluginDialOptions(ctx *plugin.Context, address string) []grpc.DialOption {
dialOpts := append(
rpcutil.OpenTracingInterceptorDialOptions(),
grpc.WithInsecure(),
rpcutil.GrpcChannelOptions(),
)

if ctx.DialOptions != nil {
metadata := map[string]interface{}{
"mode": "client",
"kind": "language",
}
if address != "" {
metadata["address"] = address
}
dialOpts = append(dialOpts, ctx.DialOptions(metadata)...)
}

return dialOpts
}
20 changes: 12 additions & 8 deletions pkg/resource/deploy/deploytest/pluginhost.go
Expand Up @@ -156,17 +156,19 @@ func (w *grpcWrapper) Close() error {

func wrapProviderWithGrpc(provider plugin.Provider) (plugin.Provider, io.Closer, error) {
wrapper := &grpcWrapper{stop: make(chan bool)}
port, _, err := rpcutil.Serve(0, wrapper.stop, []func(*grpc.Server) error{
func(srv *grpc.Server) error {
handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{
Cancel: wrapper.stop,
Init: func(srv *grpc.Server) error {
pulumirpc.RegisterResourceProviderServer(srv, plugin.NewProviderServer(provider))
return nil
},
}, nil)
Options: rpcutil.OpenTracingServerInterceptorOptions(nil),
})
if err != nil {
return nil, nil, fmt.Errorf("could not start resource provider service: %w", err)
}
conn, err := grpc.Dial(
fmt.Sprintf("127.0.0.1:%v", port),
fmt.Sprintf("127.0.0.1:%v", handle.Port),
grpc.WithInsecure(),
grpc.WithUnaryInterceptor(rpcutil.OpenTracingClientInterceptor()),
grpc.WithStreamInterceptor(rpcutil.OpenTracingStreamClientInterceptor()),
Expand Down Expand Up @@ -242,16 +244,18 @@ func NewPluginHost(sink, statusSink diag.Sink, languageRuntime plugin.LanguageRu
statusSink: statusSink,
stop: make(chan bool),
}
port, _, err := rpcutil.Serve(0, engine.stop, []func(*grpc.Server) error{
func(srv *grpc.Server) error {
handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{
Cancel: engine.stop,
Init: func(srv *grpc.Server) error {
pulumirpc.RegisterEngineServer(srv, engine)
return nil
},
}, nil)
Options: rpcutil.OpenTracingServerInterceptorOptions(nil),
})
if err != nil {
panic(fmt.Errorf("could not start engine service: %v", err))
}
engine.address = fmt.Sprintf("127.0.0.1:%v", port)
engine.address = fmt.Sprintf("127.0.0.1:%v", handle.Port)

return &pluginHost{
pluginLoaders: pluginLoaders,
Expand Down
40 changes: 34 additions & 6 deletions pkg/resource/deploy/source_eval.go
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"errors"
"fmt"
"os"
"strings"
"sync"
"time"
Expand All @@ -32,6 +33,7 @@ import (
"google.golang.org/grpc/codes"

"github.com/pulumi/pulumi/pkg/v3/resource/deploy/providers"
interceptors "github.com/pulumi/pulumi/pkg/v3/util/rpcdebug"
"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
"github.com/pulumi/pulumi/sdk/v3/go/common/resource"
"github.com/pulumi/pulumi/sdk/v3/go/common/resource/config"
Expand Down Expand Up @@ -488,7 +490,7 @@ type resmon struct {
regOutChan chan *registerResourceOutputsEvent // the channel to send resource output registrations to.
regReadChan chan *readResourceEvent // the channel to send resource reads to.
cancel chan bool // a channel that can cancel the server.
done chan error // a channel that resolves when the server completes.
done <-chan error // a channel that resolves when the server completes.
disableResourceReferences bool // true if resource references are disabled.
disableOutputValues bool // true if output values are disabled.
}
Expand Down Expand Up @@ -528,12 +530,14 @@ func newResourceMonitor(src *evalSource, provs ProviderSource, regChan chan *reg
}

// Fire up a gRPC server and start listening for incomings.
port, done, err := rpcutil.Serve(0, resmon.cancel, []func(*grpc.Server) error{
func(srv *grpc.Server) error {
handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{
Cancel: resmon.cancel,
Init: func(srv *grpc.Server) error {
pulumirpc.RegisterResourceMonitorServer(srv, resmon)
return nil
},
}, tracingSpan, otgrpc.SpanDecorator(decorateResourceSpans))
Options: sourceEvalServeOptions(src.plugctx, tracingSpan),
})
if err != nil {
return nil, err
}
Expand All @@ -545,9 +549,9 @@ func newResourceMonitor(src *evalSource, provs ProviderSource, regChan chan *reg
ConfigSecretKeys: configSecretKeys,
DryRun: src.dryRun,
Parallel: opts.Parallel,
MonitorAddress: fmt.Sprintf("127.0.0.1:%d", port),
MonitorAddress: fmt.Sprintf("127.0.0.1:%d", handle.Port),
}
resmon.done = done
resmon.done = handle.Done

go d.serve()

Expand All @@ -565,6 +569,30 @@ func (rm *resmon) Cancel() error {
return <-rm.done
}

func sourceEvalServeOptions(ctx *plugin.Context, tracingSpan opentracing.Span) []grpc.ServerOption {
serveOpts := rpcutil.OpenTracingServerInterceptorOptions(
tracingSpan,
otgrpc.SpanDecorator(decorateResourceSpans),
)
if logFile := os.Getenv("PULUMI_DEBUG_GRPC"); logFile != "" {
di, err := interceptors.NewDebugInterceptor(interceptors.DebugInterceptorOptions{
LogFile: logFile,
Mutex: ctx.DebugTraceMutex,
})
if err != nil {
// ignoring
return nil
}
metadata := map[string]interface{}{
"mode": "server",
}
serveOpts = append(serveOpts, di.ServerOptions(interceptors.LogOptions{
Metadata: metadata,
})...)
}
return serveOpts
}

// getProviderReference fetches the provider reference for a resource, read, or invoke from the given package with the
// given unparsed provider reference. If the unparsed provider reference is empty, this function returns a reference
// to the default provider for the indicated package.
Expand Down
14 changes: 8 additions & 6 deletions pkg/resource/deploy/source_query.go
Expand Up @@ -249,17 +249,19 @@ func newQueryResourceMonitor(
}

// Fire up a gRPC server and start listening for incomings.
port, done, err := rpcutil.Serve(0, queryResmon.cancel, []func(*grpc.Server) error{
func(srv *grpc.Server) error {
handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{
Cancel: queryResmon.cancel,
Init: func(srv *grpc.Server) error {
pulumirpc.RegisterResourceMonitorServer(srv, queryResmon)
return nil
},
}, tracingSpan)
Options: rpcutil.OpenTracingServerInterceptorOptions(tracingSpan),
})
if err != nil {
return nil, err
}

monitorAddress := fmt.Sprintf("127.0.0.1:%d", port)
monitorAddress := fmt.Sprintf("127.0.0.1:%d", handle.Port)

var config map[config.Key]string
if runinfo.Target != nil {
Expand All @@ -283,7 +285,7 @@ func newQueryResourceMonitor(
MonitorAddress: monitorAddress,
}
queryResmon.addr = monitorAddress
queryResmon.done = done
queryResmon.done = handle.Done

go d.serve()

Expand All @@ -303,7 +305,7 @@ type queryResmon struct {
defaultProviders *defaultProviders // the default provider manager.
addr string // the address the host is listening on.
cancel chan bool // a channel that can cancel the server.
done chan error // a channel that resolves when the server completes.
done <-chan error // a channel that resolves when the server completes.
reg *providers.Registry // registry for resource providers.
callInfo plugin.CallInfo // information for call calls.
}
Expand Down
12 changes: 7 additions & 5 deletions pkg/resource/provider/main.go
Expand Up @@ -73,25 +73,27 @@ func Main(name string, provMaker func(*HostClient) (pulumirpc.ResourceProviderSe
}

// Fire up a gRPC server, letting the kernel choose a free port for us.
port, done, err := rpcutil.Serve(0, cancelChannel, []func(*grpc.Server) error{
func(srv *grpc.Server) error {
handle, err := rpcutil.ServeWithOptions(rpcutil.ServeOptions{
Cancel: cancelChannel,
Init: func(srv *grpc.Server) error {
prov, proverr := provMaker(host)
if proverr != nil {
return fmt.Errorf("failed to create resource provider: %v", proverr)
}
pulumirpc.RegisterResourceProviderServer(srv, prov)
return nil
},
}, nil)
Options: rpcutil.OpenTracingServerInterceptorOptions(nil),
})
if err != nil {
return fmt.Errorf("fatal: %v", err)
}

// The resource provider protocol requires that we now write out the port we have chosen to listen on.
fmt.Printf("%d\n", port)
fmt.Printf("%d\n", handle.Port)

// Finally, wait for the server to stop serving.
if err := <-done; err != nil {
if err := <-handle.Done; err != nil {
return fmt.Errorf("fatal: %v", err)
}

Expand Down

0 comments on commit 9276574

Please sign in to comment.