Skip to content

Commit

Permalink
Engine and Golang support for shimless providers
Browse files Browse the repository at this point in the history
This allows the pulumi-language-go plugin to start up providers directly
from .go source files.

The other language providers will be extended to support this as well in
time.
  • Loading branch information
Frassle committed Oct 13, 2022
1 parent 40343a3 commit b19be5f
Show file tree
Hide file tree
Showing 48 changed files with 1,435 additions and 162 deletions.
4 changes: 4 additions & 0 deletions changelog/pending/20221004--engine--shimless.yaml
@@ -0,0 +1,4 @@
changes:
- type: feat
scope: engine
description: Engine and Golang support for language plugins starting providers directly.
4 changes: 2 additions & 2 deletions pkg/cmd/pulumi/about.go
Expand Up @@ -140,7 +140,7 @@ func getSummaryAbout(ctx context.Context, transitiveDependencies bool, selectedS
result.Plugins = plugins
}

lang, err := pluginContext.Host.LanguageRuntime(proj.Runtime.Name())
lang, err := pluginContext.Host.LanguageRuntime(projinfo.Root, pwd, proj.Runtime.Name(), proj.Runtime.Options())
if err != nil {
addError(err, fmt.Sprintf("Failed to load language plugin %s", proj.Runtime.Name()))
} else {
Expand Down Expand Up @@ -557,7 +557,7 @@ func getProjectPluginsSilently(
defer func() { os.Stdout = stdout }()
os.Stdout = w

return plugin.GetRequiredPlugins(ctx.Host, plugin.ProgInfo{
return plugin.GetRequiredPlugins(ctx.Host, ctx.Root, plugin.ProgInfo{
Proj: proj,
Pwd: pwd,
Program: main,
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/pulumi/new.go
Expand Up @@ -690,7 +690,7 @@ func saveConfig(stack backend.Stack, c config.Map) error {
func installDependencies(ctx *plugin.Context, runtime *workspace.ProjectRuntimeInfo, directory string) error {
// First make sure the language plugin is present. We need this to load the required resource plugins.
// TODO: we need to think about how best to version this. For now, it always picks the latest.
lang, err := ctx.Host.LanguageRuntime(runtime.Name())
lang, err := ctx.Host.LanguageRuntime(ctx.Root, ctx.Pwd, runtime.Name(), runtime.Options())
if err != nil {
return fmt.Errorf("failed to load language plugin %s: %w", runtime.Name(), err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/pulumi/plugin.go
Expand Up @@ -67,7 +67,7 @@ func getProjectPlugins() ([]workspace.PluginSpec, error) {

// Get the required plugins and then ensure they have metadata populated about them. Because it's possible
// a plugin required by the project hasn't yet been installed, we will simply skip any errors we encounter.
plugins, err := plugin.GetRequiredPlugins(ctx.Host, plugin.ProgInfo{
plugins, err := plugin.GetRequiredPlugins(ctx.Host, ctx.Root, plugin.ProgInfo{
Proj: proj,
Pwd: pwd,
Program: main,
Expand Down
10 changes: 5 additions & 5 deletions pkg/cmd/pulumi/policy_new.go
Expand Up @@ -221,7 +221,7 @@ virtualenv: venv
projinfo := &engine.Projinfo{Proj: &workspace.Project{
Main: proj.Main,
Runtime: proj.Runtime}, Root: root}
_, _, pluginCtx, err := engine.ProjectInfoContext(
pwd, _, pluginCtx, err := engine.ProjectInfoContext(
projinfo,
nil,
cmdutil.Diag(),
Expand All @@ -235,7 +235,7 @@ virtualenv: venv

defer pluginCtx.Close()

if err := installPolicyPackDependencies(pluginCtx, proj, projPath, root); err != nil {
if err := installPolicyPackDependencies(pluginCtx, proj, pwd); err != nil {
return err
}
}
Expand All @@ -252,15 +252,15 @@ virtualenv: venv
}

func installPolicyPackDependencies(ctx *plugin.Context,
proj *workspace.PolicyPackProject, projPath, root string) error {
proj *workspace.PolicyPackProject, directory string) error {
// First make sure the language plugin is present. We need this to load the required resource plugins.
// TODO: we need to think about how best to version this. For now, it always picks the latest.
lang, err := ctx.Host.LanguageRuntime(proj.Runtime.Name())
lang, err := ctx.Host.LanguageRuntime(ctx.Root, ctx.Pwd, proj.Runtime.Name(), proj.Runtime.Options())
if err != nil {
return fmt.Errorf("failed to load language plugin %s: %w", proj.Runtime.Name(), err)
}

if err = lang.InstallDependencies(root); err != nil {
if err = lang.InstallDependencies(directory); err != nil {
return fmt.Errorf("installing dependencies failed; rerun manually to try again, "+
"then run `pulumi up` to perform an initial deployment: %w", err)
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/engine/lifecycletest/pulumi_test.go
Expand Up @@ -2983,6 +2983,12 @@ func (ctx *updateContext) GetProgramDependencies(
return nil, status.Errorf(codes.Unimplemented, "method GetProgramDependencies not implemented")
}

func (ctx *updateContext) RunPlugin(
req *pulumirpc.RunPluginRequest,
server pulumirpc.LanguageRuntime_RunPluginServer) error {
return status.Errorf(codes.Unimplemented, "method RunPlugin not implemented")
}

func TestLanguageClient(t *testing.T) {
t.Parallel()

Expand Down
3 changes: 2 additions & 1 deletion pkg/engine/plugin_host.go
Expand Up @@ -47,6 +47,7 @@ func connectToLanguageRuntime(ctx *plugin.Context, address string) (plugin.Host,
}, nil
}

func (host *clientLanguageRuntimeHost) LanguageRuntime(runtime string) (plugin.LanguageRuntime, error) {
func (host *clientLanguageRuntimeHost) LanguageRuntime(
root, pwd, runtime string, options map[string]interface{}) (plugin.LanguageRuntime, error) {
return host.languageRuntime, nil
}
2 changes: 1 addition & 1 deletion pkg/engine/plugins.go
Expand Up @@ -119,7 +119,7 @@ func newPluginSet(plugins ...workspace.PluginSpec) pluginSet {
func gatherPluginsFromProgram(plugctx *plugin.Context, prog plugin.ProgInfo) (pluginSet, error) {
logging.V(preparePluginLog).Infof("gatherPluginsFromProgram(): gathering plugins from language host")
set := newPluginSet()
langhostPlugins, err := plugin.GetRequiredPlugins(plugctx.Host, prog, plugin.AllPlugins)
langhostPlugins, err := plugin.GetRequiredPlugins(plugctx.Host, plugctx.Root, prog, plugin.AllPlugins)
if err != nil {
return set, err
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/resource/deploy/deploytest/languageruntime.go
Expand Up @@ -16,6 +16,8 @@ package deploytest

import (
"context"
"fmt"
"io"

"github.com/pulumi/pulumi/sdk/v3/go/common/resource/plugin"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
Expand Down Expand Up @@ -78,3 +80,7 @@ func (p *languageRuntime) GetProgramDependencies(
info plugin.ProgInfo, transitiveDependencies bool) ([]plugin.DependencyInfo, error) {
return nil, nil
}

func (p *languageRuntime) RunPlugin(info plugin.RunPluginInfo) (io.Reader, io.Reader, context.CancelFunc, error) {
return nil, nil, nil, fmt.Errorf("inline plugins are not currently supported")
}
3 changes: 2 additions & 1 deletion pkg/resource/deploy/deploytest/pluginhost.go
Expand Up @@ -340,7 +340,8 @@ func (host *pluginHost) Provider(pkg tokens.Package, version *semver.Version) (p
return plug.(plugin.Provider), nil
}

func (host *pluginHost) LanguageRuntime(runtime string) (plugin.LanguageRuntime, error) {
func (host *pluginHost) LanguageRuntime(
root, pwd, runtime string, options map[string]interface{}) (plugin.LanguageRuntime, error) {
return host.languageRuntime, nil
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/resource/deploy/providers/registry_test.go
Expand Up @@ -68,7 +68,8 @@ func (host *testPluginHost) Provider(pkg tokens.Package, version *semver.Version
func (host *testPluginHost) CloseProvider(provider plugin.Provider) error {
return host.closeProvider(provider)
}
func (host *testPluginHost) LanguageRuntime(runtime string) (plugin.LanguageRuntime, error) {
func (host *testPluginHost) LanguageRuntime(
root, pwd, runtime string, options map[string]interface{}) (plugin.LanguageRuntime, error) {
return nil, errors.New("unsupported")
}
func (host *testPluginHost) EnsurePlugins(plugins []workspace.PluginSpec, kinds plugin.Flags) error {
Expand Down
3 changes: 2 additions & 1 deletion pkg/resource/deploy/source_eval.go
Expand Up @@ -199,7 +199,8 @@ func (iter *evalSourceIterator) forkRun(opts Options, config map[config.Key]stri
// Next, launch the language plugin.
run := func() result.Result {
rt := iter.src.runinfo.Proj.Runtime.Name()
langhost, err := iter.src.plugctx.Host.LanguageRuntime(rt)
rtopts := iter.src.runinfo.Proj.Runtime.Options()
langhost, err := iter.src.plugctx.Host.LanguageRuntime(iter.src.plugctx.Root, iter.src.plugctx.Pwd, rt, rtopts)
if err != nil {
return result.FromError(fmt.Errorf("failed to launch language host %s: %w", rt, err))
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/resource/deploy/source_query.go
Expand Up @@ -142,7 +142,8 @@ func (src *querySource) forkRun() {

func runLangPlugin(src *querySource) result.Result {
rt := src.runinfo.Proj.Runtime.Name()
langhost, err := src.plugctx.Host.LanguageRuntime(rt)
rtopts := src.runinfo.Proj.Runtime.Options()
langhost, err := src.plugctx.Host.LanguageRuntime(src.plugctx.Root, src.plugctx.Pwd, rt, rtopts)
if err != nil {
return result.FromError(fmt.Errorf("failed to launch language host %s: %w", rt, err))
}
Expand Down
2 changes: 1 addition & 1 deletion proto/.checksum.txt
Expand Up @@ -9,7 +9,7 @@
1949619858 9233 proto/pulumi/analyzer.proto
2889436496 3240 proto/pulumi/engine.proto
3421371250 793 proto/pulumi/errors.proto
3300935796 5024 proto/pulumi/language.proto
3818289820 5711 proto/pulumi/language.proto
2700626499 1743 proto/pulumi/plugin.proto
1451439690 19667 proto/pulumi/provider.proto
3808155704 10824 proto/pulumi/resource.proto
18 changes: 18 additions & 0 deletions proto/pulumi/language.proto
Expand Up @@ -39,6 +39,9 @@ service LanguageRuntime {

// GetProgramDependencies returns the set of dependencies required by the program.
rpc GetProgramDependencies(GetProgramDependenciesRequest) returns (GetProgramDependenciesResponse) {}

// RunPlugin executes a plugin program and returns its result asynchronously.
rpc RunPlugin(RunPluginRequest) returns (stream RunPluginResponse) {}
}

// AboutResponse returns runtime information about the language.
Expand Down Expand Up @@ -109,4 +112,19 @@ message InstallDependenciesRequest {
message InstallDependenciesResponse {
bytes stdout = 1; // a line of stdout text.
bytes stderr = 2; // a line of stderr text.
}

message RunPluginRequest{
string pwd = 1; // the program's working directory.
string program = 2; // the path to the program to execute.
repeated string args = 3; // any arguments to pass to the program.
repeated string env = 4; // any environment variables to set as part of the program.
}

message RunPluginResponse {
oneof output {
bytes stdout = 1; // a line of stdout text.
bytes stderr = 2; // a line of stderr text.
int32 exitcode = 3; // the exit code of the provider.
}
}
7 changes: 6 additions & 1 deletion sdk/dotnet/cmd/pulumi-language-dotnet/main.go
Expand Up @@ -688,7 +688,7 @@ func (host *dotnetLanguageHost) GetPluginInfo(ctx context.Context, req *pbempty.
func (host *dotnetLanguageHost) InstallDependencies(
req *pulumirpc.InstallDependenciesRequest, server pulumirpc.LanguageRuntime_InstallDependenciesServer) error {

closer, stdout, stderr, err := rpcutil.MakeStreams(server, req.IsTerminal)
closer, stdout, stderr, err := rpcutil.MakeInstallDependenciesStreams(server, req.IsTerminal)
if err != nil {
return err
}
Expand Down Expand Up @@ -805,3 +805,8 @@ func (host *dotnetLanguageHost) GetProgramDependencies(
Dependencies: packages,
}, nil
}

func (host *dotnetLanguageHost) RunPlugin(
req *pulumirpc.RunPluginRequest, server pulumirpc.LanguageRuntime_RunPluginServer) error {
return errors.New("not supported")
}
5 changes: 5 additions & 0 deletions sdk/go/auto/stack.go
Expand Up @@ -1076,6 +1076,11 @@ func (s *languageRuntimeServer) GetProgramDependencies(
return nil, status.Errorf(codes.Unimplemented, "method GetProgramDependencies not implemented")
}

func (s *languageRuntimeServer) RunPlugin(
_ *pulumirpc.RunPluginRequest, _ pulumirpc.LanguageRuntime_RunPluginServer) error {
return status.Errorf(codes.Unimplemented, "method RunPlugin not implemented")
}

type fileWatcher struct {
Filename string
tail *tail.Tail
Expand Down
4 changes: 2 additions & 2 deletions sdk/go/common/resource/plugin/analyzer_plugin.go
Expand Up @@ -63,7 +63,7 @@ func NewAnalyzer(host Host, ctx *Context, name tokens.QName) (Analyzer, error) {
contract.Assert(path != "")

plug, err := newPlugin(ctx, ctx.Pwd, path, fmt.Sprintf("%v (analyzer)", name),
[]string{host.ServerAddr(), ctx.Pwd}, nil /*env*/)
workspace.AnalyzerPlugin, []string{host.ServerAddr(), ctx.Pwd}, nil /*env*/)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -126,7 +126,7 @@ func NewPolicyAnalyzer(
}
}

plug, err := newPlugin(ctx, pwd, pluginPath, fmt.Sprintf("%v (analyzer)", name), args, env)
plug, err := newPlugin(ctx, pwd, pluginPath, fmt.Sprintf("%v (analyzer)", name), workspace.AnalyzerPlugin, args, env)
if err != nil {
// The original error might have been wrapped before being returned from newPlugin. So we look for
// the root cause of the error. This won't work if we switch to Go 1.13's new approach to wrapping.
Expand Down
38 changes: 28 additions & 10 deletions sdk/go/common/resource/plugin/host.go
Expand Up @@ -15,6 +15,7 @@
package plugin

import (
"encoding/json"
"fmt"
"os"
"path/filepath"
Expand Down Expand Up @@ -69,7 +70,7 @@ type Host interface {
CloseProvider(provider Provider) error
// LanguageRuntime fetches the language runtime plugin for a given language, lazily allocating if necessary. If
// an implementation of this language runtime wasn't found, on an error occurs, a non-nil error is returned.
LanguageRuntime(runtime string) (LanguageRuntime, error)
LanguageRuntime(root, pwd, runtime string, options map[string]interface{}) (LanguageRuntime, error)

// EnsurePlugins ensures all plugins in the given array are loaded and ready to use. If any plugins are missing,
// and/or there are errors loading one or more plugins, a non-nil error is returned.
Expand Down Expand Up @@ -198,8 +199,10 @@ type pluginLoadRequest struct {
}

type defaultHost struct {
ctx *Context // the shared context for this host.
runtimeOptions map[string]interface{} // options to pass to the language plugins.
ctx *Context // the shared context for this host.

// the runtime options for the project, passed to resource providers to support dynamic providers.
runtimeOptions map[string]interface{}
analyzerPlugins map[tokens.QName]*analyzerPlugin // a cache of analyzer plugins and their processes.
languagePlugins map[string]*languagePlugin // a cache of language plugins and their processes.
resourcePlugins map[Provider]*resourcePlugin // the set of loaded resource plugins.
Expand Down Expand Up @@ -368,25 +371,35 @@ func (host *defaultHost) Provider(pkg tokens.Package, version *semver.Version) (
return plugin.(Provider), nil
}

func (host *defaultHost) LanguageRuntime(runtime string) (LanguageRuntime, error) {
func (host *defaultHost) LanguageRuntime(root, pwd, runtime string,
options map[string]interface{}) (LanguageRuntime, error) {
// Language runtimes use their own loading channel not the main one
plugin, err := loadPlugin(host.languageLoadRequests, func() (interface{}, error) {

// Key our cached runtime plugins by the runtime name and the options
jsonOptions, err := json.Marshal(options)
if err != nil {
return nil, fmt.Errorf("could not marshal runtime options to JSON: %w", err)
}

key := runtime + ":" + root + ":" + pwd + ":" + string(jsonOptions)

// First see if we already loaded this plugin.
if plug, has := host.languagePlugins[runtime]; has {
if plug, has := host.languagePlugins[key]; has {
contract.Assert(plug != nil)
return plug.Plugin, nil
}

// If not, allocate a new one.
plug, err := NewLanguageRuntime(host, host.ctx, runtime, host.runtimeOptions)
plug, err := NewLanguageRuntime(host, host.ctx, root, pwd, runtime, options)
if err == nil && plug != nil {
info, infoerr := plug.GetPluginInfo()
if infoerr != nil {
return nil, infoerr
}

// Memoize the result.
host.languagePlugins[runtime] = &languagePlugin{Plugin: plug, Info: info}
host.languagePlugins[key] = &languagePlugin{Plugin: plug, Info: info}
}

return plug, err
Expand All @@ -413,7 +426,12 @@ func (host *defaultHost) EnsurePlugins(plugins []workspace.PluginSpec, kinds Fla
}
case workspace.LanguagePlugin:
if kinds&LanguagePlugins != 0 {
if _, err := host.LanguageRuntime(plugin.Name); err != nil {
// Pass nil options here, we just need to check the language plugin is loadable. We can't use
// host.runtimePlugins because there might be other language plugins reported here (e.g
// shimless multi-language providers). Pass the host root for the plugin directory, it
// shouldn't matter because we're starting with no options but it's a directory we've already
// got hold of.
if _, err := host.LanguageRuntime(host.ctx.Root, host.ctx.Pwd, plugin.Name, nil); err != nil {
result = multierror.Append(result,
errors.Wrapf(err, "failed to load language plugin %s", plugin.Name))
}
Expand Down Expand Up @@ -519,13 +537,13 @@ const (
var AllPlugins = AnalyzerPlugins | LanguagePlugins | ResourcePlugins

// GetRequiredPlugins lists a full set of plugins that will be required by the given program.
func GetRequiredPlugins(host Host, info ProgInfo, kinds Flags) ([]workspace.PluginSpec, error) {
func GetRequiredPlugins(host Host, root string, info ProgInfo, kinds Flags) ([]workspace.PluginSpec, error) {
var plugins []workspace.PluginSpec

if kinds&LanguagePlugins != 0 {
// First make sure the language plugin is present. We need this to load the required resource plugins.
// TODO: we need to think about how best to version this. For now, it always picks the latest.
lang, err := host.LanguageRuntime(info.Proj.Runtime.Name())
lang, err := host.LanguageRuntime(root, info.Pwd, info.Proj.Runtime.Name(), info.Proj.Runtime.Options())
if err != nil {
return nil, errors.Wrapf(err, "failed to load language plugin %s", info.Proj.Runtime.Name())
}
Expand Down
11 changes: 11 additions & 0 deletions sdk/go/common/resource/plugin/langruntime.go
Expand Up @@ -15,6 +15,7 @@
package plugin

import (
"context"
"io"

"github.com/blang/semver"
Expand Down Expand Up @@ -48,6 +49,9 @@ type LanguageRuntime interface {

// GetProgramDependencies returns information about the dependencies for the given program.
GetProgramDependencies(info ProgInfo, transitiveDependencies bool) ([]DependencyInfo, error)

// RunPlugin executes a plugin program and returns its result asynchronously.
RunPlugin(info RunPluginInfo) (io.Reader, io.Reader, context.CancelFunc, error)
}

type DependencyInfo struct {
Expand All @@ -61,6 +65,13 @@ type AboutInfo struct {
Metadata map[string]string
}

type RunPluginInfo struct {
Pwd string
Program string
Args []string
Env []string
}

// ProgInfo contains minimal information about the program to be run.
type ProgInfo struct {
Proj *workspace.Project // the program project/package.
Expand Down

0 comments on commit b19be5f

Please sign in to comment.