diff --git a/.changelog/6523434cf1284529b69f0bc324ed7ab6.json b/.changelog/6523434cf1284529b69f0bc324ed7ab6.json new file mode 100644 index 00000000000..20f2a56fa77 --- /dev/null +++ b/.changelog/6523434cf1284529b69f0bc324ed7ab6.json @@ -0,0 +1,8 @@ +{ + "id": "6523434c-f128-4529-b69f-0bc324ed7ab6", + "type": "dependency", + "description": "Update SDK's internal copy of golang.org/x/sync/singleflight to address issue with test failing due to timeing issues", + "modules": [ + "." + ] +} \ No newline at end of file diff --git a/internal/sync/singleflight/LICENSE b/internal/sync/singleflight/LICENSE index 6a66aea5eaf..fe6a62006a5 100644 --- a/internal/sync/singleflight/LICENSE +++ b/internal/sync/singleflight/LICENSE @@ -14,7 +14,7 @@ distribution. contributors may be used to endorse or promote products derived from this software without specific prior written permission. -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +THIS SOFTWARE IS PROVIDED BY THE COPYIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT @@ -25,3 +25,4 @@ DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + diff --git a/internal/sync/singleflight/docs.go b/internal/sync/singleflight/docs.go new file mode 100644 index 00000000000..cb70616e802 --- /dev/null +++ b/internal/sync/singleflight/docs.go @@ -0,0 +1,7 @@ +// Package singleflight provides a duplicate function call suppression +// mechanism. This package is a fork of the Go golang.org/x/sync/singleflight +// package. The package is forked, because the package a part of the unstable +// and unversioned golang.org/x/sync module. +// +// https://github.com/golang/sync/tree/67f06af15bc961c363a7260195bcd53487529a21/singleflight +package singleflight diff --git a/internal/sync/singleflight/singleflight.go b/internal/sync/singleflight/singleflight.go index 14ad0c58911..e8a1b17d564 100644 --- a/internal/sync/singleflight/singleflight.go +++ b/internal/sync/singleflight/singleflight.go @@ -2,11 +2,44 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -// Package singleflight provides a duplicate function call suppression -// mechanism. package singleflight -import "sync" +import ( + "bytes" + "errors" + "fmt" + "runtime" + "runtime/debug" + "sync" +) + +// errGoexit indicates the runtime.Goexit was called in +// the user given function. +var errGoexit = errors.New("runtime.Goexit was called") + +// A panicError is an arbitrary value recovered from a panic +// with the stack trace during the execution of given function. +type panicError struct { + value interface{} + stack []byte +} + +// Error implements error interface. +func (p *panicError) Error() string { + return fmt.Sprintf("%v\n\n%s", p.value, p.stack) +} + +func newPanicError(v interface{}) error { + stack := debug.Stack() + + // The first line of the stack trace is of the form "goroutine N [status]:" + // but by the time the panic reaches Do the goroutine may no longer exist + // and its status will have changed. Trim out the misleading line. + if line := bytes.IndexByte(stack[:], '\n'); line >= 0 { + stack = stack[line+1:] + } + return &panicError{value: v, stack: stack} +} // call is an in-flight or completed singleflight.Do call type call struct { @@ -57,6 +90,12 @@ func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, e c.dups++ g.mu.Unlock() c.wg.Wait() + + if e, ok := c.err.(*panicError); ok { + panic(e) + } else if c.err == errGoexit { + runtime.Goexit() + } return c.val, c.err, true } c := new(call) @@ -70,6 +109,8 @@ func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, e // DoChan is like Do but returns a channel that will receive the // results when they are ready. +// +// The returned channel will not be closed. func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { ch := make(chan Result, 1) g.mu.Lock() @@ -94,17 +135,66 @@ func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result // doCall handles the single call for a key. func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { - c.val, c.err = fn() - c.wg.Done() - - g.mu.Lock() - if !c.forgotten { - delete(g.m, key) - } - for _, ch := range c.chans { - ch <- Result{c.val, c.err, c.dups > 0} + normalReturn := false + recovered := false + + // use double-defer to distinguish panic from runtime.Goexit, + // more details see https://golang.org/cl/134395 + defer func() { + // the given function invoked runtime.Goexit + if !normalReturn && !recovered { + c.err = errGoexit + } + + c.wg.Done() + g.mu.Lock() + defer g.mu.Unlock() + if !c.forgotten { + delete(g.m, key) + } + + if e, ok := c.err.(*panicError); ok { + // In order to prevent the waiting channels from being blocked forever, + // needs to ensure that this panic cannot be recovered. + if len(c.chans) > 0 { + go panic(e) + select {} // Keep this goroutine around so that it will appear in the crash dump. + } else { + panic(e) + } + } else if c.err == errGoexit { + // Already in the process of goexit, no need to call again + } else { + // Normal return + for _, ch := range c.chans { + ch <- Result{c.val, c.err, c.dups > 0} + } + } + }() + + func() { + defer func() { + if !normalReturn { + // Ideally, we would wait to take a stack trace until we've determined + // whether this is a panic or a runtime.Goexit. + // + // Unfortunately, the only way we can distinguish the two is to see + // whether the recover stopped the goroutine from terminating, and by + // the time we know that, the part of the stack trace relevant to the + // panic has been discarded. + if r := recover(); r != nil { + c.err = newPanicError(r) + } + } + }() + + c.val, c.err = fn() + normalReturn = true + }() + + if !normalReturn { + recovered = true } - g.mu.Unlock() } // Forget tells the singleflight to forget about a key. Future calls diff --git a/internal/sync/singleflight/singleflight_test.go b/internal/sync/singleflight/singleflight_test.go index ad040379e8b..3e51203bd9c 100644 --- a/internal/sync/singleflight/singleflight_test.go +++ b/internal/sync/singleflight/singleflight_test.go @@ -5,8 +5,14 @@ package singleflight import ( + "bytes" "errors" "fmt" + "os" + "os/exec" + "runtime" + "runtime/debug" + "strings" "sync" "sync/atomic" "testing" @@ -91,69 +97,224 @@ func TestDoDupSuppress(t *testing.T) { func TestForget(t *testing.T) { var g Group - var firstStarted, firstFinished sync.WaitGroup + var ( + firstStarted = make(chan struct{}) + unblockFirst = make(chan struct{}) + firstFinished = make(chan struct{}) + ) - firstStarted.Add(1) - firstFinished.Add(1) - - firstCh := make(chan struct{}) go func() { g.Do("key", func() (i interface{}, e error) { - firstStarted.Done() - <-firstCh - firstFinished.Done() + close(firstStarted) + <-unblockFirst + close(firstFinished) return }) }() + <-firstStarted + g.Forget("key") - firstStarted.Wait() - g.Forget("key") // from this point no two function using same key should be executed concurrently + unblockSecond := make(chan struct{}) + secondResult := g.DoChan("key", func() (i interface{}, e error) { + <-unblockSecond + return 2, nil + }) - var secondStarted int32 - var secondFinished int32 - var thirdStarted int32 + close(unblockFirst) + <-firstFinished - secondCh := make(chan struct{}) - secondRunning := make(chan struct{}) - go func() { - g.Do("key", func() (i interface{}, e error) { + thirdResult := g.DoChan("key", func() (i interface{}, e error) { + return 3, nil + }) + + close(unblockSecond) + <-secondResult + r := <-thirdResult + if r.Val != 2 { + t.Errorf("We should receive result produced by second call, expected: 2, got %d", r.Val) + } +} + +func TestDoChan(t *testing.T) { + var g Group + ch := g.DoChan("key", func() (interface{}, error) { + return "bar", nil + }) + + res := <-ch + v := res.Val + err := res.Err + if got, want := fmt.Sprintf("%v (%T)", v, v), "bar (string)"; got != want { + t.Errorf("Do = %v; want %v", got, want) + } + if err != nil { + t.Errorf("Do error = %v", err) + } +} + +// Test singleflight behaves correctly after Do panic. +// See https://github.com/golang/go/issues/41133 +func TestPanicDo(t *testing.T) { + var g Group + fn := func() (interface{}, error) { + panic("invalid memory address or nil pointer dereference") + } + + const n = 5 + waited := int32(n) + panicCount := int32(0) + done := make(chan struct{}) + for i := 0; i < n; i++ { + go func() { + defer func() { + if err := recover(); err != nil { + t.Logf("Got panic: %v\n%s", err, debug.Stack()) + atomic.AddInt32(&panicCount, 1) + } + + if atomic.AddInt32(&waited, -1) == 0 { + close(done) + } + }() + + g.Do("key", fn) + }() + } + + select { + case <-done: + if panicCount != n { + t.Errorf("Expect %d panic, but got %d", n, panicCount) + } + case <-time.After(time.Second): + t.Fatalf("Do hangs") + } +} + +func TestGoexitDo(t *testing.T) { + var g Group + fn := func() (interface{}, error) { + runtime.Goexit() + return nil, nil + } + + const n = 5 + waited := int32(n) + done := make(chan struct{}) + for i := 0; i < n; i++ { + go func() { + var err error defer func() { + if err != nil { + t.Errorf("Error should be nil, but got: %v", err) + } + if atomic.AddInt32(&waited, -1) == 0 { + close(done) + } }() - atomic.AddInt32(&secondStarted, 1) - // Notify that we started - secondCh <- struct{}{} - // Wait other get above signal - <-secondRunning - <-secondCh - atomic.AddInt32(&secondFinished, 1) - return 2, nil + _, err, _ = g.Do("key", fn) + }() + } + + select { + case <-done: + case <-time.After(time.Second): + t.Fatalf("Do hangs") + } +} + +func TestPanicDoChan(t *testing.T) { + if runtime.GOOS == "js" { + t.Skipf("js does not support exec") + } + + if os.Getenv("TEST_PANIC_DOCHAN") != "" { + defer func() { + recover() + }() + + g := new(Group) + ch := g.DoChan("", func() (interface{}, error) { + panic("Panicking in DoChan") }) - }() + <-ch + t.Fatalf("DoChan unexpectedly returned") + } - close(firstCh) - firstFinished.Wait() // wait for first execution (which should not affect execution after Forget) + t.Parallel() - <-secondCh - // Notify second that we got the signal that it started - secondRunning <- struct{}{} - if atomic.LoadInt32(&secondStarted) != 1 { - t.Fatal("Second execution should be executed due to usage of forget") + cmd := exec.Command(os.Args[0], "-test.run="+t.Name(), "-test.v") + cmd.Env = append(os.Environ(), "TEST_PANIC_DOCHAN=1") + out := new(bytes.Buffer) + cmd.Stdout = out + cmd.Stderr = out + if err := cmd.Start(); err != nil { + t.Fatal(err) } - if atomic.LoadInt32(&secondFinished) == 1 { - t.Fatal("Second execution should be still active") + err := cmd.Wait() + t.Logf("%s:\n%s", strings.Join(cmd.Args, " "), out) + if err == nil { + t.Errorf("Test subprocess passed; want a crash due to panic in DoChan") } + if bytes.Contains(out.Bytes(), []byte("DoChan unexpectedly")) { + t.Errorf("Test subprocess failed with an unexpected failure mode.") + } + if !bytes.Contains(out.Bytes(), []byte("Panicking in DoChan")) { + t.Errorf("Test subprocess failed, but the crash isn't caused by panicking in DoChan") + } +} - close(secondCh) - result, _, _ := g.Do("key", func() (i interface{}, e error) { - atomic.AddInt32(&thirdStarted, 1) - return 3, nil - }) +func TestPanicDoSharedByDoChan(t *testing.T) { + if runtime.GOOS == "js" { + t.Skipf("js does not support exec") + } + + if os.Getenv("TEST_PANIC_DOCHAN") != "" { + blocked := make(chan struct{}) + unblock := make(chan struct{}) - if atomic.LoadInt32(&thirdStarted) != 0 { - t.Error("Third call should not be started because was started during second execution") + g := new(Group) + go func() { + defer func() { + recover() + }() + g.Do("", func() (interface{}, error) { + close(blocked) + <-unblock + panic("Panicking in Do") + }) + }() + + <-blocked + ch := g.DoChan("", func() (interface{}, error) { + panic("DoChan unexpectedly executed callback") + }) + close(unblock) + <-ch + t.Fatalf("DoChan unexpectedly returned") + } + + t.Parallel() + + cmd := exec.Command(os.Args[0], "-test.run="+t.Name(), "-test.v") + cmd.Env = append(os.Environ(), "TEST_PANIC_DOCHAN=1") + out := new(bytes.Buffer) + cmd.Stdout = out + cmd.Stderr = out + if err := cmd.Start(); err != nil { + t.Fatal(err) + } + + err := cmd.Wait() + t.Logf("%s:\n%s", strings.Join(cmd.Args, " "), out) + if err == nil { + t.Errorf("Test subprocess passed; want a crash due to panic in Do shared by DoChan") + } + if bytes.Contains(out.Bytes(), []byte("DoChan unexpectedly")) { + t.Errorf("Test subprocess failed with an unexpected failure mode.") } - if result != 2 { - t.Errorf("We should receive result produced by second call, expected: 2, got %d", result) + if !bytes.Contains(out.Bytes(), []byte("Panicking in Do")) { + t.Errorf("Test subprocess failed, but the crash isn't caused by panicking in Do") } }