diff --git a/benchmarks/zap_test.go b/benchmarks/zap_test.go index 92f7120ec..84784152c 100644 --- a/benchmarks/zap_test.go +++ b/benchmarks/zap_test.go @@ -116,7 +116,7 @@ func newZapLogger(lvl zapcore.Level) *zap.Logger { } func newSampledLogger(lvl zapcore.Level) *zap.Logger { - return zap.New(zapcore.NewSampler( + return zap.New(zapcore.NewSamplerWithOptions( newZapLogger(zap.DebugLevel).Core(), 100*time.Millisecond, 10, // first diff --git a/config.go b/config.go index eae1d237f..192fd1a94 100644 --- a/config.go +++ b/config.go @@ -32,10 +32,14 @@ import ( // global CPU and I/O load that logging puts on your process while attempting // to preserve a representative subset of your logs. // -// Values configured here are per-second. See zapcore.NewSampler for details. +// If specified, the Sampler will invoke the Hook after each decision. +// +// Values configured here are per-second. See zapcore.NewSamplerWithOptions for +// details. type SamplingConfig struct { - Initial int `json:"initial" yaml:"initial"` - Thereafter int `json:"thereafter" yaml:"thereafter"` + Initial int `json:"initial" yaml:"initial"` + Thereafter int `json:"thereafter" yaml:"thereafter"` + Hook func(zapcore.Entry, zapcore.SamplingDecision) `json:"-" yaml:"-"` } // Config offers a declarative way to construct a logger. It doesn't do @@ -208,9 +212,19 @@ func (cfg Config) buildOptions(errSink zapcore.WriteSyncer) []Option { opts = append(opts, AddStacktrace(stackLevel)) } - if cfg.Sampling != nil { + if scfg := cfg.Sampling; scfg != nil { opts = append(opts, WrapCore(func(core zapcore.Core) zapcore.Core { - return zapcore.NewSampler(core, time.Second, int(cfg.Sampling.Initial), int(cfg.Sampling.Thereafter)) + var samplerOpts []zapcore.SamplerOption + if scfg.Hook != nil { + samplerOpts = append(samplerOpts, zapcore.SamplerHook(scfg.Hook)) + } + return zapcore.NewSamplerWithOptions( + core, + time.Second, + cfg.Sampling.Initial, + cfg.Sampling.Thereafter, + samplerOpts..., + ) })) } diff --git a/config_test.go b/config_test.go index 0fa4b1efc..19af60795 100644 --- a/config_test.go +++ b/config_test.go @@ -27,6 +27,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/atomic" "go.uber.org/zap/zapcore" ) @@ -144,3 +145,69 @@ func TestConfigWithMissingAttributes(t *testing.T) { }) } } + +func makeSamplerCountingHook() (h func(zapcore.Entry, zapcore.SamplingDecision), + dropped, sampled *atomic.Int64) { + dropped = new(atomic.Int64) + sampled = new(atomic.Int64) + h = func(_ zapcore.Entry, dec zapcore.SamplingDecision) { + if dec&zapcore.LogDropped > 0 { + dropped.Inc() + } else if dec&zapcore.LogSampled > 0 { + sampled.Inc() + } + } + return h, dropped, sampled +} + +func TestConfigWithSamplingHook(t *testing.T) { + shook, dcount, scount := makeSamplerCountingHook() + cfg := Config{ + Level: NewAtomicLevelAt(InfoLevel), + Development: false, + Sampling: &SamplingConfig{ + Initial: 100, + Thereafter: 100, + Hook: shook, + }, + Encoding: "json", + EncoderConfig: NewProductionEncoderConfig(), + OutputPaths: []string{"stderr"}, + ErrorOutputPaths: []string{"stderr"}, + } + expectRe := `{"level":"info","caller":"zap/config_test.go:\d+","msg":"info","k":"v","z":"zz"}` + "\n" + + `{"level":"warn","caller":"zap/config_test.go:\d+","msg":"warn","k":"v","z":"zz"}` + "\n" + expectDropped := 99 // 200 - 100 initial - 1 thereafter + expectSampled := 103 // 2 from initial + 100 + 1 thereafter + + temp, err := ioutil.TempFile("", "zap-prod-config-test") + require.NoError(t, err, "Failed to create temp file.") + defer func() { + err := os.Remove(temp.Name()) + if err != nil { + return + } + }() + + cfg.OutputPaths = []string{temp.Name()} + cfg.EncoderConfig.TimeKey = "" // no timestamps in tests + cfg.InitialFields = map[string]interface{}{"z": "zz", "k": "v"} + + logger, err := cfg.Build() + require.NoError(t, err, "Unexpected error constructing logger.") + + logger.Debug("debug") + logger.Info("info") + logger.Warn("warn") + + byteContents, err := ioutil.ReadAll(temp) + require.NoError(t, err, "Couldn't read log contents from temp file.") + logs := string(byteContents) + assert.Regexp(t, expectRe, logs, "Unexpected log output.") + + for i := 0; i < 200; i++ { + logger.Info("sampling") + } + assert.Equal(t, int64(expectDropped), dcount.Load()) + assert.Equal(t, int64(expectSampled), scount.Load()) +} diff --git a/zapcore/sampler.go b/zapcore/sampler.go index e31641863..25f10ca1d 100644 --- a/zapcore/sampler.go +++ b/zapcore/sampler.go @@ -81,33 +81,104 @@ func (c *counter) IncCheckReset(t time.Time, tick time.Duration) uint64 { return 1 } -type sampler struct { - Core +// SamplingDecision is a decision represented as a bit field made by sampler. +// More decisions may be added in the future. +type SamplingDecision uint32 - counts *counters - tick time.Duration - first, thereafter uint64 +const ( + // LogDropped indicates that the Sampler dropped a log entry. + LogDropped SamplingDecision = 1 << iota + // LogSampled indicates that the Sampler sampled a log entry. + LogSampled +) + +// optionFunc wraps a func so it satisfies the SamplerOption interface. +type optionFunc func(*sampler) + +func (f optionFunc) apply(s *sampler) { + f(s) +} + +// SamplerOption configures a Sampler. +type SamplerOption interface { + apply(*sampler) } -// NewSampler creates a Core that samples incoming entries, which caps the CPU -// and I/O load of logging while attempting to preserve a representative subset -// of your logs. +// nopSamplingHook is the default hook used by sampler. +func nopSamplingHook(Entry, SamplingDecision) {} + +// SamplerHook registers a function which will be called when Sampler makes a +// decision. +// +// This hook may be used to get visibility into the performance of the sampler. +// For example, use it to track metrics of dropped versus sampled logs. +// +// var dropped atomic.Int64 +// zapcore.SamplerHook(func(ent zapcore.Entry, dec zapcore.SamplingDecision) { +// if dec&zapcore.LogDropped > 0 { +// dropped.Inc() +// } +// }) +func SamplerHook(hook func(entry Entry, dec SamplingDecision)) SamplerOption { + return optionFunc(func(s *sampler) { + s.hook = hook + }) +} + +// NewSamplerWithOptions creates a Core that samples incoming entries, which +// caps the CPU and I/O load of logging while attempting to preserve a +// representative subset of your logs. // // Zap samples by logging the first N entries with a given level and message // each tick. If more Entries with the same level and message are seen during // the same interval, every Mth message is logged and the rest are dropped. // +// Sampler can be configured to report sampling decisions with the SamplerHook +// option. +// // Keep in mind that zap's sampling implementation is optimized for speed over // absolute precision; under load, each tick may be slightly over- or // under-sampled. -func NewSampler(core Core, tick time.Duration, first, thereafter int) Core { - return &sampler{ +func NewSamplerWithOptions(core Core, tick time.Duration, first, thereafter int, opts ...SamplerOption) Core { + s := &sampler{ Core: core, tick: tick, counts: newCounters(), first: uint64(first), thereafter: uint64(thereafter), + hook: nopSamplingHook, } + for _, opt := range opts { + opt.apply(s) + } + + return s +} + +type sampler struct { + Core + + counts *counters + tick time.Duration + first, thereafter uint64 + hook func(Entry, SamplingDecision) +} + +// NewSampler creates a Core that samples incoming entries, which +// caps the CPU and I/O load of logging while attempting to preserve a +// representative subset of your logs. +// +// Zap samples by logging the first N entries with a given level and message +// each tick. If more Entries with the same level and message are seen during +// the same interval, every Mth message is logged and the rest are dropped. +// +// Keep in mind that zap's sampling implementation is optimized for speed over +// absolute precision; under load, each tick may be slightly over- or +// under-sampled. +// +// Deprecated: use NewSamplerWithOptions. +func NewSampler(core Core, tick time.Duration, first, thereafter int) Core { + return NewSamplerWithOptions(core, tick, first, thereafter) } func (s *sampler) With(fields []Field) Core { @@ -117,6 +188,7 @@ func (s *sampler) With(fields []Field) Core { counts: s.counts, first: s.first, thereafter: s.thereafter, + hook: s.hook, } } @@ -128,7 +200,9 @@ func (s *sampler) Check(ent Entry, ce *CheckedEntry) *CheckedEntry { counter := s.counts.get(ent.Level, ent.Message) n := counter.IncCheckReset(ent.Time, s.tick) if n > s.first && (n-s.first)%s.thereafter != 0 { + s.hook(ent, LogDropped) return ce } + s.hook(ent, LogSampled) return s.Core.Check(ent, ce) } diff --git a/zapcore/sampler_bench_test.go b/zapcore/sampler_bench_test.go index af2e89782..a918be2e2 100644 --- a/zapcore/sampler_bench_test.go +++ b/zapcore/sampler_bench_test.go @@ -25,6 +25,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "go.uber.org/atomic" "go.uber.org/zap/internal/ztest" . "go.uber.org/zap/zapcore" ) @@ -203,7 +205,7 @@ var counterTestCases = [][]string{ func BenchmarkSampler_Check(b *testing.B) { for _, keys := range counterTestCases { b.Run(fmt.Sprintf("%v keys", len(keys)), func(b *testing.B) { - fac := NewSampler( + fac := NewSamplerWithOptions( NewCore( NewJSONEncoder(testEncoderConfig()), &ztest.Discarder{}, @@ -228,3 +230,54 @@ func BenchmarkSampler_Check(b *testing.B) { }) } } + +func makeSamplerCountingHook() (func(_ Entry, dec SamplingDecision), *atomic.Int64, *atomic.Int64) { + droppedCount := new(atomic.Int64) + sampledCount := new(atomic.Int64) + h := func(_ Entry, dec SamplingDecision) { + if dec&LogDropped > 0 { + droppedCount.Inc() + } else if dec&LogSampled > 0 { + sampledCount.Inc() + } + } + return h, droppedCount, sampledCount +} + +func BenchmarkSampler_CheckWithHook(b *testing.B) { + hook, dropped, sampled := makeSamplerCountingHook() + for _, keys := range counterTestCases { + b.Run(fmt.Sprintf("%v keys", len(keys)), func(b *testing.B) { + fac := NewSamplerWithOptions( + NewCore( + NewJSONEncoder(testEncoderConfig()), + &ztest.Discarder{}, + DebugLevel, + ), + time.Millisecond, + 1, + 1000, + SamplerHook(hook), + ) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + ent := Entry{ + Level: DebugLevel + Level(i%4), + Message: keys[i], + } + _ = fac.Check(ent, nil) + i++ + if n := len(keys); i >= n { + i -= n + } + } + }) + }) + } + // We expect to see 1000 dropped messages for every sampled per settings, + // with a delta due to less 1000 messages getting dropped after initial one + // is sampled. + assert.Greater(b, dropped.Load()/1000, sampled.Load()-1000) +} diff --git a/zapcore/sampler_test.go b/zapcore/sampler_test.go index 9ba278b0b..71db0f9bd 100644 --- a/zapcore/sampler_test.go +++ b/zapcore/sampler_test.go @@ -37,6 +37,7 @@ import ( func fakeSampler(lvl LevelEnabler, tick time.Duration, first, thereafter int) (Core, *observer.ObservedLogs) { core, logs := observer.New(lvl) + // Keep using deprecated constructor for cc. core = NewSampler(core, tick, first, thereafter) return core, logs } @@ -162,7 +163,7 @@ func TestSamplerConcurrent(t *testing.T) { tick := ztest.Timeout(10 * time.Millisecond) cc := &countingCore{} - sampler := NewSampler(cc, tick, logsPerTick, 100000) + sampler := NewSamplerWithOptions(cc, tick, logsPerTick, 100000) var ( done atomic.Bool