Skip to content

Commit

Permalink
Fix deadlock when erroring writes are slow
Browse files Browse the repository at this point in the history
  • Loading branch information
MasslessParticle authored and jhendrixMSFT committed Sep 20, 2022
1 parent 71b7461 commit 26961ea
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 5 deletions.
44 changes: 40 additions & 4 deletions sdk/storage/azblob/blockblob/chunkwriting.go
Expand Up @@ -13,12 +13,12 @@ import (
"encoding/binary"
"errors"
"fmt"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/internal/shared"
"io"
"sync"
"sync/atomic"

"github.com/Azure/azure-sdk-for-go/sdk/internal/uuid"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/internal/shared"
)

// blockWriter provides methods to upload blocks that represent a file to a server and commit them.
Expand Down Expand Up @@ -188,9 +188,7 @@ func (c *copier) write(chunk copierChunk) {

// close commits our blocks to blob storage and closes our writer.
func (c *copier) close() error {
c.wg.Wait()

if err := c.getErr(); err != nil {
if err := c.waitForFinish(); err != nil {
return err
}

Expand All @@ -200,6 +198,44 @@ func (c *copier) close() error {
return err
}

// waitForFinish waits for all writes to complete while combining errors from errCh
func (c *copier) waitForFinish() error {
var err error
done := make(chan struct{})
go func() {
// when write latencies are long, several errors might have occurred
// drain them all as we wait for writes to complete.
err = c.drainErrs(done)
}()

c.wg.Wait()
close(done)
return err
}

// drainErrs drains all outstanding errors from writes
func (c *copier) drainErrs(done chan struct{}) error {
var err error
for {
select {
case <-done:
return err
default:
if writeErr := c.getErr(); writeErr != nil {
err = combineErrs(err, writeErr)
}
}
}
}

// combineErrs combines err with newErr so multiple errors can be represented
func combineErrs(err, newErr error) error {
if err == nil {
return newErr
}
return fmt.Errorf("%s, %w", err.Error(), newErr)
}

// id allows the creation of unique IDs based on UUID4 + an int32. This auto-increments.
type id struct {
u [64]byte
Expand Down
47 changes: 46 additions & 1 deletion sdk/storage/azblob/blockblob/chunkwriting_test.go
Expand Up @@ -30,6 +30,7 @@ type fakeBlockWriter struct {
path string
block int32
errOnBlock int32
stageDelay time.Duration
}

func newFakeBlockWriter() *fakeBlockWriter {
Expand All @@ -49,7 +50,12 @@ func newFakeBlockWriter() *fakeBlockWriter {

func (f *fakeBlockWriter) StageBlock(_ context.Context, blockID string, body io.ReadSeekCloser, _ *StageBlockOptions) (StageBlockResponse, error) {
n := atomic.AddInt32(&f.block, 1)
if n == f.errOnBlock {

if f.stageDelay > 0 {
time.Sleep(f.stageDelay)
}

if f.errOnBlock > -1 && n >= f.errOnBlock {
return StageBlockResponse{}, io.ErrNoProgress
}

Expand Down Expand Up @@ -192,6 +198,45 @@ func TestGetErr(t *testing.T) {
}
}

func TestSlowDestCopyFrom(t *testing.T) {
p, err := createSrcFile(_1MiB + 500*1024) //This should cause 2 reads
if err != nil {
panic(err)
}
defer func(name string) {
_ = os.Remove(name)
}(p)

from, err := os.Open(p)
if err != nil {
panic(err)
}
defer from.Close()

br := newFakeBlockWriter()
defer br.cleanup()

br.stageDelay = 200 * time.Millisecond
br.errOnBlock = 0

errs := make(chan error, 1)
go func() {
_, err := copyFromReader(context.Background(), from, br, UploadStreamOptions{})
errs <- err
}()

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

select {
case <-ctx.Done():
failMsg := "TestSlowDestCopyFrom(slow writes shouldn't cause deadlock) failed: Context expired, copy deadlocked"
t.Error(failMsg)
case <-errs:
return
}
}

func TestCopyFromReader(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 26961ea

Please sign in to comment.