Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Engine and Golang support for shimless providers #10916

Merged
merged 1 commit into from Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog/pending/20221004--engine--shimless.yaml
@@ -0,0 +1,4 @@
changes:
- type: feat
scope: engine
description: Engine and Golang support for language plugins starting providers directly.
4 changes: 2 additions & 2 deletions pkg/cmd/pulumi/about.go
Expand Up @@ -140,7 +140,7 @@ func getSummaryAbout(ctx context.Context, transitiveDependencies bool, selectedS
result.Plugins = plugins
}

lang, err := pluginContext.Host.LanguageRuntime(proj.Runtime.Name())
lang, err := pluginContext.Host.LanguageRuntime(projinfo.Root, pwd, proj.Runtime.Name(), proj.Runtime.Options())
if err != nil {
addError(err, fmt.Sprintf("Failed to load language plugin %s", proj.Runtime.Name()))
} else {
Expand Down Expand Up @@ -557,7 +557,7 @@ func getProjectPluginsSilently(
defer func() { os.Stdout = stdout }()
os.Stdout = w

return plugin.GetRequiredPlugins(ctx.Host, plugin.ProgInfo{
return plugin.GetRequiredPlugins(ctx.Host, ctx.Root, plugin.ProgInfo{
Proj: proj,
Pwd: pwd,
Program: main,
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/pulumi/new.go
Expand Up @@ -695,7 +695,7 @@ func saveConfig(stack backend.Stack, c config.Map) error {
func installDependencies(ctx *plugin.Context, runtime *workspace.ProjectRuntimeInfo, directory string) error {
// First make sure the language plugin is present. We need this to load the required resource plugins.
// TODO: we need to think about how best to version this. For now, it always picks the latest.
lang, err := ctx.Host.LanguageRuntime(runtime.Name())
lang, err := ctx.Host.LanguageRuntime(ctx.Root, ctx.Pwd, runtime.Name(), runtime.Options())
if err != nil {
return fmt.Errorf("failed to load language plugin %s: %w", runtime.Name(), err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/pulumi/plugin.go
Expand Up @@ -67,7 +67,7 @@ func getProjectPlugins() ([]workspace.PluginSpec, error) {

// Get the required plugins and then ensure they have metadata populated about them. Because it's possible
// a plugin required by the project hasn't yet been installed, we will simply skip any errors we encounter.
plugins, err := plugin.GetRequiredPlugins(ctx.Host, plugin.ProgInfo{
plugins, err := plugin.GetRequiredPlugins(ctx.Host, ctx.Root, plugin.ProgInfo{
Proj: proj,
Pwd: pwd,
Program: main,
Expand Down
10 changes: 5 additions & 5 deletions pkg/cmd/pulumi/policy_new.go
Expand Up @@ -221,7 +221,7 @@ virtualenv: venv
projinfo := &engine.Projinfo{Proj: &workspace.Project{
Main: proj.Main,
Runtime: proj.Runtime}, Root: root}
_, _, pluginCtx, err := engine.ProjectInfoContext(
pwd, _, pluginCtx, err := engine.ProjectInfoContext(
projinfo,
nil,
cmdutil.Diag(),
Expand All @@ -235,7 +235,7 @@ virtualenv: venv

defer pluginCtx.Close()

if err := installPolicyPackDependencies(pluginCtx, proj, projPath, root); err != nil {
if err := installPolicyPackDependencies(pluginCtx, proj, pwd); err != nil {
return err
}
}
Expand All @@ -252,15 +252,15 @@ virtualenv: venv
}

func installPolicyPackDependencies(ctx *plugin.Context,
proj *workspace.PolicyPackProject, projPath, root string) error {
proj *workspace.PolicyPackProject, directory string) error {
// First make sure the language plugin is present. We need this to load the required resource plugins.
// TODO: we need to think about how best to version this. For now, it always picks the latest.
lang, err := ctx.Host.LanguageRuntime(proj.Runtime.Name())
lang, err := ctx.Host.LanguageRuntime(ctx.Root, ctx.Pwd, proj.Runtime.Name(), proj.Runtime.Options())
if err != nil {
return fmt.Errorf("failed to load language plugin %s: %w", proj.Runtime.Name(), err)
}

if err = lang.InstallDependencies(root); err != nil {
if err = lang.InstallDependencies(directory); err != nil {
return fmt.Errorf("installing dependencies failed; rerun manually to try again, "+
"then run `pulumi up` to perform an initial deployment: %w", err)
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/engine/lifecycletest/pulumi_test.go
Expand Up @@ -2983,6 +2983,12 @@ func (ctx *updateContext) GetProgramDependencies(
return nil, status.Errorf(codes.Unimplemented, "method GetProgramDependencies not implemented")
}

func (ctx *updateContext) RunPlugin(
req *pulumirpc.RunPluginRequest,
server pulumirpc.LanguageRuntime_RunPluginServer) error {
return status.Errorf(codes.Unimplemented, "method RunPlugin not implemented")
}

func TestLanguageClient(t *testing.T) {
t.Parallel()

Expand Down
3 changes: 2 additions & 1 deletion pkg/engine/plugin_host.go
Expand Up @@ -44,7 +44,8 @@ func connectToLanguageRuntime(ctx *plugin.Context, address string) (plugin.Host,
}, nil
}

func (host *clientLanguageRuntimeHost) LanguageRuntime(runtime string) (plugin.LanguageRuntime, error) {
func (host *clientLanguageRuntimeHost) LanguageRuntime(
root, pwd, runtime string, options map[string]interface{}) (plugin.LanguageRuntime, error) {
return host.languageRuntime, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/plugins.go
Expand Up @@ -119,7 +119,7 @@ func newPluginSet(plugins ...workspace.PluginSpec) pluginSet {
func gatherPluginsFromProgram(plugctx *plugin.Context, prog plugin.ProgInfo) (pluginSet, error) {
logging.V(preparePluginLog).Infof("gatherPluginsFromProgram(): gathering plugins from language host")
set := newPluginSet()
langhostPlugins, err := plugin.GetRequiredPlugins(plugctx.Host, prog, plugin.AllPlugins)
langhostPlugins, err := plugin.GetRequiredPlugins(plugctx.Host, plugctx.Root, prog, plugin.AllPlugins)
if err != nil {
return set, err
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/resource/deploy/deploytest/languageruntime.go
Expand Up @@ -16,6 +16,8 @@ package deploytest

import (
"context"
"fmt"
"io"

"github.com/pulumi/pulumi/sdk/v3/go/common/resource/plugin"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
Expand Down Expand Up @@ -78,3 +80,7 @@ func (p *languageRuntime) GetProgramDependencies(
info plugin.ProgInfo, transitiveDependencies bool) ([]plugin.DependencyInfo, error) {
return nil, nil
}

func (p *languageRuntime) RunPlugin(info plugin.RunPluginInfo) (io.Reader, io.Reader, context.CancelFunc, error) {
return nil, nil, nil, fmt.Errorf("inline plugins are not currently supported")
}
3 changes: 2 additions & 1 deletion pkg/resource/deploy/deploytest/pluginhost.go
Expand Up @@ -344,7 +344,8 @@ func (host *pluginHost) Provider(pkg tokens.Package, version *semver.Version) (p
return plug.(plugin.Provider), nil
}

func (host *pluginHost) LanguageRuntime(runtime string) (plugin.LanguageRuntime, error) {
func (host *pluginHost) LanguageRuntime(
root, pwd, runtime string, options map[string]interface{}) (plugin.LanguageRuntime, error) {
return host.languageRuntime, nil
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/resource/deploy/providers/registry_test.go
Expand Up @@ -68,7 +68,8 @@ func (host *testPluginHost) Provider(pkg tokens.Package, version *semver.Version
func (host *testPluginHost) CloseProvider(provider plugin.Provider) error {
return host.closeProvider(provider)
}
func (host *testPluginHost) LanguageRuntime(runtime string) (plugin.LanguageRuntime, error) {
func (host *testPluginHost) LanguageRuntime(
root, pwd, runtime string, options map[string]interface{}) (plugin.LanguageRuntime, error) {
return nil, errors.New("unsupported")
}
func (host *testPluginHost) EnsurePlugins(plugins []workspace.PluginSpec, kinds plugin.Flags) error {
Expand Down
3 changes: 2 additions & 1 deletion pkg/resource/deploy/source_eval.go
Expand Up @@ -202,7 +202,8 @@ func (iter *evalSourceIterator) forkRun(opts Options, config map[config.Key]stri
// Next, launch the language plugin.
run := func() result.Result {
rt := iter.src.runinfo.Proj.Runtime.Name()
langhost, err := iter.src.plugctx.Host.LanguageRuntime(rt)
rtopts := iter.src.runinfo.Proj.Runtime.Options()
langhost, err := iter.src.plugctx.Host.LanguageRuntime(iter.src.plugctx.Root, iter.src.plugctx.Pwd, rt, rtopts)
if err != nil {
return result.FromError(fmt.Errorf("failed to launch language host %s: %w", rt, err))
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/resource/deploy/source_query.go
Expand Up @@ -142,7 +142,8 @@ func (src *querySource) forkRun() {

func runLangPlugin(src *querySource) result.Result {
rt := src.runinfo.Proj.Runtime.Name()
langhost, err := src.plugctx.Host.LanguageRuntime(rt)
rtopts := src.runinfo.Proj.Runtime.Options()
langhost, err := src.plugctx.Host.LanguageRuntime(src.plugctx.Root, src.plugctx.Pwd, rt, rtopts)
if err != nil {
return result.FromError(fmt.Errorf("failed to launch language host %s: %w", rt, err))
}
Expand Down
2 changes: 1 addition & 1 deletion proto/.checksum.txt
Expand Up @@ -11,7 +11,7 @@
1949619858 9233 proto/pulumi/analyzer.proto
2889436496 3240 proto/pulumi/engine.proto
3421371250 793 proto/pulumi/errors.proto
3300935796 5024 proto/pulumi/language.proto
3818289820 5711 proto/pulumi/language.proto
2700626499 1743 proto/pulumi/plugin.proto
1451439690 19667 proto/pulumi/provider.proto
1325776472 11014 proto/pulumi/resource.proto
18 changes: 18 additions & 0 deletions proto/pulumi/language.proto
Expand Up @@ -39,6 +39,9 @@ service LanguageRuntime {

// GetProgramDependencies returns the set of dependencies required by the program.
rpc GetProgramDependencies(GetProgramDependenciesRequest) returns (GetProgramDependenciesResponse) {}

// RunPlugin executes a plugin program and returns its result asynchronously.
rpc RunPlugin(RunPluginRequest) returns (stream RunPluginResponse) {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Frustrating that we can't have a tuple of immediate data and a trailing stream in a single RPC. The response API will look really funny if we want to add a message type for communicating a structured failure starting the plugin outside of the context of running it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeh I really wish grpc streams were a three type message of (immediate response message, many stream messages, final terminating message), but we work with what we've got.

}

// AboutResponse returns runtime information about the language.
Expand Down Expand Up @@ -109,4 +112,19 @@ message InstallDependenciesRequest {
message InstallDependenciesResponse {
bytes stdout = 1; // a line of stdout text.
bytes stderr = 2; // a line of stderr text.
}

message RunPluginRequest{
string pwd = 1; // the program's working directory.
string program = 2; // the path to the program to execute.
repeated string args = 3; // any arguments to pass to the program.
repeated string env = 4; // any environment variables to set as part of the program.
}

message RunPluginResponse {
oneof output {
bytes stdout = 1; // a line of stdout text.
bytes stderr = 2; // a line of stderr text.
int32 exitcode = 3; // the exit code of the provider.
}
}
7 changes: 6 additions & 1 deletion sdk/dotnet/cmd/pulumi-language-dotnet/main.go
Expand Up @@ -690,7 +690,7 @@ func (host *dotnetLanguageHost) GetPluginInfo(ctx context.Context, req *pbempty.
func (host *dotnetLanguageHost) InstallDependencies(
req *pulumirpc.InstallDependenciesRequest, server pulumirpc.LanguageRuntime_InstallDependenciesServer) error {

closer, stdout, stderr, err := rpcutil.MakeStreams(server, req.IsTerminal)
closer, stdout, stderr, err := rpcutil.MakeInstallDependenciesStreams(server, req.IsTerminal)
if err != nil {
return err
}
Expand Down Expand Up @@ -807,3 +807,8 @@ func (host *dotnetLanguageHost) GetProgramDependencies(
Dependencies: packages,
}, nil
}

func (host *dotnetLanguageHost) RunPlugin(
req *pulumirpc.RunPluginRequest, server pulumirpc.LanguageRuntime_RunPluginServer) error {
return errors.New("not supported")
}
5 changes: 5 additions & 0 deletions sdk/go/auto/stack.go
Expand Up @@ -1184,6 +1184,11 @@ func (s *languageRuntimeServer) GetProgramDependencies(
return nil, status.Errorf(codes.Unimplemented, "method GetProgramDependencies not implemented")
}

func (s *languageRuntimeServer) RunPlugin(
_ *pulumirpc.RunPluginRequest, _ pulumirpc.LanguageRuntime_RunPluginServer) error {
return status.Errorf(codes.Unimplemented, "method RunPlugin not implemented")
}

type fileWatcher struct {
Filename string
tail *tail.Tail
Expand Down
7 changes: 3 additions & 4 deletions sdk/go/common/resource/plugin/analyzer_plugin.go
Expand Up @@ -67,7 +67,7 @@ func NewAnalyzer(host Host, ctx *Context, name tokens.QName) (Analyzer, error) {
dialOpts := rpcutil.OpenTracingInterceptorDialOptions()

plug, err := newPlugin(ctx, ctx.Pwd, path, fmt.Sprintf("%v (analyzer)", name),
[]string{host.ServerAddr(), ctx.Pwd}, nil /*env*/, dialOpts)
workspace.AnalyzerPlugin, []string{host.ServerAddr(), ctx.Pwd}, nil /*env*/, dialOpts)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -130,9 +130,8 @@ func NewPolicyAnalyzer(
}
}

plug, err := newPlugin(ctx, pwd, pluginPath, fmt.Sprintf("%v (analyzer)", name), args, env,
analyzerPluginDialOptions(ctx, fmt.Sprintf("%v", name)))

plug, err := newPlugin(ctx, pwd, pluginPath, fmt.Sprintf("%v (analyzer)", name),
workspace.AnalyzerPlugin, args, env, analyzerPluginDialOptions(ctx, fmt.Sprintf("%v", name)))
if err != nil {
// The original error might have been wrapped before being returned from newPlugin. So we look for
// the root cause of the error. This won't work if we switch to Go 1.13's new approach to wrapping.
Expand Down
38 changes: 28 additions & 10 deletions sdk/go/common/resource/plugin/host.go
Expand Up @@ -15,6 +15,7 @@
package plugin

import (
"encoding/json"
"fmt"
"os"
"path/filepath"
Expand Down Expand Up @@ -69,7 +70,7 @@ type Host interface {
CloseProvider(provider Provider) error
// LanguageRuntime fetches the language runtime plugin for a given language, lazily allocating if necessary. If
// an implementation of this language runtime wasn't found, on an error occurs, a non-nil error is returned.
LanguageRuntime(runtime string) (LanguageRuntime, error)
LanguageRuntime(root, pwd, runtime string, options map[string]interface{}) (LanguageRuntime, error)

// EnsurePlugins ensures all plugins in the given array are loaded and ready to use. If any plugins are missing,
// and/or there are errors loading one or more plugins, a non-nil error is returned.
Expand Down Expand Up @@ -201,8 +202,10 @@ type pluginLoadRequest struct {
}

type defaultHost struct {
ctx *Context // the shared context for this host.
runtimeOptions map[string]interface{} // options to pass to the language plugins.
ctx *Context // the shared context for this host.

// the runtime options for the project, passed to resource providers to support dynamic providers.
runtimeOptions map[string]interface{}
analyzerPlugins map[tokens.QName]*analyzerPlugin // a cache of analyzer plugins and their processes.
languagePlugins map[string]*languagePlugin // a cache of language plugins and their processes.
resourcePlugins map[Provider]*resourcePlugin // the set of loaded resource plugins.
Expand Down Expand Up @@ -371,25 +374,35 @@ func (host *defaultHost) Provider(pkg tokens.Package, version *semver.Version) (
return plugin.(Provider), nil
}

func (host *defaultHost) LanguageRuntime(runtime string) (LanguageRuntime, error) {
func (host *defaultHost) LanguageRuntime(root, pwd, runtime string,
options map[string]interface{}) (LanguageRuntime, error) {
// Language runtimes use their own loading channel not the main one
plugin, err := loadPlugin(host.languageLoadRequests, func() (interface{}, error) {

// Key our cached runtime plugins by the runtime name and the options
jsonOptions, err := json.Marshal(options)
if err != nil {
return nil, fmt.Errorf("could not marshal runtime options to JSON: %w", err)
}

key := runtime + ":" + root + ":" + pwd + ":" + string(jsonOptions)

// First see if we already loaded this plugin.
if plug, has := host.languagePlugins[runtime]; has {
if plug, has := host.languagePlugins[key]; has {
contract.Assert(plug != nil)
return plug.Plugin, nil
}

// If not, allocate a new one.
plug, err := NewLanguageRuntime(host, host.ctx, runtime, host.runtimeOptions)
plug, err := NewLanguageRuntime(host, host.ctx, root, pwd, runtime, options)
if err == nil && plug != nil {
info, infoerr := plug.GetPluginInfo()
if infoerr != nil {
return nil, infoerr
}

// Memoize the result.
host.languagePlugins[runtime] = &languagePlugin{Plugin: plug, Info: info}
host.languagePlugins[key] = &languagePlugin{Plugin: plug, Info: info}
}

return plug, err
Expand All @@ -416,7 +429,12 @@ func (host *defaultHost) EnsurePlugins(plugins []workspace.PluginSpec, kinds Fla
}
case workspace.LanguagePlugin:
if kinds&LanguagePlugins != 0 {
if _, err := host.LanguageRuntime(plugin.Name); err != nil {
// Pass nil options here, we just need to check the language plugin is loadable. We can't use
// host.runtimePlugins because there might be other language plugins reported here (e.g
// shimless multi-language providers). Pass the host root for the plugin directory, it
// shouldn't matter because we're starting with no options but it's a directory we've already
// got hold of.
if _, err := host.LanguageRuntime(host.ctx.Root, host.ctx.Pwd, plugin.Name, nil); err != nil {
result = multierror.Append(result,
errors.Wrapf(err, "failed to load language plugin %s", plugin.Name))
}
Expand Down Expand Up @@ -544,13 +562,13 @@ const (
var AllPlugins = AnalyzerPlugins | LanguagePlugins | ResourcePlugins

// GetRequiredPlugins lists a full set of plugins that will be required by the given program.
func GetRequiredPlugins(host Host, info ProgInfo, kinds Flags) ([]workspace.PluginSpec, error) {
func GetRequiredPlugins(host Host, root string, info ProgInfo, kinds Flags) ([]workspace.PluginSpec, error) {
var plugins []workspace.PluginSpec

if kinds&LanguagePlugins != 0 {
// First make sure the language plugin is present. We need this to load the required resource plugins.
// TODO: we need to think about how best to version this. For now, it always picks the latest.
lang, err := host.LanguageRuntime(info.Proj.Runtime.Name())
lang, err := host.LanguageRuntime(root, info.Pwd, info.Proj.Runtime.Name(), info.Proj.Runtime.Options())
if err != nil {
return nil, errors.Wrapf(err, "failed to load language plugin %s", info.Proj.Runtime.Name())
}
Expand Down
11 changes: 11 additions & 0 deletions sdk/go/common/resource/plugin/langruntime.go
Expand Up @@ -15,6 +15,7 @@
package plugin

import (
"context"
"io"

"github.com/blang/semver"
Expand Down Expand Up @@ -48,6 +49,9 @@ type LanguageRuntime interface {

// GetProgramDependencies returns information about the dependencies for the given program.
GetProgramDependencies(info ProgInfo, transitiveDependencies bool) ([]DependencyInfo, error)

// RunPlugin executes a plugin program and returns its result asynchronously.
RunPlugin(info RunPluginInfo) (io.Reader, io.Reader, context.CancelFunc, error)
}

type DependencyInfo struct {
Expand All @@ -61,6 +65,13 @@ type AboutInfo struct {
Metadata map[string]string
}

type RunPluginInfo struct {
Pwd string
Program string
Args []string
Env []string
}

// ProgInfo contains minimal information about the program to be run.
type ProgInfo struct {
Proj *workspace.Project // the program project/package.
Expand Down