Skip to content

Commit

Permalink
Fixes to runTerraformCmd for race conditions
Browse files Browse the repository at this point in the history
 - Use waitgroups for more readability
 - Improve handling errors from writeOutput
 - Finish reading from pipes before calling cmd.Wait - fixes a race condition
 that leads to an error :`read |0: file already closed`
 - Because now waiting for pipes to finish reading, need to update waitGroup to
 close buf.Read on context cancel. Otherwise buf.Read blocks until next line
 before stopping. Causes TestContext_sleepTimeoutExpired takes a little too long
 to cancel (~20s)
  • Loading branch information
lornasong committed May 20, 2022
1 parent 594b411 commit 5a93b1e
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 31 deletions.
16 changes: 15 additions & 1 deletion tfexec/cmd.go
Expand Up @@ -233,7 +233,21 @@ func mergeWriters(writers ...io.Writer) io.Writer {
return io.MultiWriter(compact...)
}

func writeOutput(r io.ReadCloser, w io.Writer) error {
func writeOutput(ctx context.Context, r io.ReadCloser, w io.Writer) error {
// ReadBytes will block until bytes are read, which can cause a delay in
// returning even if the command's context has been canceled. Use a separate
// goroutine to prompt ReadBytes to return on cancel
closeCtx, closeCancel := context.WithCancel(ctx)
defer closeCancel()
go func() {
select {
case <-ctx.Done():
r.Close()
case <-closeCtx.Done():
return
}
}()

buf := bufio.NewReader(r)
for {
line, err := buf.ReadBytes('\n')
Expand Down
37 changes: 22 additions & 15 deletions tfexec/cmd_default.go
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"os/exec"
"strings"
"sync"
)

func (tf *Terraform) runTerraformCmd(ctx context.Context, cmd *exec.Cmd) error {
Expand Down Expand Up @@ -46,15 +47,24 @@ func (tf *Terraform) runTerraformCmd(ctx context.Context, cmd *exec.Cmd) error {
return tf.wrapExitError(ctx, err, "")
}

exitChLen := 2
exitCh := make(chan error, exitChLen)
var errStdout, errStderr error
var wg sync.WaitGroup
wg.Add(1)
go func() {
exitCh <- writeOutput(stdoutPipe, stdoutWriter)
defer wg.Done()
errStdout = writeOutput(ctx, stdoutPipe, stdoutWriter)
}()

wg.Add(1)
go func() {
exitCh <- writeOutput(stderrPipe, stderrWriter)
defer wg.Done()
errStderr = writeOutput(ctx, stderrPipe, stderrWriter)
}()

// Reads from pipes must be completed before calling cmd.Wait(). Otherwise
// can cause a race condition
wg.Wait()

err = cmd.Wait()
if err == nil && ctx.Err() != nil {
err = ctx.Err()
Expand All @@ -63,16 +73,13 @@ func (tf *Terraform) runTerraformCmd(ctx context.Context, cmd *exec.Cmd) error {
return tf.wrapExitError(ctx, err, errBuf.String())
}

// Wait for the logs to finish writing
counter := 0
for {
counter++
err := <-exitCh
if err != nil && err != context.Canceled {
return tf.wrapExitError(ctx, err, errBuf.String())
}
if counter >= exitChLen {
return ctx.Err()
}
// Return error if there was an issue reading the std out/err
if errStdout != nil && ctx.Err() != nil {
return tf.wrapExitError(ctx, errStdout, errBuf.String())
}
if errStderr != nil && ctx.Err() != nil {
return tf.wrapExitError(ctx, errStderr, errBuf.String())
}

return nil
}
37 changes: 22 additions & 15 deletions tfexec/cmd_linux.go
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"os/exec"
"strings"
"sync"
"syscall"
)

Expand Down Expand Up @@ -51,15 +52,24 @@ func (tf *Terraform) runTerraformCmd(ctx context.Context, cmd *exec.Cmd) error {
return tf.wrapExitError(ctx, err, "")
}

exitChLen := 2
exitCh := make(chan error, exitChLen)
var errStdout, errStderr error
var wg sync.WaitGroup
wg.Add(1)
go func() {
exitCh <- writeOutput(stdoutPipe, stdoutWriter)
defer wg.Done()
errStdout = writeOutput(ctx, stdoutPipe, stdoutWriter)
}()

wg.Add(1)
go func() {
exitCh <- writeOutput(stderrPipe, stderrWriter)
defer wg.Done()
errStderr = writeOutput(ctx, stderrPipe, stderrWriter)
}()

// Reads from pipes must be completed before calling cmd.Wait(). Otherwise
// can cause a race condition
wg.Wait()

err = cmd.Wait()
if err == nil && ctx.Err() != nil {
err = ctx.Err()
Expand All @@ -68,16 +78,13 @@ func (tf *Terraform) runTerraformCmd(ctx context.Context, cmd *exec.Cmd) error {
return tf.wrapExitError(ctx, err, errBuf.String())
}

// Wait for the logs to finish writing
counter := 0
for {
counter++
err := <-exitCh
if err != nil && err != context.Canceled {
return tf.wrapExitError(ctx, err, errBuf.String())
}
if counter >= exitChLen {
return ctx.Err()
}
// Return error if there was an issue reading the std out/err
if errStdout != nil && ctx.Err() != nil {
return tf.wrapExitError(ctx, errStdout, errBuf.String())
}
if errStderr != nil && ctx.Err() != nil {
return tf.wrapExitError(ctx, errStderr, errBuf.String())
}

return nil
}

0 comments on commit 5a93b1e

Please sign in to comment.