From b3169c032e4fcc3b759b885246df8859b3cebc7a Mon Sep 17 00:00:00 2001 From: Lorna Song Date: Thu, 19 May 2022 21:30:50 -0400 Subject: [PATCH] Fixes to runTerraformCmd for race conditions - Use waitgroups for more readability - Improve handling errors from writeOutput - Finish reading from pipes before calling cmd.Wait - fixes a race condition that leads to an error :`read |0: file already closed` - Because now waiting for pipes to finish reading, need to update waitGroup to close buf.Read on context cancel. Otherwise buf.Read blocks until next line before stopping. Causes TestContext_sleepTimeoutExpired takes a little too long to cancel (~20s) --- tfexec/cmd.go | 16 +++++++++++++++- tfexec/cmd_default.go | 37 ++++++++++++++++++++++--------------- tfexec/cmd_linux.go | 36 +++++++++++++++++++++--------------- 3 files changed, 58 insertions(+), 31 deletions(-) diff --git a/tfexec/cmd.go b/tfexec/cmd.go index c1575514..8cdce5a6 100644 --- a/tfexec/cmd.go +++ b/tfexec/cmd.go @@ -233,7 +233,21 @@ func mergeWriters(writers ...io.Writer) io.Writer { return io.MultiWriter(compact...) } -func writeOutput(r io.ReadCloser, w io.Writer) error { +func writeOutput(ctx context.Context, r io.ReadCloser, w io.Writer) error { + // ReadBytes will block until bytes are read, which can cause a delay in + // returning even if the command's context has been canceled. Use a separate + // goroutine to prompt ReadBytes to return on cancel + closeCtx, closeCancel := context.WithCancel(ctx) + defer closeCancel() + go func() { + select { + case <-ctx.Done(): + r.Close() + case <-closeCtx.Done(): + return + } + }() + buf := bufio.NewReader(r) for { line, err := buf.ReadBytes('\n') diff --git a/tfexec/cmd_default.go b/tfexec/cmd_default.go index ea79afef..6d7b768e 100644 --- a/tfexec/cmd_default.go +++ b/tfexec/cmd_default.go @@ -7,6 +7,7 @@ import ( "context" "os/exec" "strings" + "sync" ) func (tf *Terraform) runTerraformCmd(ctx context.Context, cmd *exec.Cmd) error { @@ -46,15 +47,24 @@ func (tf *Terraform) runTerraformCmd(ctx context.Context, cmd *exec.Cmd) error { return tf.wrapExitError(ctx, err, "") } - exitChLen := 2 - exitCh := make(chan error, exitChLen) + var errStdout, errStderr error + var wg sync.WaitGroup + wg.Add(1) go func() { - exitCh <- writeOutput(stdoutPipe, stdoutWriter) + defer wg.Done() + errStdout = writeOutput(ctx, stdoutPipe, stdoutWriter) }() + + wg.Add(1) go func() { - exitCh <- writeOutput(stderrPipe, stderrWriter) + defer wg.Done() + errStderr = writeOutput(ctx, stderrPipe, stderrWriter) }() + // Reads from pipes must be completed before calling cmd.Wait(). Otherwise + // can cause a race condition + wg.Wait() + err = cmd.Wait() if err == nil && ctx.Err() != nil { err = ctx.Err() @@ -63,16 +73,13 @@ func (tf *Terraform) runTerraformCmd(ctx context.Context, cmd *exec.Cmd) error { return tf.wrapExitError(ctx, err, errBuf.String()) } - // Wait for the logs to finish writing - counter := 0 - for { - counter++ - err := <-exitCh - if err != nil && err != context.Canceled { - return tf.wrapExitError(ctx, err, errBuf.String()) - } - if counter >= exitChLen { - return ctx.Err() - } + // Return error if there was an issue reading the std out/err + if errStdout != nil && ctx.Err() != nil { + return tf.wrapExitError(ctx, errStdout, errBuf.String()) } + if errStderr != nil && ctx.Err() != nil { + return tf.wrapExitError(ctx, errStderr, errBuf.String()) + } + + return nil } diff --git a/tfexec/cmd_linux.go b/tfexec/cmd_linux.go index eeec7ed1..5d123cd9 100644 --- a/tfexec/cmd_linux.go +++ b/tfexec/cmd_linux.go @@ -51,15 +51,24 @@ func (tf *Terraform) runTerraformCmd(ctx context.Context, cmd *exec.Cmd) error { return tf.wrapExitError(ctx, err, "") } - exitChLen := 2 - exitCh := make(chan error, exitChLen) + var errStdout, errStderr error + var wg sync.WaitGroup + wg.Add(1) go func() { - exitCh <- writeOutput(stdoutPipe, stdoutWriter) + defer wg.Done() + errStdout = writeOutput(ctx, stdoutPipe, stdoutWriter) }() + + wg.Add(1) go func() { - exitCh <- writeOutput(stderrPipe, stderrWriter) + defer wg.Done() + errStderr = writeOutput(ctx, stderrPipe, stderrWriter) }() + // Reads from pipes must be completed before calling cmd.Wait(). Otherwise + // can cause a race condition + wg.Wait() + err = cmd.Wait() if err == nil && ctx.Err() != nil { err = ctx.Err() @@ -68,16 +77,13 @@ func (tf *Terraform) runTerraformCmd(ctx context.Context, cmd *exec.Cmd) error { return tf.wrapExitError(ctx, err, errBuf.String()) } - // Wait for the logs to finish writing - counter := 0 - for { - counter++ - err := <-exitCh - if err != nil && err != context.Canceled { - return tf.wrapExitError(ctx, err, errBuf.String()) - } - if counter >= exitChLen { - return ctx.Err() - } + // Return error if there was an issue reading the std out/err + if errStdout != nil && ctx.Err() != nil { + return tf.wrapExitError(ctx, errStdout, errBuf.String()) } + if errStderr != nil && ctx.Err() != nil { + return tf.wrapExitError(ctx, errStderr, errBuf.String()) + } + + return nil }