Skip to content

Commit

Permalink
Run plugins from language plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
Frassle committed Sep 25, 2022
1 parent 255495e commit 2bbfa5b
Show file tree
Hide file tree
Showing 32 changed files with 1,349 additions and 125 deletions.
6 changes: 6 additions & 0 deletions pkg/engine/lifecycletest/pulumi_test.go
Expand Up @@ -2562,6 +2562,12 @@ func (ctx *updateContext) GetProgramDependencies(
return nil, status.Errorf(codes.Unimplemented, "method GetProgramDependencies not implemented")
}

func (ctx *updateContext) RunPlugin(
req *pulumirpc.RunPluginRequest,
server pulumirpc.LanguageRuntime_RunPluginServer) error {
return status.Errorf(codes.Unimplemented, "method RunPlugin not implemented")
}

func TestLanguageClient(t *testing.T) {
t.Parallel()

Expand Down
6 changes: 6 additions & 0 deletions pkg/resource/deploy/deploytest/languageruntime.go
Expand Up @@ -16,6 +16,8 @@ package deploytest

import (
"context"
"fmt"
"io"

"github.com/pulumi/pulumi/sdk/v3/go/common/resource/plugin"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
Expand Down Expand Up @@ -78,3 +80,7 @@ func (p *languageRuntime) GetProgramDependencies(
info plugin.ProgInfo, transitiveDependencies bool) ([]plugin.DependencyInfo, error) {
return nil, nil
}

func (p *languageRuntime) RunPlugin(info plugin.RunPluginInfo) (io.Reader, io.Reader, context.CancelFunc, error) {
return nil, nil, nil, fmt.Errorf("inline plugins are not currently supported")
}
2 changes: 1 addition & 1 deletion proto/.checksum.txt
Expand Up @@ -8,7 +8,7 @@
1949619858 9233 proto/pulumi/analyzer.proto
2889436496 3240 proto/pulumi/engine.proto
3421371250 793 proto/pulumi/errors.proto
3300935796 5024 proto/pulumi/language.proto
3818289820 5711 proto/pulumi/language.proto
2700626499 1743 proto/pulumi/plugin.proto
1451439690 19667 proto/pulumi/provider.proto
3448262075 10699 proto/pulumi/resource.proto
18 changes: 18 additions & 0 deletions proto/pulumi/language.proto
Expand Up @@ -39,6 +39,9 @@ service LanguageRuntime {

// GetProgramDependencies returns the set of dependencies required by the program.
rpc GetProgramDependencies(GetProgramDependenciesRequest) returns (GetProgramDependenciesResponse) {}

// RunPlugin executes a plugin program and returns its result asynchronously.
rpc RunPlugin(RunPluginRequest) returns (stream RunPluginResponse) {}
}

// AboutResponse returns runtime information about the language.
Expand Down Expand Up @@ -109,4 +112,19 @@ message InstallDependenciesRequest {
message InstallDependenciesResponse {
bytes stdout = 1; // a line of stdout text.
bytes stderr = 2; // a line of stderr text.
}

message RunPluginRequest{
string pwd = 1; // the program's working directory.
string program = 2; // the path to the program to execute.
repeated string args = 3; // any arguments to pass to the program.
repeated string env = 4; // any environment variables to set as part of the program.
}

message RunPluginResponse {
oneof output {
bytes stdout = 1; // a line of stdout text.
bytes stderr = 2; // a line of stderr text.
int32 exitcode = 3; // the exit code of the provider.
}
}
7 changes: 6 additions & 1 deletion sdk/dotnet/cmd/pulumi-language-dotnet/main.go
Expand Up @@ -688,7 +688,7 @@ func (host *dotnetLanguageHost) GetPluginInfo(ctx context.Context, req *pbempty.
func (host *dotnetLanguageHost) InstallDependencies(
req *pulumirpc.InstallDependenciesRequest, server pulumirpc.LanguageRuntime_InstallDependenciesServer) error {

closer, stdout, stderr, err := rpcutil.MakeStreams(server, req.IsTerminal)
closer, stdout, stderr, err := rpcutil.MakeInstallDependenciesStreams(server, req.IsTerminal)
if err != nil {
return err
}
Expand Down Expand Up @@ -805,3 +805,8 @@ func (host *dotnetLanguageHost) GetProgramDependencies(
Dependencies: packages,
}, nil
}

func (host *dotnetLanguageHost) RunPlugin(
req *pulumirpc.RunPluginRequest, server pulumirpc.LanguageRuntime_RunPluginServer) error {
return errors.New("not supported")
}
5 changes: 5 additions & 0 deletions sdk/go/auto/stack.go
Expand Up @@ -1076,6 +1076,11 @@ func (s *languageRuntimeServer) GetProgramDependencies(
return nil, status.Errorf(codes.Unimplemented, "method GetProgramDependencies not implemented")
}

func (s *languageRuntimeServer) RunPlugin(
_ *pulumirpc.RunPluginRequest, _ pulumirpc.LanguageRuntime_RunPluginServer) error {
return status.Errorf(codes.Unimplemented, "method RunPlugin not implemented")
}

type fileWatcher struct {
Filename string
tail *tail.Tail
Expand Down
11 changes: 11 additions & 0 deletions sdk/go/common/resource/plugin/langruntime.go
Expand Up @@ -15,6 +15,7 @@
package plugin

import (
"context"
"io"

"github.com/blang/semver"
Expand Down Expand Up @@ -48,6 +49,9 @@ type LanguageRuntime interface {

// GetProgramDependencies returns information about the dependencies for the given program.
GetProgramDependencies(info ProgInfo, transitiveDependencies bool) ([]DependencyInfo, error)

// RunPlugin executes a plugin program and returns its result asynchronously.
RunPlugin(info RunPluginInfo) (io.Reader, io.Reader, context.CancelFunc, error)
}

type DependencyInfo struct {
Expand All @@ -61,6 +65,13 @@ type AboutInfo struct {
Metadata map[string]string
}

type RunPluginInfo struct {
Pwd string
Program string
Args []string
Env []string
}

// ProgInfo contains minimal information about the program to be run.
type ProgInfo struct {
Proj *workspace.Project // the program project/package.
Expand Down
56 changes: 56 additions & 0 deletions sdk/go/common/resource/plugin/langruntime_plugin.go
Expand Up @@ -15,6 +15,7 @@
package plugin

import (
"context"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -342,3 +343,58 @@ func (h *langhost) GetProgramDependencies(info ProgInfo, transitiveDependencies
logging.V(7).Infof("%s success: #versions=%d", prefix, len(results))
return results, nil
}

func (h *langhost) RunPlugin(info RunPluginInfo) (io.Reader, io.Reader, context.CancelFunc, error) {
logging.V(7).Infof("langhost[%v].RunPlugin(pwd=%s,program=%s) executing",
h.runtime, info.Pwd, info.Program)

ctx, kill := context.WithCancel(h.ctx.Request())

resp, err := h.client.RunPlugin(ctx, &pulumirpc.RunPluginRequest{
Pwd: info.Pwd,
Program: info.Program,
Args: info.Args,
Env: info.Env,
})

if err != nil {
// If there was an error starting the plugin kill the context for this request to ensure any lingering
// connection terminates.
kill()
return nil, nil, nil, err
}

outr, outw := io.Pipe()
errr, errw := io.Pipe()

go func() {
for {
logging.V(10).Infoln("Waiting for plugin message")
msg, err := resp.Recv()
if err != nil {
contract.IgnoreError(outw.CloseWithError(err))
contract.IgnoreError(errw.CloseWithError(err))
break
}

logging.V(10).Infoln("Got plugin response: ", msg)

if value, ok := msg.Output.(*pulumirpc.RunPluginResponse_Stdout); ok {
n, err := outw.Write(value.Stdout)
contract.AssertNoError(err)
contract.Assert(n == len(value.Stdout))
} else if value, ok := msg.Output.(*pulumirpc.RunPluginResponse_Stderr); ok {
n, err := errw.Write(value.Stderr)
contract.AssertNoError(err)
contract.Assert(n == len(value.Stderr))
} else if _, ok := msg.Output.(*pulumirpc.RunPluginResponse_Exitcode); ok {
// If stdout and stderr are empty we've flushed and are returning the exit code
outw.Close()
errw.Close()
break
}
}
}()

return outr, errr, kill, nil
}
44 changes: 42 additions & 2 deletions sdk/go/common/resource/plugin/plugin.go
Expand Up @@ -21,6 +21,7 @@ import (
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"sync/atomic"
Expand All @@ -41,6 +42,7 @@ import (
"github.com/pulumi/pulumi/sdk/v3/go/common/util/contract"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/logging"
"github.com/pulumi/pulumi/sdk/v3/go/common/util/rpcutil"
"github.com/pulumi/pulumi/sdk/v3/go/common/workspace"
)

// PulumiPluginJSON represents additional information about a package's associated Pulumi plugin.
Expand Down Expand Up @@ -186,7 +188,7 @@ func newPlugin(ctx *Context, pwd, bin, prefix string, args, env []string, option
}

// Try to execute the binary.
plug, err := execPlugin(bin, args, pwd, env)
plug, err := execPlugin(ctx, bin, prefix, args, pwd, env)
if err != nil {
return nil, errors.Wrapf(err, "failed to load plugin %s", bin)
}
Expand Down Expand Up @@ -294,7 +296,7 @@ func newPlugin(ctx *Context, pwd, bin, prefix string, args, env []string, option
}

// execPlugin starts the plugin executable.
func execPlugin(bin string, pluginArgs []string, pwd string, env []string) (*plugin, error) {
func execPlugin(ctx *Context, bin, prefix string, pluginArgs []string, pwd string, env []string) (*plugin, error) {
args := buildPluginArguments(pluginArgumentOptions{
pluginArgs: pluginArgs,
tracingEndpoint: cmdutil.TracingEndpoint,
Expand All @@ -303,6 +305,44 @@ func execPlugin(bin string, pluginArgs []string, pwd string, env []string) (*plu
logToStderr: logging.LogToStderr,
verbose: logging.Verbose,
})

// Check to see if we have a binary we can invoke directly
if _, err := os.Stat(bin); os.IsNotExist(err) {
// If we don't have the expected binary, see if we have a "PulumiPlugin.yaml"
pluginDir := filepath.Dir(bin)
proj, err := workspace.LoadPluginProject(filepath.Join(pluginDir, "PulumiPlugin.yaml"))
if err != nil {
return nil, errors.Wrap(err, "loading PulumiPlugin.yaml")
}

logging.V(9).Infof("Launching plugin '%v' from '%v' via runtime '%s'", prefix, pluginDir, proj.Runtime.Name())

runtime, err := ctx.Host.LanguageRuntime(proj.Runtime.Name())
if err != nil {
return nil, errors.Wrap(err, "loading runtime")
}

stdout, stderr, kill, err := runtime.RunPlugin(RunPluginInfo{
Pwd: pwd,
Program: pluginDir,
Args: pluginArgs,
Env: env,
})

if err != nil {
return nil, err
}

return &plugin{
Bin: bin,
Args: args,
Env: env,
Kill: func() error { kill(); return nil },
Stdout: io.NopCloser(stdout),
Stderr: io.NopCloser(stderr),
}, nil
}

cmd := exec.Command(bin, args...)
cmdutil.RegisterProcessGroup(cmd)
cmd.Dir = pwd
Expand Down
59 changes: 36 additions & 23 deletions sdk/go/common/util/rpcutil/writer.go
Expand Up @@ -68,40 +68,23 @@ type nullCloser struct{}
func (c *nullCloser) Close() error { return nil }

type pipeWriter struct {
sendToStdout bool
server pulumirpc.LanguageRuntime_InstallDependenciesServer
send func([]byte) error
}

func (w *pipeWriter) Write(p []byte) (int, error) {
data := pulumirpc.InstallDependenciesResponse{}
if w.sendToStdout {
data.Stdout = p
} else {
data.Stderr = p
}

err := w.server.Send(&data)
err := w.send(p)
if err != nil {
return 0, err
}

return len(p), nil
}

// Returns a pair of streams for use with the language runtimes InstallDependencies method
func MakeStreams(
server pulumirpc.LanguageRuntime_InstallDependenciesServer,
func makeStreams(
sendStdout func([]byte) error, sendStderr func([]byte) error,
isTerminal bool) (io.Closer, io.Writer, io.Writer, error) {

stderr := &pipeWriter{
server: server,
sendToStdout: false,
}

stdout := &pipeWriter{
server: server,
sendToStdout: true,
}
stderr := &pipeWriter{send: sendStderr}
stdout := &pipeWriter{send: sendStdout}

if isTerminal {
logging.V(11).Infoln("Opening pseudo terminal")
Expand Down Expand Up @@ -134,3 +117,33 @@ func MakeStreams(

return &nullCloser{}, stdout, stderr, nil
}

// Returns a pair of streams for use with the language runtimes InstallDependencies method
func MakeInstallDependenciesStreams(
server pulumirpc.LanguageRuntime_InstallDependenciesServer,
isTerminal bool) (io.Closer, io.Writer, io.Writer, error) {

return makeStreams(
func(b []byte) error {
return server.Send(&pulumirpc.InstallDependenciesResponse{Stdout: b})
},
func(b []byte) error {
return server.Send(&pulumirpc.InstallDependenciesResponse{Stderr: b})
},
isTerminal)
}

// Returns a pair of streams for use with the language runtimes RunPlugin method
func MakeRunPluginStreams(
server pulumirpc.LanguageRuntime_RunPluginServer,
isTerminal bool) (io.Closer, io.Writer, io.Writer, error) {

return makeStreams(
func(b []byte) error {
return server.Send(&pulumirpc.RunPluginResponse{Output: &pulumirpc.RunPluginResponse_Stdout{Stdout: b}})
},
func(b []byte) error {
return server.Send(&pulumirpc.RunPluginResponse{Output: &pulumirpc.RunPluginResponse_Stderr{Stderr: b}})
},
isTerminal)
}
8 changes: 4 additions & 4 deletions sdk/go/common/util/rpcutil/writer_test.go
Expand Up @@ -62,7 +62,7 @@ func TestWriter_NoTerminal(t *testing.T) {

server := makeStreamMock()

closer, stdout, stderr, err := MakeStreams(server, false)
closer, stdout, stderr, err := MakeInstallDependenciesStreams(server, false)
assert.NoError(t, err)

// stdout and stderr should just write to server
Expand Down Expand Up @@ -91,7 +91,7 @@ func TestWriter_Terminal(t *testing.T) {

server := makeStreamMock()

closer, stdout, stderr, err := MakeStreams(server, true)
closer, stdout, stderr, err := MakeInstallDependenciesStreams(server, true)
assert.NoError(t, err)

// We _may_ have made a pty and stdout and stderr are the same and both send to the server as stdout
Expand Down Expand Up @@ -169,7 +169,7 @@ func TestWriter_IsPTY(t *testing.T) {

server := makeStreamMock()

closer, stdout, stderr, err := MakeStreams(server, true)
closer, stdout, stderr, err := MakeInstallDependenciesStreams(server, true)
assert.NoError(t, err)

// We _may_ have made a pty, check IsTerminal returns true
Expand All @@ -189,7 +189,7 @@ func TestWriter_SafeToCloseTwice(t *testing.T) {

server := makeStreamMock()

closer, _, _, err := MakeStreams(server, true)
closer, _, _, err := MakeInstallDependenciesStreams(server, true)
assert.NoError(t, err)

err = closer.Close()
Expand Down

0 comments on commit 2bbfa5b

Please sign in to comment.