Skip to content

Commit

Permalink
zapcore: Unflake TestSamplerConcurrent (#1012)
Browse files Browse the repository at this point in the history
The TestSamplerConcurrent test frequently fails with the following error
in CI:

    --- FAIL: TestSamplerConcurrent (0.25s)
        sampler_test.go:198:
        	    Error Trace:	sampler_test.go:198
        	    Error:      	Max difference between 1250 and 1004 allowed is 125, but difference was 246
        	    Test:       	TestSamplerConcurrent
        	    Messages:   	Unexpected number of logs
    FAIL

The test is intended to verify that
despite an onsalught of messages from multiple goroutines,
we only allow at most `logsPerTick` messages per `tick`.

This was accompilshed by spin-looping 10 goroutines for `numTicks`,
each logging one of `numMessages` different messages,
and then verifying the final log count.

The source of flakiness here was the non-determinism in
how far a goroutine would get in logging enough messages such that
the sampler would be engaged.

In #948, we added a `zapcore.Clock` interface with a ticker and
a mock implementation.
Move that to `ztest` for use here.

To unflake the test, use the mock clock to control time and
for each goroutine, log `logsPerTick*2` messages `numTicks` times.
This gives us,

    for numGoroutines (10)
        for numTicks (25)
            log logsPerTick * 2 (50) messages

We end up attempting to log a total of,

    (numGoroutines * numTicks * logsPerTick * 2) messages
    = (10 * 25 * 50) messages
    = 12500 messages

Of these, the following should be sampled:

    numMessages * numTicks * logsPerTick
    = 5 * 10 * 25
    = 1250

Everything else should be dropped.

For extra confidence, use a SamplerHook (added in #813) to verify that
the number of sampled and dropped messages meet expectations.

Refs GO-873
  • Loading branch information
abhinav committed Sep 10, 2021
1 parent 5e08337 commit 10d89a7
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 68 deletions.
50 changes: 50 additions & 0 deletions internal/ztest/clock.go
@@ -0,0 +1,50 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package ztest

import (
"time"

"github.com/benbjohnson/clock"
)

// MockClock provides control over the time.
type MockClock struct{ m *clock.Mock }

// NewMockClock builds a new mock clock that provides control of time.
func NewMockClock() *MockClock {
return &MockClock{clock.NewMock()}
}

// Now reports the current time.
func (c *MockClock) Now() time.Time {
return c.m.Now()
}

// NewTicker returns a time.Ticker that ticks at the specified frequency.
func (c *MockClock) NewTicker(d time.Duration) *time.Ticker {
return &time.Ticker{C: c.m.Ticker(d).C}
}

// Add progresses time by the given duration.
func (c *MockClock) Add(d time.Duration) {
c.m.Add(d)
}
57 changes: 57 additions & 0 deletions internal/ztest/clock_test.go
@@ -0,0 +1,57 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package ztest

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
)

func TestMockClock_NewTicker(t *testing.T) {
var n atomic.Int32
clock := NewMockClock()

done := make(chan struct{})
defer func() { <-done }() // wait for end

quit := make(chan struct{})
// Create a channel to increment every microsecond.
go func(ticker *time.Ticker) {
defer close(done)
for {
select {
case <-quit:
ticker.Stop()
return
case <-ticker.C:
n.Inc()
}
}
}(clock.NewTicker(time.Microsecond))

// Move clock forward.
clock.Add(2 * time.Microsecond)
assert.Equal(t, int32(2), n.Load())
close(quit)
}
2 changes: 1 addition & 1 deletion zapcore/buffered_write_syncer_test.go
Expand Up @@ -107,7 +107,7 @@ func TestBufferWriter(t *testing.T) {

t.Run("flush timer", func(t *testing.T) {
buf := &bytes.Buffer{}
clock := newControlledClock()
clock := ztest.NewMockClock()
ws := &BufferedWriteSyncer{
WS: AddSync(buf),
Size: 6,
Expand Down
4 changes: 1 addition & 3 deletions zapcore/clock.go
Expand Up @@ -20,9 +20,7 @@

package zapcore

import (
"time"
)
import "time"

// DefaultClock is the default clock used by Zap in operations that require
// time. This clock uses the system clock for all operations.
Expand Down
44 changes: 3 additions & 41 deletions zapcore/clock_test.go
Expand Up @@ -24,49 +24,11 @@ import (
"testing"
"time"

"github.com/benbjohnson/clock"
"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
"go.uber.org/zap/internal/ztest"
)

// controlledClock provides control over the time via a mock clock.
type controlledClock struct{ *clock.Mock }

func newControlledClock() *controlledClock {
return &controlledClock{clock.NewMock()}
}

func (c *controlledClock) NewTicker(d time.Duration) *time.Ticker {
return &time.Ticker{C: c.Ticker(d).C}
}

func TestControlledClock_NewTicker(t *testing.T) {
var n atomic.Int32
ctrlMock := newControlledClock()

done := make(chan struct{})
defer func() { <-done }() // wait for end

quit := make(chan struct{})
// Create a channel to increment every microsecond.
go func(ticker *time.Ticker) {
defer close(done)
for {
select {
case <-quit:
ticker.Stop()
return
case <-ticker.C:
n.Inc()
}
}
}(ctrlMock.NewTicker(time.Microsecond))

// Move clock forward.
ctrlMock.Add(2 * time.Microsecond)
assert.Equal(t, int32(2), n.Load())
close(quit)
}
// Verify that the mock clock satisfies the Clock interface.
var _ Clock = (*ztest.MockClock)(nil)

func TestSystemClock_NewTicker(t *testing.T) {
want := 3
Expand Down
77 changes: 54 additions & 23 deletions zapcore/sampler_test.go
Expand Up @@ -22,6 +22,7 @@ package zapcore_test

import (
"fmt"
"runtime"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -158,50 +159,80 @@ func TestSamplerConcurrent(t *testing.T) {
numMessages = 5
numTicks = 25
numGoroutines = 10
tick = 10 * time.Millisecond

// We'll make a total of,
// (numGoroutines * numTicks * logsPerTick * 2) log attempts
// with numMessages unique messages.
numLogAttempts = numGoroutines * logsPerTick * numTicks * 2
// Of those, we'll accept (logsPerTick * numTicks) entries
// for each unique message.
expectedCount = numMessages * logsPerTick * numTicks
// The rest will be dropped.
expectedDropped = numLogAttempts - expectedCount
)

tick := ztest.Timeout(10 * time.Millisecond)
clock := ztest.NewMockClock()

cc := &countingCore{}
sampler := NewSamplerWithOptions(cc, tick, logsPerTick, 100000)

var (
done atomic.Bool
wg sync.WaitGroup
)
hook, dropped, sampled := makeSamplerCountingHook()
sampler := NewSamplerWithOptions(cc, tick, logsPerTick, 100000, SamplerHook(hook))

stop := make(chan struct{})
var wg sync.WaitGroup
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func(i int) {
go func(i int, ticker *time.Ticker) {
defer wg.Done()
defer ticker.Stop()

for {
if done.Load() {
select {
case <-stop:
return
}
msg := fmt.Sprintf("msg%v", i%numMessages)
ent := Entry{Level: DebugLevel, Message: msg, Time: time.Now()}
if ce := sampler.Check(ent, nil); ce != nil {
ce.Write()
}

// Give a chance for other goroutines to run.
time.Sleep(time.Microsecond)
case <-ticker.C:
for j := 0; j < logsPerTick*2; j++ {
msg := fmt.Sprintf("msg%v", i%numMessages)
ent := Entry{
Level: DebugLevel,
Message: msg,
Time: clock.Now(),
}
if ce := sampler.Check(ent, nil); ce != nil {
ce.Write()
}

// Give a chance for other goroutines to run.
runtime.Gosched()
}
}
}
}(i)
}(i, clock.NewTicker(tick))
}

time.AfterFunc(numTicks*tick, func() {
done.Store(true)
})
clock.Add(tick * numTicks)
close(stop)
wg.Wait()

assert.InDelta(
assert.Equal(
t,
expectedCount,
cc.logs.Load(),
expectedCount/10,
int(cc.logs.Load()),
"Unexpected number of logs",
)
assert.Equal(t,
expectedCount,
int(sampled.Load()),
"Unexpected number of logs sampled",
)
assert.Equal(t,
expectedDropped,
int(dropped.Load()),
"Unexpected number of logs dropped",
)

}

func TestSamplerRaces(t *testing.T) {
Expand Down

0 comments on commit 10d89a7

Please sign in to comment.