Skip to content

Commit

Permalink
Merge #10916
Browse files Browse the repository at this point in the history
10916: Engine and Golang support for shimless providers r=Frassle a=Frassle

<!--- 
Thanks so much for your contribution! If this is your first time contributing, please ensure that you have read the [CONTRIBUTING](https://github.com/pulumi/pulumi/blob/master/CONTRIBUTING.md) documentation.
-->

# Description

<!--- Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. -->

This allows the pulumi-language-go plugin to start up providers directly from .go source files.

The other language providers will be extended to support this as well in time.

## Checklist

<!--- Please provide details if the checkbox below is to be left unchecked. -->
- [x] I have added tests that prove my fix is effective or that my feature works
<!--- 
User-facing changes require a CHANGELOG entry.
-->
- [x] I have run `make changelog` and committed the `changelog/pending/<file>` documenting my change
<!--
If the change(s) in this PR is a modification of an existing call to the Pulumi Service,
then the service should honor older versions of the CLI where this change would not exist.
You must then bump the API version in /pkg/backend/httpstate/client/api.go, as well as add
it to the service.
-->
- [ ] Yes, there are changes in this PR that warrants bumping the Pulumi Service API version
  <!-- `@Pulumi` employees: If yes, you must submit corresponding changes in the service repo. -->


Co-authored-by: Fraser Waters <fraser@pulumi.com>
  • Loading branch information
bors[bot] and Frassle committed Nov 17, 2022
2 parents 982656f + 9e5f1cc commit 3e98035
Show file tree
Hide file tree
Showing 50 changed files with 1,488 additions and 171 deletions.
4 changes: 4 additions & 0 deletions changelog/pending/20221004--engine--shimless.yaml
@@ -0,0 +1,4 @@
changes:
- type: feat
scope: engine
description: Engine and Golang support for language plugins starting providers directly.
4 changes: 2 additions & 2 deletions pkg/cmd/pulumi/about.go
Expand Up @@ -140,7 +140,7 @@ func getSummaryAbout(ctx context.Context, transitiveDependencies bool, selectedS
result.Plugins = plugins
}

lang, err := pluginContext.Host.LanguageRuntime(proj.Runtime.Name())
lang, err := pluginContext.Host.LanguageRuntime(projinfo.Root, pwd, proj.Runtime.Name(), proj.Runtime.Options())
if err != nil {
addError(err, fmt.Sprintf("Failed to load language plugin %s", proj.Runtime.Name()))
} else {
Expand Down Expand Up @@ -557,7 +557,7 @@ func getProjectPluginsSilently(
defer func() { os.Stdout = stdout }()
os.Stdout = w

return plugin.GetRequiredPlugins(ctx.Host, plugin.ProgInfo{
return plugin.GetRequiredPlugins(ctx.Host, ctx.Root, plugin.ProgInfo{
Proj: proj,
Pwd: pwd,
Program: main,
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/pulumi/new.go
Expand Up @@ -695,7 +695,7 @@ func saveConfig(stack backend.Stack, c config.Map) error {
func installDependencies(ctx *plugin.Context, runtime *workspace.ProjectRuntimeInfo, directory string) error {
// First make sure the language plugin is present. We need this to load the required resource plugins.
// TODO: we need to think about how best to version this. For now, it always picks the latest.
lang, err := ctx.Host.LanguageRuntime(runtime.Name())
lang, err := ctx.Host.LanguageRuntime(ctx.Root, ctx.Pwd, runtime.Name(), runtime.Options())
if err != nil {
return fmt.Errorf("failed to load language plugin %s: %w", runtime.Name(), err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/pulumi/plugin.go
Expand Up @@ -67,7 +67,7 @@ func getProjectPlugins() ([]workspace.PluginSpec, error) {

// Get the required plugins and then ensure they have metadata populated about them. Because it's possible
// a plugin required by the project hasn't yet been installed, we will simply skip any errors we encounter.
plugins, err := plugin.GetRequiredPlugins(ctx.Host, plugin.ProgInfo{
plugins, err := plugin.GetRequiredPlugins(ctx.Host, ctx.Root, plugin.ProgInfo{
Proj: proj,
Pwd: pwd,
Program: main,
Expand Down
10 changes: 5 additions & 5 deletions pkg/cmd/pulumi/policy_new.go
Expand Up @@ -221,7 +221,7 @@ virtualenv: venv
projinfo := &engine.Projinfo{Proj: &workspace.Project{
Main: proj.Main,
Runtime: proj.Runtime}, Root: root}
_, _, pluginCtx, err := engine.ProjectInfoContext(
pwd, _, pluginCtx, err := engine.ProjectInfoContext(
projinfo,
nil,
cmdutil.Diag(),
Expand All @@ -235,7 +235,7 @@ virtualenv: venv

defer pluginCtx.Close()

if err := installPolicyPackDependencies(pluginCtx, proj, projPath, root); err != nil {
if err := installPolicyPackDependencies(pluginCtx, proj, pwd); err != nil {
return err
}
}
Expand All @@ -252,15 +252,15 @@ virtualenv: venv
}

func installPolicyPackDependencies(ctx *plugin.Context,
proj *workspace.PolicyPackProject, projPath, root string) error {
proj *workspace.PolicyPackProject, directory string) error {
// First make sure the language plugin is present. We need this to load the required resource plugins.
// TODO: we need to think about how best to version this. For now, it always picks the latest.
lang, err := ctx.Host.LanguageRuntime(proj.Runtime.Name())
lang, err := ctx.Host.LanguageRuntime(ctx.Root, ctx.Pwd, proj.Runtime.Name(), proj.Runtime.Options())
if err != nil {
return fmt.Errorf("failed to load language plugin %s: %w", proj.Runtime.Name(), err)
}

if err = lang.InstallDependencies(root); err != nil {
if err = lang.InstallDependencies(directory); err != nil {
return fmt.Errorf("installing dependencies failed; rerun manually to try again, "+
"then run `pulumi up` to perform an initial deployment: %w", err)
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/engine/lifecycletest/pulumi_test.go
Expand Up @@ -2983,6 +2983,12 @@ func (ctx *updateContext) GetProgramDependencies(
return nil, status.Errorf(codes.Unimplemented, "method GetProgramDependencies not implemented")
}

func (ctx *updateContext) RunPlugin(
req *pulumirpc.RunPluginRequest,
server pulumirpc.LanguageRuntime_RunPluginServer) error {
return status.Errorf(codes.Unimplemented, "method RunPlugin not implemented")
}

func TestLanguageClient(t *testing.T) {
t.Parallel()

Expand Down
3 changes: 2 additions & 1 deletion pkg/engine/plugin_host.go
Expand Up @@ -44,7 +44,8 @@ func connectToLanguageRuntime(ctx *plugin.Context, address string) (plugin.Host,
}, nil
}

func (host *clientLanguageRuntimeHost) LanguageRuntime(runtime string) (plugin.LanguageRuntime, error) {
func (host *clientLanguageRuntimeHost) LanguageRuntime(
root, pwd, runtime string, options map[string]interface{}) (plugin.LanguageRuntime, error) {
return host.languageRuntime, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/engine/plugins.go
Expand Up @@ -119,7 +119,7 @@ func newPluginSet(plugins ...workspace.PluginSpec) pluginSet {
func gatherPluginsFromProgram(plugctx *plugin.Context, prog plugin.ProgInfo) (pluginSet, error) {
logging.V(preparePluginLog).Infof("gatherPluginsFromProgram(): gathering plugins from language host")
set := newPluginSet()
langhostPlugins, err := plugin.GetRequiredPlugins(plugctx.Host, prog, plugin.AllPlugins)
langhostPlugins, err := plugin.GetRequiredPlugins(plugctx.Host, plugctx.Root, prog, plugin.AllPlugins)
if err != nil {
return set, err
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/resource/deploy/deploytest/languageruntime.go
Expand Up @@ -16,6 +16,8 @@ package deploytest

import (
"context"
"fmt"
"io"

"github.com/pulumi/pulumi/sdk/v3/go/common/resource/plugin"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
Expand Down Expand Up @@ -78,3 +80,7 @@ func (p *languageRuntime) GetProgramDependencies(
info plugin.ProgInfo, transitiveDependencies bool) ([]plugin.DependencyInfo, error) {
return nil, nil
}

func (p *languageRuntime) RunPlugin(info plugin.RunPluginInfo) (io.Reader, io.Reader, context.CancelFunc, error) {
return nil, nil, nil, fmt.Errorf("inline plugins are not currently supported")
}
3 changes: 2 additions & 1 deletion pkg/resource/deploy/deploytest/pluginhost.go
Expand Up @@ -344,7 +344,8 @@ func (host *pluginHost) Provider(pkg tokens.Package, version *semver.Version) (p
return plug.(plugin.Provider), nil
}

func (host *pluginHost) LanguageRuntime(runtime string) (plugin.LanguageRuntime, error) {
func (host *pluginHost) LanguageRuntime(
root, pwd, runtime string, options map[string]interface{}) (plugin.LanguageRuntime, error) {
return host.languageRuntime, nil
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/resource/deploy/providers/registry_test.go
Expand Up @@ -68,7 +68,8 @@ func (host *testPluginHost) Provider(pkg tokens.Package, version *semver.Version
func (host *testPluginHost) CloseProvider(provider plugin.Provider) error {
return host.closeProvider(provider)
}
func (host *testPluginHost) LanguageRuntime(runtime string) (plugin.LanguageRuntime, error) {
func (host *testPluginHost) LanguageRuntime(
root, pwd, runtime string, options map[string]interface{}) (plugin.LanguageRuntime, error) {
return nil, errors.New("unsupported")
}
func (host *testPluginHost) EnsurePlugins(plugins []workspace.PluginSpec, kinds plugin.Flags) error {
Expand Down
3 changes: 2 additions & 1 deletion pkg/resource/deploy/source_eval.go
Expand Up @@ -202,7 +202,8 @@ func (iter *evalSourceIterator) forkRun(opts Options, config map[config.Key]stri
// Next, launch the language plugin.
run := func() result.Result {
rt := iter.src.runinfo.Proj.Runtime.Name()
langhost, err := iter.src.plugctx.Host.LanguageRuntime(rt)
rtopts := iter.src.runinfo.Proj.Runtime.Options()
langhost, err := iter.src.plugctx.Host.LanguageRuntime(iter.src.plugctx.Root, iter.src.plugctx.Pwd, rt, rtopts)
if err != nil {
return result.FromError(fmt.Errorf("failed to launch language host %s: %w", rt, err))
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/resource/deploy/source_query.go
Expand Up @@ -142,7 +142,8 @@ func (src *querySource) forkRun() {

func runLangPlugin(src *querySource) result.Result {
rt := src.runinfo.Proj.Runtime.Name()
langhost, err := src.plugctx.Host.LanguageRuntime(rt)
rtopts := src.runinfo.Proj.Runtime.Options()
langhost, err := src.plugctx.Host.LanguageRuntime(src.plugctx.Root, src.plugctx.Pwd, rt, rtopts)
if err != nil {
return result.FromError(fmt.Errorf("failed to launch language host %s: %w", rt, err))
}
Expand Down
2 changes: 1 addition & 1 deletion proto/.checksum.txt
Expand Up @@ -11,7 +11,7 @@
1949619858 9233 proto/pulumi/analyzer.proto
2889436496 3240 proto/pulumi/engine.proto
3421371250 793 proto/pulumi/errors.proto
3300935796 5024 proto/pulumi/language.proto
3818289820 5711 proto/pulumi/language.proto
2700626499 1743 proto/pulumi/plugin.proto
1451439690 19667 proto/pulumi/provider.proto
1325776472 11014 proto/pulumi/resource.proto
18 changes: 18 additions & 0 deletions proto/pulumi/language.proto
Expand Up @@ -39,6 +39,9 @@ service LanguageRuntime {

// GetProgramDependencies returns the set of dependencies required by the program.
rpc GetProgramDependencies(GetProgramDependenciesRequest) returns (GetProgramDependenciesResponse) {}

// RunPlugin executes a plugin program and returns its result asynchronously.
rpc RunPlugin(RunPluginRequest) returns (stream RunPluginResponse) {}
}

// AboutResponse returns runtime information about the language.
Expand Down Expand Up @@ -109,4 +112,19 @@ message InstallDependenciesRequest {
message InstallDependenciesResponse {
bytes stdout = 1; // a line of stdout text.
bytes stderr = 2; // a line of stderr text.
}

message RunPluginRequest{
string pwd = 1; // the program's working directory.
string program = 2; // the path to the program to execute.
repeated string args = 3; // any arguments to pass to the program.
repeated string env = 4; // any environment variables to set as part of the program.
}

message RunPluginResponse {
oneof output {
bytes stdout = 1; // a line of stdout text.
bytes stderr = 2; // a line of stderr text.
int32 exitcode = 3; // the exit code of the provider.
}
}
7 changes: 6 additions & 1 deletion sdk/dotnet/cmd/pulumi-language-dotnet/main.go
Expand Up @@ -690,7 +690,7 @@ func (host *dotnetLanguageHost) GetPluginInfo(ctx context.Context, req *pbempty.
func (host *dotnetLanguageHost) InstallDependencies(
req *pulumirpc.InstallDependenciesRequest, server pulumirpc.LanguageRuntime_InstallDependenciesServer) error {

closer, stdout, stderr, err := rpcutil.MakeStreams(server, req.IsTerminal)
closer, stdout, stderr, err := rpcutil.MakeInstallDependenciesStreams(server, req.IsTerminal)
if err != nil {
return err
}
Expand Down Expand Up @@ -807,3 +807,8 @@ func (host *dotnetLanguageHost) GetProgramDependencies(
Dependencies: packages,
}, nil
}

func (host *dotnetLanguageHost) RunPlugin(
req *pulumirpc.RunPluginRequest, server pulumirpc.LanguageRuntime_RunPluginServer) error {
return errors.New("not supported")
}
5 changes: 5 additions & 0 deletions sdk/go/auto/stack.go
Expand Up @@ -1184,6 +1184,11 @@ func (s *languageRuntimeServer) GetProgramDependencies(
return nil, status.Errorf(codes.Unimplemented, "method GetProgramDependencies not implemented")
}

func (s *languageRuntimeServer) RunPlugin(
_ *pulumirpc.RunPluginRequest, _ pulumirpc.LanguageRuntime_RunPluginServer) error {
return status.Errorf(codes.Unimplemented, "method RunPlugin not implemented")
}

type fileWatcher struct {
Filename string
tail *tail.Tail
Expand Down
7 changes: 3 additions & 4 deletions sdk/go/common/resource/plugin/analyzer_plugin.go
Expand Up @@ -67,7 +67,7 @@ func NewAnalyzer(host Host, ctx *Context, name tokens.QName) (Analyzer, error) {
dialOpts := rpcutil.OpenTracingInterceptorDialOptions()

plug, err := newPlugin(ctx, ctx.Pwd, path, fmt.Sprintf("%v (analyzer)", name),
[]string{host.ServerAddr(), ctx.Pwd}, nil /*env*/, dialOpts)
workspace.AnalyzerPlugin, []string{host.ServerAddr(), ctx.Pwd}, nil /*env*/, dialOpts)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -130,9 +130,8 @@ func NewPolicyAnalyzer(
}
}

plug, err := newPlugin(ctx, pwd, pluginPath, fmt.Sprintf("%v (analyzer)", name), args, env,
analyzerPluginDialOptions(ctx, fmt.Sprintf("%v", name)))

plug, err := newPlugin(ctx, pwd, pluginPath, fmt.Sprintf("%v (analyzer)", name),
workspace.AnalyzerPlugin, args, env, analyzerPluginDialOptions(ctx, fmt.Sprintf("%v", name)))
if err != nil {
// The original error might have been wrapped before being returned from newPlugin. So we look for
// the root cause of the error. This won't work if we switch to Go 1.13's new approach to wrapping.
Expand Down
38 changes: 28 additions & 10 deletions sdk/go/common/resource/plugin/host.go
Expand Up @@ -15,6 +15,7 @@
package plugin

import (
"encoding/json"
"fmt"
"os"
"path/filepath"
Expand Down Expand Up @@ -69,7 +70,7 @@ type Host interface {
CloseProvider(provider Provider) error
// LanguageRuntime fetches the language runtime plugin for a given language, lazily allocating if necessary. If
// an implementation of this language runtime wasn't found, on an error occurs, a non-nil error is returned.
LanguageRuntime(runtime string) (LanguageRuntime, error)
LanguageRuntime(root, pwd, runtime string, options map[string]interface{}) (LanguageRuntime, error)

// EnsurePlugins ensures all plugins in the given array are loaded and ready to use. If any plugins are missing,
// and/or there are errors loading one or more plugins, a non-nil error is returned.
Expand Down Expand Up @@ -201,8 +202,10 @@ type pluginLoadRequest struct {
}

type defaultHost struct {
ctx *Context // the shared context for this host.
runtimeOptions map[string]interface{} // options to pass to the language plugins.
ctx *Context // the shared context for this host.

// the runtime options for the project, passed to resource providers to support dynamic providers.
runtimeOptions map[string]interface{}
analyzerPlugins map[tokens.QName]*analyzerPlugin // a cache of analyzer plugins and their processes.
languagePlugins map[string]*languagePlugin // a cache of language plugins and their processes.
resourcePlugins map[Provider]*resourcePlugin // the set of loaded resource plugins.
Expand Down Expand Up @@ -371,25 +374,35 @@ func (host *defaultHost) Provider(pkg tokens.Package, version *semver.Version) (
return plugin.(Provider), nil
}

func (host *defaultHost) LanguageRuntime(runtime string) (LanguageRuntime, error) {
func (host *defaultHost) LanguageRuntime(root, pwd, runtime string,
options map[string]interface{}) (LanguageRuntime, error) {
// Language runtimes use their own loading channel not the main one
plugin, err := loadPlugin(host.languageLoadRequests, func() (interface{}, error) {

// Key our cached runtime plugins by the runtime name and the options
jsonOptions, err := json.Marshal(options)
if err != nil {
return nil, fmt.Errorf("could not marshal runtime options to JSON: %w", err)
}

key := runtime + ":" + root + ":" + pwd + ":" + string(jsonOptions)

// First see if we already loaded this plugin.
if plug, has := host.languagePlugins[runtime]; has {
if plug, has := host.languagePlugins[key]; has {
contract.Assert(plug != nil)
return plug.Plugin, nil
}

// If not, allocate a new one.
plug, err := NewLanguageRuntime(host, host.ctx, runtime, host.runtimeOptions)
plug, err := NewLanguageRuntime(host, host.ctx, root, pwd, runtime, options)
if err == nil && plug != nil {
info, infoerr := plug.GetPluginInfo()
if infoerr != nil {
return nil, infoerr
}

// Memoize the result.
host.languagePlugins[runtime] = &languagePlugin{Plugin: plug, Info: info}
host.languagePlugins[key] = &languagePlugin{Plugin: plug, Info: info}
}

return plug, err
Expand All @@ -416,7 +429,12 @@ func (host *defaultHost) EnsurePlugins(plugins []workspace.PluginSpec, kinds Fla
}
case workspace.LanguagePlugin:
if kinds&LanguagePlugins != 0 {
if _, err := host.LanguageRuntime(plugin.Name); err != nil {
// Pass nil options here, we just need to check the language plugin is loadable. We can't use
// host.runtimePlugins because there might be other language plugins reported here (e.g
// shimless multi-language providers). Pass the host root for the plugin directory, it
// shouldn't matter because we're starting with no options but it's a directory we've already
// got hold of.
if _, err := host.LanguageRuntime(host.ctx.Root, host.ctx.Pwd, plugin.Name, nil); err != nil {
result = multierror.Append(result,
errors.Wrapf(err, "failed to load language plugin %s", plugin.Name))
}
Expand Down Expand Up @@ -544,13 +562,13 @@ const (
var AllPlugins = AnalyzerPlugins | LanguagePlugins | ResourcePlugins

// GetRequiredPlugins lists a full set of plugins that will be required by the given program.
func GetRequiredPlugins(host Host, info ProgInfo, kinds Flags) ([]workspace.PluginSpec, error) {
func GetRequiredPlugins(host Host, root string, info ProgInfo, kinds Flags) ([]workspace.PluginSpec, error) {
var plugins []workspace.PluginSpec

if kinds&LanguagePlugins != 0 {
// First make sure the language plugin is present. We need this to load the required resource plugins.
// TODO: we need to think about how best to version this. For now, it always picks the latest.
lang, err := host.LanguageRuntime(info.Proj.Runtime.Name())
lang, err := host.LanguageRuntime(root, info.Pwd, info.Proj.Runtime.Name(), info.Proj.Runtime.Options())
if err != nil {
return nil, errors.Wrapf(err, "failed to load language plugin %s", info.Proj.Runtime.Name())
}
Expand Down
11 changes: 11 additions & 0 deletions sdk/go/common/resource/plugin/langruntime.go
Expand Up @@ -15,6 +15,7 @@
package plugin

import (
"context"
"io"

"github.com/blang/semver"
Expand Down Expand Up @@ -48,6 +49,9 @@ type LanguageRuntime interface {

// GetProgramDependencies returns information about the dependencies for the given program.
GetProgramDependencies(info ProgInfo, transitiveDependencies bool) ([]DependencyInfo, error)

// RunPlugin executes a plugin program and returns its result asynchronously.
RunPlugin(info RunPluginInfo) (io.Reader, io.Reader, context.CancelFunc, error)
}

type DependencyInfo struct {
Expand All @@ -61,6 +65,13 @@ type AboutInfo struct {
Metadata map[string]string
}

type RunPluginInfo struct {
Pwd string
Program string
Args []string
Env []string
}

// ProgInfo contains minimal information about the program to be run.
type ProgInfo struct {
Proj *workspace.Project // the program project/package.
Expand Down

0 comments on commit 3e98035

Please sign in to comment.