Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test for runTerraformCmd leaked go-routine #299

Merged
merged 8 commits into from May 20, 2022
23 changes: 22 additions & 1 deletion tfexec/cmd.go
@@ -1,9 +1,11 @@
package tfexec

import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -171,7 +173,7 @@ func (tf *Terraform) buildEnv(mergeEnv map[string]string) []string {
}

func (tf *Terraform) buildTerraformCmd(ctx context.Context, mergeEnv map[string]string, args ...string) *exec.Cmd {
cmd := exec.Command(tf.execPath, args...)
cmd := exec.CommandContext(ctx, tf.execPath, args...)

cmd.Env = tf.buildEnv(mergeEnv)
cmd.Dir = tf.workingDir
Expand Down Expand Up @@ -230,3 +232,22 @@ func mergeWriters(writers ...io.Writer) io.Writer {
}
return io.MultiWriter(compact...)
}

func writeOutput(r io.ReadCloser, w io.Writer) error {
buf := bufio.NewReader(r)
for {
line, err := buf.ReadBytes('\n')
if len(line) > 0 {
if _, err := w.Write(line); err != nil {
return err
}
}
if err != nil {
if errors.As(err, &io.EOF) {
radeksimko marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

return err
}
}
}
66 changes: 49 additions & 17 deletions tfexec/cmd_default.go
Expand Up @@ -12,35 +12,67 @@ import (
func (tf *Terraform) runTerraformCmd(ctx context.Context, cmd *exec.Cmd) error {
var errBuf strings.Builder

cmd.Stdout = mergeWriters(cmd.Stdout, tf.stdout)
cmd.Stderr = mergeWriters(cmd.Stderr, tf.stderr, &errBuf)

go func() {
<-ctx.Done()
if ctx.Err() == context.DeadlineExceeded || ctx.Err() == context.Canceled {
if cmd != nil && cmd.Process != nil && cmd.ProcessState != nil {
err := cmd.Process.Kill()
if err != nil {
tf.logger.Printf("error from kill: %s", err)
}
}
}
}()

// check for early cancellation
select {
case <-ctx.Done():
return ctx.Err()
default:
}

err := cmd.Run()
// Read stdout / stderr logs from pipe instead of setting cmd.Stdout and
// cmd.Stderr because it can cause hanging when killing the command
// https://github.com/golang/go/issues/23019
stdoutWriter := mergeWriters(cmd.Stdout, tf.stdout)
stderrWriter := mergeWriters(tf.stderr, &errBuf)

cmd.Stderr = nil
cmd.Stdout = nil

stdoutPipe, err := cmd.StdoutPipe()
if err != nil {
return err
}

stderrPipe, err := cmd.StderrPipe()
if err != nil {
return err
}

err = cmd.Start()
if err == nil && ctx.Err() != nil {
err = ctx.Err()
}
if err != nil {
return tf.wrapExitError(ctx, err, "")
}

exitChLen := 2
exitCh := make(chan error, exitChLen)
go func() {
exitCh <- writeOutput(stdoutPipe, stdoutWriter)
}()
go func() {
exitCh <- writeOutput(stderrPipe, stderrWriter)
}()

err = cmd.Wait()
if err == nil && ctx.Err() != nil {
err = ctx.Err()
}
if err != nil {
return tf.wrapExitError(ctx, err, errBuf.String())
}

return nil
// Wait for the logs to finish writing
counter := 0
for {
counter++
err := <-exitCh
if err != nil && err != context.Canceled {
return tf.wrapExitError(ctx, err, errBuf.String())
}
if counter >= exitChLen {
return ctx.Err()
}
}
}
39 changes: 39 additions & 0 deletions tfexec/cmd_default_test.go
@@ -0,0 +1,39 @@
//go:build !linux
// +build !linux

package tfexec

import (
"bytes"
"context"
"log"
"strings"
"testing"
"time"
)

func Test_runTerraformCmd_default(t *testing.T) {
// Checks runTerraformCmd for race condition when using
// go test -race -run Test_runTerraformCmd_default ./tfexec
var buf bytes.Buffer

tf := &Terraform{
logger: log.New(&buf, "", 0),
execPath: "echo",
}

ctx, cancel := context.WithCancel(context.Background())

cmd := tf.buildTerraformCmd(ctx, nil, "hello tf-exec!")
err := tf.runTerraformCmd(ctx, cmd)
if err != nil {
t.Fatal(err)
}

// Cancel stops the leaked go routine which logs an error
cancel()
time.Sleep(time.Second)
if strings.Contains(buf.String(), "error from kill") {
t.Fatal("canceling context should not lead to logging an error")
}
}
69 changes: 49 additions & 20 deletions tfexec/cmd_linux.go
Expand Up @@ -10,45 +10,74 @@ import (
func (tf *Terraform) runTerraformCmd(ctx context.Context, cmd *exec.Cmd) error {
var errBuf strings.Builder

cmd.Stdout = mergeWriters(cmd.Stdout, tf.stdout)
cmd.Stderr = mergeWriters(cmd.Stderr, tf.stderr, &errBuf)

cmd.SysProcAttr = &syscall.SysProcAttr{
// kill children if parent is dead
Pdeathsig: syscall.SIGKILL,
// set process group ID
Setpgid: true,
}

go func() {
<-ctx.Done()
if ctx.Err() == context.DeadlineExceeded || ctx.Err() == context.Canceled {
if cmd != nil && cmd.Process != nil && cmd.ProcessState != nil {
// send SIGINT to process group
err := syscall.Kill(-cmd.Process.Pid, syscall.SIGINT)
if err != nil {
tf.logger.Printf("error from SIGINT: %s", err)
}
}

// TODO: send a kill if it doesn't respond for a bit?
}
}()

// check for early cancellation
select {
case <-ctx.Done():
return ctx.Err()
default:
}

err := cmd.Run()
// Read stdout / stderr logs from pipe instead of setting cmd.Stdout and
// cmd.Stderr because it can cause hanging when killing the command
// https://github.com/golang/go/issues/23019
stdoutWriter := mergeWriters(cmd.Stdout, tf.stdout)
stderrWriter := mergeWriters(tf.stderr, &errBuf)

cmd.Stderr = nil
cmd.Stdout = nil

stdoutPipe, err := cmd.StdoutPipe()
if err != nil {
return err
}

stderrPipe, err := cmd.StderrPipe()
if err != nil {
return err
}

err = cmd.Start()
if err == nil && ctx.Err() != nil {
err = ctx.Err()
}
if err != nil {
return tf.wrapExitError(ctx, err, "")
}

exitChLen := 2
exitCh := make(chan error, exitChLen)
go func() {
exitCh <- writeOutput(stdoutPipe, stdoutWriter)
}()
go func() {
exitCh <- writeOutput(stderrPipe, stderrWriter)
}()

err = cmd.Wait()
if err == nil && ctx.Err() != nil {
err = ctx.Err()
}
if err != nil {
return tf.wrapExitError(ctx, err, errBuf.String())
}

return nil
// Wait for the logs to finish writing
counter := 0
for {
counter++
err := <-exitCh
if err != nil && err != context.Canceled {
return tf.wrapExitError(ctx, err, errBuf.String())
}
if counter >= exitChLen {
return ctx.Err()
}
}
}
36 changes: 36 additions & 0 deletions tfexec/cmd_linux_test.go
@@ -0,0 +1,36 @@
package tfexec

import (
"bytes"
"context"
"log"
"strings"
"testing"
"time"
)

func Test_runTerraformCmd_linux(t *testing.T) {
// Checks runTerraformCmd for race condition when using
// go test -race -run Test_runTerraformCmd_linux ./tfexec -tags=linux
var buf bytes.Buffer

tf := &Terraform{
logger: log.New(&buf, "", 0),
execPath: "echo",
}

ctx, cancel := context.WithCancel(context.Background())

cmd := tf.buildTerraformCmd(ctx, nil, "hello tf-exec!")
err := tf.runTerraformCmd(ctx, cmd)
if err != nil {
t.Fatal(err)
}

// Cancel stops the leaked go routine which logs an error
cancel()
time.Sleep(time.Second)
if strings.Contains(buf.String(), "error from kill") {
t.Fatal("canceling context should not lead to logging an error")
}
}
22 changes: 15 additions & 7 deletions tfexec/internal/e2etest/errors_test.go
Expand Up @@ -205,13 +205,21 @@ func TestContext_sleepTimeoutExpired(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

err = tf.Apply(ctx)
if err == nil {
t.Fatal("expected error, but didn't find one")
}

if !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("expected context.DeadlineExceeded, got %T %s", err, err)
errCh := make(chan error)
go func() {
err = tf.Apply(ctx)
if err != nil {
errCh <- err
}
}()

select {
case err := <-errCh:
if !errors.Is(err, context.DeadlineExceeded) {
t.Fatalf("expected context.DeadlineExceeded, got %T %s", err, err)
}
case <-time.After(time.Second * 10):
t.Fatal("terraform apply should have canceled and returned in ~5s")
}
})
}
Expand Down