Skip to content

Commit

Permalink
Integrate gRPC debug interceptors via plugin.Context
Browse files Browse the repository at this point in the history
  • Loading branch information
t0yv0 committed Nov 2, 2022
1 parent 7a22a2d commit 77c7894
Show file tree
Hide file tree
Showing 12 changed files with 738 additions and 33 deletions.
18 changes: 18 additions & 0 deletions pkg/engine/deployment.go
Expand Up @@ -18,12 +18,15 @@ import (
"context"
"errors"
"fmt"
"os"
"time"

"github.com/opentracing/opentracing-go"
"google.golang.org/grpc"

"github.com/pulumi/pulumi/pkg/v3/resource/deploy"
"github.com/pulumi/pulumi/pkg/v3/resource/deploy/providers"
interceptors "github.com/pulumi/pulumi/pkg/v3/util/rpcdebug"
"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
"github.com/pulumi/pulumi/sdk/v3/go/common/display"
"github.com/pulumi/pulumi/sdk/v3/go/common/resource"
Expand Down Expand Up @@ -57,6 +60,21 @@ func ProjectInfoContext(projinfo *Projinfo, host plugin.Host,
return "", "", nil, err
}

if logFile := os.Getenv("PULUMI_DEBUG_GRPC"); logFile != "" {
di, err := interceptors.NewDebugInterceptor(interceptors.DebugInterceptorOptions{
LogFile: logFile,
Mutex: &ctx.DebugTraceMutex,
})
if err != nil {
return "", "", nil, err
}
ctx.DialOptions = func(metadata interface{}) []grpc.DialOption {
return di.DialOptions(interceptors.LogOptions{
Metadata: metadata,
})
}
}

// If the project wants to connect to an existing language runtime, do so now.
if projinfo.Proj.Runtime.Name() == clientRuntimeName {
addressValue, ok := projinfo.Proj.Runtime.Options()["address"]
Expand Down
28 changes: 23 additions & 5 deletions pkg/engine/plugin_host.go
@@ -1,4 +1,4 @@
// Copyright 2016-2020, Pulumi Corporation.
// Copyright 2016-2022, Pulumi Corporation.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,10 +32,7 @@ type clientLanguageRuntimeHost struct {

func connectToLanguageRuntime(ctx *plugin.Context, address string) (plugin.Host, error) {
// Dial the language runtime.
conn, err := grpc.Dial(address, grpc.WithInsecure(),
grpc.WithUnaryInterceptor(rpcutil.OpenTracingClientInterceptor()),
grpc.WithStreamInterceptor(rpcutil.OpenTracingStreamClientInterceptor()),
rpcutil.GrpcChannelOptions())
conn, err := grpc.Dial(address, langRuntimePluginDialOptions(ctx, address)...)
if err != nil {
return nil, fmt.Errorf("could not connect to language host: %w", err)
}
Expand All @@ -50,3 +47,24 @@ func connectToLanguageRuntime(ctx *plugin.Context, address string) (plugin.Host,
func (host *clientLanguageRuntimeHost) LanguageRuntime(runtime string) (plugin.LanguageRuntime, error) {
return host.languageRuntime, nil
}

func langRuntimePluginDialOptions(ctx *plugin.Context, address string) []grpc.DialOption {
dialOpts := append(
rpcutil.OpenTracingInterceptorDialOptions(),
grpc.WithInsecure(),
rpcutil.GrpcChannelOptions(),
)

if ctx.DialOptions != nil {
metadata := map[string]interface{}{
"mode": "client",
"kind": "language",
}
if address != "" {
metadata["address"] = address
}
dialOpts = append(dialOpts, ctx.DialOptions(metadata)...)
}

return dialOpts
}
164 changes: 164 additions & 0 deletions pkg/go.sum

Large diffs are not rendered by default.

29 changes: 27 additions & 2 deletions pkg/resource/deploy/source_eval.go
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"errors"
"fmt"
"os"
"strings"
"sync"
"time"
Expand All @@ -32,6 +33,7 @@ import (
"google.golang.org/grpc/codes"

"github.com/pulumi/pulumi/pkg/v3/resource/deploy/providers"
interceptors "github.com/pulumi/pulumi/pkg/v3/util/rpcdebug"
"github.com/pulumi/pulumi/sdk/v3/go/common/diag"
"github.com/pulumi/pulumi/sdk/v3/go/common/resource"
"github.com/pulumi/pulumi/sdk/v3/go/common/resource/config"
Expand Down Expand Up @@ -534,8 +536,7 @@ func newResourceMonitor(src *evalSource, provs ProviderSource, regChan chan *reg
pulumirpc.RegisterResourceMonitorServer(srv, resmon)
return nil
},
Options: rpcutil.OpenTracingServerInterceptorOptions(tracingSpan,
otgrpc.SpanDecorator(decorateResourceSpans)),
Options: sourceEvalServeOptions(src.plugctx, tracingSpan),
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -568,6 +569,30 @@ func (rm *resmon) Cancel() error {
return <-rm.done
}

func sourceEvalServeOptions(ctx *plugin.Context, tracingSpan opentracing.Span) []grpc.ServerOption {
serveOpts := rpcutil.OpenTracingServerInterceptorOptions(
tracingSpan,
otgrpc.SpanDecorator(decorateResourceSpans),
)
if logFile := os.Getenv("PULUMI_DEBUG_GRPC"); logFile != "" {
di, err := interceptors.NewDebugInterceptor(interceptors.DebugInterceptorOptions{
LogFile: logFile,
Mutex: &ctx.DebugTraceMutex,
})
if err != nil {
// ignoring
return nil
}
metadata := map[string]interface{}{
"mode": "server",
}
serveOpts = append(serveOpts, di.ServerOptions(interceptors.LogOptions{
Metadata: metadata,
})...)
}
return serveOpts
}

// getProviderReference fetches the provider reference for a resource, read, or invoke from the given package with the
// given unparsed provider reference. If the unparsed provider reference is empty, this function returns a reference
// to the default provider for the indicated package.
Expand Down

0 comments on commit 77c7894

Please sign in to comment.