Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test for runTerraformCmd leaked go-routine #299

Merged
merged 8 commits into from May 20, 2022
37 changes: 36 additions & 1 deletion tfexec/cmd.go
@@ -1,9 +1,11 @@
package tfexec

import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -171,7 +173,7 @@ func (tf *Terraform) buildEnv(mergeEnv map[string]string) []string {
}

func (tf *Terraform) buildTerraformCmd(ctx context.Context, mergeEnv map[string]string, args ...string) *exec.Cmd {
cmd := exec.Command(tf.execPath, args...)
cmd := exec.CommandContext(ctx, tf.execPath, args...)

cmd.Env = tf.buildEnv(mergeEnv)
cmd.Dir = tf.workingDir
Expand Down Expand Up @@ -230,3 +232,36 @@ func mergeWriters(writers ...io.Writer) io.Writer {
}
return io.MultiWriter(compact...)
}

// writeOutput copies r to w one line at a time until r is exhausted, a read
// or write fails, or ctx is canceled.
//
// It returns nil once r reports io.EOF (a final line without a trailing
// newline is still written), the error from w.Write if a write fails, and the
// read error otherwise. When ctx is canceled, r is closed so that a pending
// read returns; the error returned in that case is whatever r reports for a
// closed source.
func writeOutput(ctx context.Context, r io.ReadCloser, w io.Writer) error {
	// ReadBytes will block until bytes are read, which can cause a delay in
	// returning even if the command's context has been canceled. Use a separate
	// goroutine to prompt ReadBytes to return on cancel.
	//
	// The goroutine's exit signal is a plain channel and not a context derived
	// from ctx: a derived context is canceled together with ctx, so both select
	// cases could be ready at the same time and the exit case might be chosen,
	// leaving r open and the read blocked.
	done := make(chan struct{})
	defer close(done)
	go func() {
		select {
		case <-ctx.Done():
			// Best effort: closing only serves to unblock the pending read,
			// and the read loop below reports the resulting error.
			_ = r.Close()
		case <-done:
		}
	}()

	buf := bufio.NewReader(r)
	for {
		line, err := buf.ReadBytes('\n')
		// ReadBytes may return data together with an error (e.g. the last,
		// unterminated line plus io.EOF), so flush the data first.
		if len(line) > 0 {
			if _, werr := w.Write(line); werr != nil {
				return werr
			}
		}
		if err != nil {
			if errors.Is(err, io.EOF) {
				return nil
			}
			return err
		}
	}
}
71 changes: 55 additions & 16 deletions tfexec/cmd_default.go
Expand Up @@ -7,40 +7,79 @@ import (
"context"
"os/exec"
"strings"
"sync"
)

// runTerraformCmd runs cmd and forwards its output: stdout goes to
// cmd.Stdout/tf.stdout, stderr goes to tf.stderr and to an in-memory buffer
// whose text is passed to tf.wrapExitError when Wait fails.
//
// It returns ctx.Err() without starting anything if ctx is already done,
// returns pipe-creation errors unwrapped, and wraps Start/Wait failures with
// tf.wrapExitError. A nil Start/Wait error is replaced by ctx.Err() when the
// context has ended.
//
// NOTE(review): this listing appears to be a rendered diff with the +/-
// markers stripped, so lines of the previous implementation (direct
// assignment of cmd.Stdout/cmd.Stderr, the goroutine that kills the process
// on ctx.Done, and cmd.Run) are shown next to what look like their
// replacements (pipes, cmd.Start, writeOutput goroutines, cmd.Wait). Confirm
// against the real file before relying on the statement order shown here.
func (tf *Terraform) runTerraformCmd(ctx context.Context, cmd *exec.Cmd) error {
// errBuf accumulates stderr so it can be attached to the returned error.
var errBuf strings.Builder

// NOTE(review): presumably pre-change lines; the pipe-based code below sets
// both fields back to nil.
cmd.Stdout = mergeWriters(cmd.Stdout, tf.stdout)
cmd.Stderr = mergeWriters(cmd.Stderr, tf.stderr, &errBuf)

// NOTE(review): presumably the pre-change goroutine. It blocks on ctx.Done()
// with no other exit, so it outlives the command until the context ends, and
// it only kills when cmd.ProcessState is already set.
go func() {
<-ctx.Done()
if ctx.Err() == context.DeadlineExceeded || ctx.Err() == context.Canceled {
if cmd != nil && cmd.Process != nil && cmd.ProcessState != nil {
err := cmd.Process.Kill()
if err != nil {
tf.logger.Printf("error from kill: %s", err)
}
}
}
}()

// check for early cancellation
select {
case <-ctx.Done():
return ctx.Err()
default:
}

// NOTE(review): presumably the pre-change call, superseded by Start/Wait below.
err := cmd.Run()
// Read stdout / stderr logs from pipe instead of setting cmd.Stdout and
// cmd.Stderr because it can cause hanging when killing the command
// https://github.com/golang/go/issues/23019
stdoutWriter := mergeWriters(cmd.Stdout, tf.stdout)
// NOTE(review): unlike stdout, a pre-set cmd.Stderr is not merged in here.
stderrWriter := mergeWriters(tf.stderr, &errBuf)

// Cleared before the StdoutPipe/StderrPipe calls below.
cmd.Stderr = nil
cmd.Stdout = nil

stdoutPipe, err := cmd.StdoutPipe()
if err != nil {
return err
}

stderrPipe, err := cmd.StderrPipe()
if err != nil {
return err
}

err = cmd.Start()
// Report cancellation even when Start itself succeeded.
if err == nil && ctx.Err() != nil {
err = ctx.Err()
}
if err != nil {
// Nothing has been read from stderr yet, hence the empty string.
return tf.wrapExitError(ctx, err, "")
}

// Each pipe is drained by its own goroutine; errStdout/errStderr are only
// read after wg.Wait() below.
var errStdout, errStderr error
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
errStdout = writeOutput(ctx, stdoutPipe, stdoutWriter)
}()

wg.Add(1)
go func() {
defer wg.Done()
errStderr = writeOutput(ctx, stderrPipe, stderrWriter)
}()

// Reads from pipes must be completed before calling cmd.Wait(). Otherwise
// can cause a race condition
wg.Wait()

err = cmd.Wait()
// A canceled or expired context takes precedence over a nil Wait error.
if err == nil && ctx.Err() != nil {
err = ctx.Err()
}
if err != nil {
return tf.wrapExitError(ctx, err, errBuf.String())
}

// Return error if there was an issue reading the std out/err
// NOTE(review): both checks also require ctx.Err() != nil, so a read error
// with a live context is dropped; confirm that this is intended.
if errStdout != nil && ctx.Err() != nil {
return tf.wrapExitError(ctx, errStdout, errBuf.String())
}
if errStderr != nil && ctx.Err() != nil {
return tf.wrapExitError(ctx, errStderr, errBuf.String())
}

return nil
}
39 changes: 39 additions & 0 deletions tfexec/cmd_default_test.go
@@ -0,0 +1,39 @@
//go:build !linux
// +build !linux

package tfexec

import (
"bytes"
"context"
"log"
"strings"
"testing"
"time"
)

// Test_runTerraformCmd_default runs a short-lived command through
// runTerraformCmd and then cancels the context, failing if anything logs
// "error from kill" afterwards (the symptom of a goroutine left behind by
// runTerraformCmd).
//
// It is also meant to be run under the race detector:
//
//	go test -race -run Test_runTerraformCmd_default ./tfexec
func Test_runTerraformCmd_default(t *testing.T) {
	// Everything the Terraform logger prints is captured here.
	var logged bytes.Buffer

	tf := &Terraform{
		logger:   log.New(&logged, "", 0),
		execPath: "echo",
	}

	ctx, stop := context.WithCancel(context.Background())

	if err := tf.runTerraformCmd(ctx, tf.buildTerraformCmd(ctx, nil, "hello tf-exec!")); err != nil {
		t.Fatal(err)
	}

	// Canceling wakes up any goroutine still waiting on the context; give it
	// time to run and log before inspecting the captured output.
	stop()
	time.Sleep(time.Second)
	if output := logged.String(); strings.Contains(output, "error from kill") {
		t.Fatal("canceling context should not lead to logging an error")
	}
}
74 changes: 55 additions & 19 deletions tfexec/cmd_linux.go
Expand Up @@ -4,51 +4,87 @@ import (
"context"
"os/exec"
"strings"
"sync"
"syscall"
)

// runTerraformCmd runs cmd and forwards its output: stdout goes to
// cmd.Stdout/tf.stdout, stderr goes to tf.stderr and to an in-memory buffer
// whose text is passed to tf.wrapExitError when Wait fails. The command is
// placed in its own process group and is sent SIGKILL if the parent dies.
//
// It returns ctx.Err() without starting anything if ctx is already done,
// returns pipe-creation errors unwrapped, and wraps Start/Wait failures with
// tf.wrapExitError. A nil Start/Wait error is replaced by ctx.Err() when the
// context has ended.
//
// NOTE(review): this listing appears to be a rendered diff with the +/-
// markers stripped, so lines of the previous implementation (direct
// assignment of cmd.Stdout/cmd.Stderr, the goroutine that sends SIGINT on
// ctx.Done, and cmd.Run) are shown next to what look like their replacements
// (pipes, cmd.Start, writeOutput goroutines, cmd.Wait). Confirm against the
// real file before relying on the statement order shown here.
func (tf *Terraform) runTerraformCmd(ctx context.Context, cmd *exec.Cmd) error {
// errBuf accumulates stderr so it can be attached to the returned error.
var errBuf strings.Builder

// NOTE(review): presumably pre-change lines; the pipe-based code below sets
// both fields back to nil.
cmd.Stdout = mergeWriters(cmd.Stdout, tf.stdout)
cmd.Stderr = mergeWriters(cmd.Stderr, tf.stderr, &errBuf)

cmd.SysProcAttr = &syscall.SysProcAttr{
// kill children if parent is dead
Pdeathsig: syscall.SIGKILL,
// set process group ID
Setpgid: true,
}

// NOTE(review): presumably the pre-change goroutine. It blocks on ctx.Done()
// with no other exit, so it outlives the command until the context ends, and
// it only signals when cmd.ProcessState is already set.
go func() {
<-ctx.Done()
if ctx.Err() == context.DeadlineExceeded || ctx.Err() == context.Canceled {
if cmd != nil && cmd.Process != nil && cmd.ProcessState != nil {
// send SIGINT to process group
err := syscall.Kill(-cmd.Process.Pid, syscall.SIGINT)
if err != nil {
tf.logger.Printf("error from SIGINT: %s", err)
}
}

// TODO: send a kill if it doesn't respond for a bit?
}
}()

// check for early cancellation
select {
case <-ctx.Done():
return ctx.Err()
default:
}

// NOTE(review): presumably the pre-change call, superseded by Start/Wait below.
err := cmd.Run()
// Read stdout / stderr logs from pipe instead of setting cmd.Stdout and
// cmd.Stderr because it can cause hanging when killing the command
// https://github.com/golang/go/issues/23019
stdoutWriter := mergeWriters(cmd.Stdout, tf.stdout)
// NOTE(review): unlike stdout, a pre-set cmd.Stderr is not merged in here.
stderrWriter := mergeWriters(tf.stderr, &errBuf)

// Cleared before the StdoutPipe/StderrPipe calls below.
cmd.Stderr = nil
cmd.Stdout = nil

stdoutPipe, err := cmd.StdoutPipe()
if err != nil {
return err
}

stderrPipe, err := cmd.StderrPipe()
if err != nil {
return err
}

err = cmd.Start()
// Report cancellation even when Start itself succeeded.
if err == nil && ctx.Err() != nil {
err = ctx.Err()
}
if err != nil {
// Nothing has been read from stderr yet, hence the empty string.
return tf.wrapExitError(ctx, err, "")
}

// Each pipe is drained by its own goroutine; errStdout/errStderr are only
// read after wg.Wait() below.
var errStdout, errStderr error
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
errStdout = writeOutput(ctx, stdoutPipe, stdoutWriter)
}()

wg.Add(1)
go func() {
defer wg.Done()
errStderr = writeOutput(ctx, stderrPipe, stderrWriter)
}()

// Reads from pipes must be completed before calling cmd.Wait(). Otherwise
// can cause a race condition
wg.Wait()

err = cmd.Wait()
// A canceled or expired context takes precedence over a nil Wait error.
if err == nil && ctx.Err() != nil {
err = ctx.Err()
}
if err != nil {
return tf.wrapExitError(ctx, err, errBuf.String())
}

// Return error if there was an issue reading the std out/err
// NOTE(review): both checks also require ctx.Err() != nil, so a read error
// with a live context is dropped; confirm that this is intended.
if errStdout != nil && ctx.Err() != nil {
return tf.wrapExitError(ctx, errStdout, errBuf.String())
}
if errStderr != nil && ctx.Err() != nil {
return tf.wrapExitError(ctx, errStderr, errBuf.String())
}

return nil
}
36 changes: 36 additions & 0 deletions tfexec/cmd_linux_test.go
@@ -0,0 +1,36 @@
package tfexec

import (
"bytes"
"context"
"log"
"strings"
"testing"
"time"
)

// Test_runTerraformCmd_linux runs a short-lived command through
// runTerraformCmd and then cancels the context, failing if a goroutine left
// behind by runTerraformCmd logs a signalling error afterwards.
//
// It is also meant to be run under the race detector:
//
//	go test -race -run Test_runTerraformCmd_linux ./tfexec -tags=linux
func Test_runTerraformCmd_linux(t *testing.T) {
	// Everything the Terraform logger prints is captured here.
	var buf bytes.Buffer

	tf := &Terraform{
		logger:   log.New(&buf, "", 0),
		execPath: "echo",
	}

	ctx, cancel := context.WithCancel(context.Background())
	// Release the context even when the test aborts before the explicit
	// cancel below; calling cancel twice is harmless.
	defer cancel()

	cmd := tf.buildTerraformCmd(ctx, nil, "hello tf-exec!")
	if err := tf.runTerraformCmd(ctx, cmd); err != nil {
		t.Fatal(err)
	}

	// Cancel wakes up a leaked goroutine, if any, which then logs an error.
	cancel()
	time.Sleep(time.Second)

	// The Linux runTerraformCmd logs "error from SIGINT" when signalling the
	// process group fails; "error from kill" is the wording used by the
	// non-Linux implementation and is kept so neither message slips through.
	logged := buf.String()
	for _, msg := range []string{"error from SIGINT", "error from kill"} {
		if strings.Contains(logged, msg) {
			t.Fatal("canceling context should not lead to logging an error")
		}
	}
}
22 changes: 15 additions & 7 deletions tfexec/internal/e2etest/errors_test.go
Expand Up @@ -205,13 +205,21 @@ func TestContext_sleepTimeoutExpired(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

err = tf.Apply(ctx)
if err == nil {
t.Fatal("expected error, but didn't find one")
}

if !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("expected context.DeadlineExceeded, got %T %s", err, err)
errCh := make(chan error)
go func() {
err = tf.Apply(ctx)
if err != nil {
errCh <- err
}
}()

select {
case err := <-errCh:
if !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("expected context.DeadlineExceeded, got %T %s", err, err)
}
case <-time.After(time.Second * 10):
t.Fatal("terraform apply should have canceled and returned in ~5s")
}
})
}
Expand Down
13 changes: 9 additions & 4 deletions tfexec/internal/e2etest/util_test.go
Expand Up @@ -91,16 +91,21 @@ func runTestVersions(t *testing.T, versions []string, fixtureName string, cb fun
}
}

var stdouterr strings.Builder
tf.SetStdout(&stdouterr)
tf.SetStderr(&stdouterr)
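// Use a separate strings.Builder for stdout and for stderr: a strings.Builder
// is not safe for concurrent use, and the two streams can be written to at
// the same time.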
var stdout strings.Builder
tf.SetStdout(&stdout)
var stderr strings.Builder
tf.SetStderr(&stderr)

tf.SetLogger(&testingPrintfer{t})

// TODO: capture panics here?
cb(t, runningVersion, tf)

t.Logf("CLI Output:\n%s", stdouterr.String())
t.Logf("CLI Output:\n%s", stdout.String())
if len(stderr.String()) > 0 {
t.Logf("CLI Error:\n%s", stderr.String())
}
})
}
}
Expand Down