From 5f22c26641e0208088a60500302ea087290c24a3 Mon Sep 17 00:00:00 2001 From: hnlq715 Date: Sat, 8 Feb 2020 17:08:00 +0800 Subject: [PATCH 01/54] add buffer sync --- zapcore/write_syncer.go | 49 ++++++++++++++++++++++++++++ zapcore/write_syncer_bench_test.go | 52 ++++++++++++++++++++++++++++-- zapcore/write_syncer_test.go | 11 +++++++ 3 files changed, 110 insertions(+), 2 deletions(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index 209e25fe2..1f45e4e85 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -21,8 +21,10 @@ package zapcore import ( + "bufio" "io" "sync" + "time" "go.uber.org/multierr" ) @@ -75,6 +77,53 @@ func (s *lockedWriteSyncer) Sync() error { return err } +type bufferWriterSyncer struct { + bufferWriter *bufio.Writer + ws WriteSyncer +} + +// bufferSize sizes the buffer associated with each log file. It's large +// so that log records can accumulate without the logging thread blocking +// on disk I/O. The flushDaemon will block instead. +const bufferSize = 256 * 1024 + +// flushInterval means the default flush interval +const flushInterval = 30 * time.Second + +// Buffer wraps a WriteSyncer in a buffer to improve performance, +// which is implemented in https://github.com/golang/glog. +func Buffer(ws WriteSyncer) WriteSyncer { + if _, ok := ws.(*bufferWriterSyncer); ok { + // no need to layer on another buffer + return ws + } + + // bufio is not goroutine safe, so add lock writer here + ws = Lock(&bufferWriterSyncer{ + bufferWriter: bufio.NewWriterSize(ws, bufferSize), + }) + + // flush buffer every interval + // we do not need exit this goroutine explicitly + go func() { + for range time.NewTicker(flushInterval).C { + if err := ws.Sync(); err != nil { + return + } + } + }() + + return ws +} + +func (s *bufferWriterSyncer) Write(bs []byte) (int, error) { + return s.bufferWriter.Write(bs) +} + +func (s *bufferWriterSyncer) Sync() error { + return s.bufferWriter.Flush() +} + type writerWrapper struct { io.Writer } diff --git a/zapcore/write_syncer_bench_test.go b/zapcore/write_syncer_bench_test.go index 0209d0f61..ad552684b 100644 --- a/zapcore/write_syncer_bench_test.go +++ b/zapcore/write_syncer_bench_test.go @@ -21,13 +21,16 @@ package zapcore import ( + "io/ioutil" + "os" "testing" + "github.com/stretchr/testify/assert" "go.uber.org/zap/internal/ztest" ) func BenchmarkMultiWriteSyncer(b *testing.B) { - b.Run("2", func(b *testing.B) { + b.Run("2 discarder", func(b *testing.B) { w := NewMultiWriteSyncer( &ztest.Discarder{}, &ztest.Discarder{}, @@ -39,7 +42,7 @@ func BenchmarkMultiWriteSyncer(b *testing.B) { } }) }) - b.Run("4", func(b *testing.B) { + b.Run("4 discarder", func(b *testing.B) { w := NewMultiWriteSyncer( &ztest.Discarder{}, &ztest.Discarder{}, @@ -53,4 +56,49 @@ func BenchmarkMultiWriteSyncer(b *testing.B) { } }) }) + b.Run("4 discarder with buffer", func(b *testing.B) { + w := Buffer(NewMultiWriteSyncer( + &ztest.Discarder{}, + &ztest.Discarder{}, + &ztest.Discarder{}, + &ztest.Discarder{}, + )) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + w.Write([]byte("foobarbazbabble")) + } + }) + }) +} + +func BenchmarkWriteSyncer(b *testing.B) { + b.Run("write file with no buffer", func(b *testing.B) { + file, err := ioutil.TempFile(".", "*") + assert.Nil(b, err) + defer file.Close() + defer os.Remove(file.Name()) + + w := AddSync(file) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + w.Write([]byte("foobarbazbabble")) + } + }) + }) + b.Run("write file with buffer", func(b *testing.B) { + file, err := ioutil.TempFile(".", "*") + assert.Nil(b, err) + defer file.Close() + defer os.Remove(file.Name()) + + w := Buffer(AddSync(file)) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + w.Write([]byte("foobarbazbabble")) + } + }) + }) } diff --git a/zapcore/write_syncer_test.go b/zapcore/write_syncer_test.go index 3ccb0af24..e5b9159c3 100644 --- a/zapcore/write_syncer_test.go +++ b/zapcore/write_syncer_test.go @@ -65,6 +65,17 @@ func TestAddSyncWriter(t *testing.T) { assert.NoError(t, ws.Sync(), "Unexpected error calling a no-op Sync method.") } +func TestBufferWriter(t *testing.T) { + // If we pass a plain io.Writer, make sure that we still get a WriteSyncer + // with a no-op Sync. + buf := &bytes.Buffer{} + ws := Buffer(AddSync(buf)) + requireWriteWorks(t, ws) + assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") + assert.NoError(t, ws.Sync(), "Unexpected error calling a no-op Sync method.") + assert.Equal(t, "foo", buf.String(), "Unexpected log calling a no-op Sync method.") +} + func TestNewMultiWriteSyncerWorksForSingleWriter(t *testing.T) { w := &ztest.Buffer{} From 36d1a4f5f3e32c9893ca77b678ef97d38e13d239 Mon Sep 17 00:00:00 2001 From: hnlq715 Date: Tue, 11 Feb 2020 11:16:52 +0800 Subject: [PATCH 02/54] support config bufferSize and flushInterval, improve logic --- zapcore/write_syncer.go | 29 +++++++++++++++++++---------- zapcore/write_syncer_bench_test.go | 4 ++-- zapcore/write_syncer_test.go | 8 +++++--- 3 files changed, 26 insertions(+), 15 deletions(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index 1f45e4e85..16e5d5d6b 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -82,20 +82,29 @@ type bufferWriterSyncer struct { ws WriteSyncer } -// bufferSize sizes the buffer associated with each log file. It's large +// defaultBufferSize sizes the buffer associated with each log file. It's large // so that log records can accumulate without the logging thread blocking -// on disk I/O. The flushDaemon will block instead. -const bufferSize = 256 * 1024 +// on disk I/O untill buffer fills up. The flushDaemon will block instead. +const defaultBufferSize = 256 * 1024 -// flushInterval means the default flush interval -const flushInterval = 30 * time.Second +// defaultFlushInterval means the default flush interval +const defaultFlushInterval = 30 * time.Second // Buffer wraps a WriteSyncer in a buffer to improve performance, -// which is implemented in https://github.com/golang/glog. -func Buffer(ws WriteSyncer) WriteSyncer { - if _, ok := ws.(*bufferWriterSyncer); ok { - // no need to layer on another buffer - return ws +func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) WriteSyncer { + if lws, ok := ws.(*lockedWriteSyncer); ok { + if _, ok := lws.ws.(*bufferWriterSyncer); ok { + // no need to layer on another buffer + return ws + } + } + + if bufferSize == 0 { + bufferSize = defaultBufferSize + } + + if flushInterval == 0 { + flushInterval = defaultFlushInterval } // bufio is not goroutine safe, so add lock writer here diff --git a/zapcore/write_syncer_bench_test.go b/zapcore/write_syncer_bench_test.go index ad552684b..65626802e 100644 --- a/zapcore/write_syncer_bench_test.go +++ b/zapcore/write_syncer_bench_test.go @@ -62,7 +62,7 @@ func BenchmarkMultiWriteSyncer(b *testing.B) { &ztest.Discarder{}, &ztest.Discarder{}, &ztest.Discarder{}, - )) + ), 0, 0) b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { @@ -93,7 +93,7 @@ func BenchmarkWriteSyncer(b *testing.B) { defer file.Close() defer os.Remove(file.Name()) - w := Buffer(AddSync(file)) + w := Buffer(AddSync(file), 0, 0) b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { diff --git a/zapcore/write_syncer_test.go b/zapcore/write_syncer_test.go index e5b9159c3..845635738 100644 --- a/zapcore/write_syncer_test.go +++ b/zapcore/write_syncer_test.go @@ -24,6 +24,7 @@ import ( "bytes" "errors" "testing" + "time" "io" @@ -69,11 +70,12 @@ func TestBufferWriter(t *testing.T) { // If we pass a plain io.Writer, make sure that we still get a WriteSyncer // with a no-op Sync. buf := &bytes.Buffer{} - ws := Buffer(AddSync(buf)) + ws := Buffer(Buffer(AddSync(buf), 0, time.Millisecond), 0, 0) requireWriteWorks(t, ws) assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") - assert.NoError(t, ws.Sync(), "Unexpected error calling a no-op Sync method.") - assert.Equal(t, "foo", buf.String(), "Unexpected log calling a no-op Sync method.") + time.Sleep(2 * time.Millisecond) + // assert.NoError(t, ws.Sync(), "Unexpected error calling a no-op Sync method.") + assert.Equal(t, "foo", buf.String(), "Unexpected log string") } func TestNewMultiWriteSyncerWorksForSingleWriter(t *testing.T) { From 9f7be81da2808109eb5a7820a8ba6e13d585e56a Mon Sep 17 00:00:00 2001 From: hnlq715 Date: Thu, 13 Feb 2020 19:26:04 +0800 Subject: [PATCH 03/54] improve --- zapcore/write_syncer.go | 15 +++++++++++++++ zapcore/write_syncer_test.go | 25 +++++++++++++++++-------- 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index 16e5d5d6b..c397a422a 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -91,6 +91,8 @@ const defaultBufferSize = 256 * 1024 const defaultFlushInterval = 30 * time.Second // Buffer wraps a WriteSyncer in a buffer to improve performance, +// if bufferSize=0, we set it to defaultBufferSize +// if flushInterval=0, we set it to defaultFlushInterval func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) WriteSyncer { if lws, ok := ws.(*lockedWriteSyncer); ok { if _, ok := lws.ws.(*bufferWriterSyncer); ok { @@ -126,6 +128,19 @@ func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) WriteSy } func (s *bufferWriterSyncer) Write(bs []byte) (int, error) { + + // there are some logic internal for bufio.Writer here: + // 1. when the buffer is not enough, log would be written to disk directly + // 2. when the buffer is enough, log would not be flushed until the buffer is filled up + // this would lead to log spliting, which is not acceptable for log collector + // so we need to flush bufferWriter before writing the log into bufferWriter + if len(bs) > s.bufferWriter.Available() && s.bufferWriter.Buffered() > 0 { + err := s.bufferWriter.Flush() + if err != nil { + return 0, err + } + } + return s.bufferWriter.Write(bs) } diff --git a/zapcore/write_syncer_test.go b/zapcore/write_syncer_test.go index 845635738..257a0eb74 100644 --- a/zapcore/write_syncer_test.go +++ b/zapcore/write_syncer_test.go @@ -24,7 +24,6 @@ import ( "bytes" "errors" "testing" - "time" "io" @@ -69,13 +68,23 @@ func TestAddSyncWriter(t *testing.T) { func TestBufferWriter(t *testing.T) { // If we pass a plain io.Writer, make sure that we still get a WriteSyncer // with a no-op Sync. - buf := &bytes.Buffer{} - ws := Buffer(Buffer(AddSync(buf), 0, time.Millisecond), 0, 0) - requireWriteWorks(t, ws) - assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") - time.Sleep(2 * time.Millisecond) - // assert.NoError(t, ws.Sync(), "Unexpected error calling a no-op Sync method.") - assert.Equal(t, "foo", buf.String(), "Unexpected log string") + t.Run("default", func(t *testing.T) { + buf := &bytes.Buffer{} + ws := Buffer(Buffer(AddSync(buf), 0, 0), 0, 0) + requireWriteWorks(t, ws) + assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") + assert.NoError(t, ws.Sync(), "Unexpected error calling a no-op Sync method.") + assert.Equal(t, "foo", buf.String(), "Unexpected log string") + }) + + t.Run("small buffer", func(t *testing.T) { + buf := &bytes.Buffer{} + ws := Buffer(Buffer(AddSync(buf), 5, 0), 5, 0) + requireWriteWorks(t, ws) + assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") + requireWriteWorks(t, ws) + assert.Equal(t, "foo", buf.String(), "Unexpected log string") + }) } func TestNewMultiWriteSyncerWorksForSingleWriter(t *testing.T) { From c1f54ca9af242a0d6ac968b72785fdc8a1110ea0 Mon Sep 17 00:00:00 2001 From: hnlq715 Date: Thu, 13 Feb 2020 19:38:24 +0800 Subject: [PATCH 04/54] update comment --- zapcore/write_syncer.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index c397a422a..a31f1a765 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -82,17 +82,15 @@ type bufferWriterSyncer struct { ws WriteSyncer } -// defaultBufferSize sizes the buffer associated with each log file. It's large -// so that log records can accumulate without the logging thread blocking -// on disk I/O untill buffer fills up. The flushDaemon will block instead. +// defaultBufferSize sizes the buffer associated with each WriterSync. const defaultBufferSize = 256 * 1024 // defaultFlushInterval means the default flush interval const defaultFlushInterval = 30 * time.Second // Buffer wraps a WriteSyncer in a buffer to improve performance, -// if bufferSize=0, we set it to defaultBufferSize -// if flushInterval=0, we set it to defaultFlushInterval +// if bufferSize = 0, we set it to defaultBufferSize +// if flushInterval = 0, we set it to defaultFlushInterval func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) WriteSyncer { if lws, ok := ws.(*lockedWriteSyncer); ok { if _, ok := lws.ws.(*bufferWriterSyncer); ok { @@ -130,10 +128,10 @@ func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) WriteSy func (s *bufferWriterSyncer) Write(bs []byte) (int, error) { // there are some logic internal for bufio.Writer here: - // 1. when the buffer is not enough, log would be written to disk directly - // 2. when the buffer is enough, log would not be flushed until the buffer is filled up + // 1. when the buffer is enough, data would not be flushed. + // 2. when the buffer is not enough, data would be flushed as soon as the buffer fills up. // this would lead to log spliting, which is not acceptable for log collector - // so we need to flush bufferWriter before writing the log into bufferWriter + // so we need to flush bufferWriter before writing the data into bufferWriter if len(bs) > s.bufferWriter.Available() && s.bufferWriter.Buffered() > 0 { err := s.bufferWriter.Flush() if err != nil { From 93832bcac38c2a2e26873a20963a5107ac1f0553 Mon Sep 17 00:00:00 2001 From: hnlq715 Date: Tue, 25 Feb 2020 10:49:19 +0800 Subject: [PATCH 05/54] WriterSyncer support Close method --- internal/ztest/writer.go | 7 +++++++ logger.go | 6 ++++++ sink.go | 2 -- writer_test.go | 4 ++++ zapcore/core.go | 7 +++++++ zapcore/sampler_test.go | 1 + zapcore/tee.go | 8 ++++++++ zapcore/write_syncer.go | 36 ++++++++++++++++++++++++++++++++---- zapcore/write_syncer_test.go | 9 +++++++++ zaptest/logger.go | 4 ++++ zaptest/observer/observer.go | 4 ++++ 11 files changed, 82 insertions(+), 6 deletions(-) diff --git a/internal/ztest/writer.go b/internal/ztest/writer.go index 9fdd5805e..2c1a3fb89 100644 --- a/internal/ztest/writer.go +++ b/internal/ztest/writer.go @@ -45,6 +45,13 @@ func (s *Syncer) Sync() error { return s.err } +// Close records that it was called, then returns the user-supplied error (if +// any). +func (s *Syncer) Close() error { + s.called = true + return s.err +} + // Called reports whether the Sync method was called. func (s *Syncer) Called() bool { return s.called diff --git a/logger.go b/logger.go index ea484aed1..1f25c9ef9 100644 --- a/logger.go +++ b/logger.go @@ -244,6 +244,12 @@ func (log *Logger) Sync() error { return log.core.Sync() } +// Close calls the underlying Core's Close method, flushing any buffered log +// entries. Applications should take care to call Close before exiting. +func (log *Logger) Close() error { + return log.core.Close() +} + // Core returns the Logger's underlying zapcore.Core. func (log *Logger) Core() zapcore.Core { return log.core diff --git a/sink.go b/sink.go index df46fa87a..ba9172b17 100644 --- a/sink.go +++ b/sink.go @@ -23,7 +23,6 @@ package zap import ( "errors" "fmt" - "io" "net/url" "os" "strings" @@ -55,7 +54,6 @@ func resetSinkRegistry() { // Sink defines the interface to write to and close logger destinations. type Sink interface { zapcore.WriteSyncer - io.Closer } type nopCloserSink struct{ zapcore.WriteSyncer } diff --git a/writer_test.go b/writer_test.go index b9b5389ca..94e90da76 100644 --- a/writer_test.go +++ b/writer_test.go @@ -152,6 +152,10 @@ func (w *testWriter) Sync() error { return nil } +func (w *testWriter) Close() error { + return nil +} + func TestOpenWithErroringSinkFactory(t *testing.T) { defer resetSinkRegistry() diff --git a/zapcore/core.go b/zapcore/core.go index a1ef8b034..98abc6399 100644 --- a/zapcore/core.go +++ b/zapcore/core.go @@ -42,6 +42,8 @@ type Core interface { Write(Entry, []Field) error // Sync flushes buffered logs (if any). Sync() error + // Close cleans all WriterSync resources (if any). + Close() error } type nopCore struct{} @@ -53,6 +55,7 @@ func (n nopCore) With([]Field) Core { return n } func (nopCore) Check(_ Entry, ce *CheckedEntry) *CheckedEntry { return ce } func (nopCore) Write(Entry, []Field) error { return nil } func (nopCore) Sync() error { return nil } +func (nopCore) Close() error { return nil } // NewCore creates a Core that writes logs to a WriteSyncer. func NewCore(enc Encoder, ws WriteSyncer, enab LevelEnabler) Core { @@ -104,6 +107,10 @@ func (c *ioCore) Sync() error { return c.out.Sync() } +func (c *ioCore) Close() error { + return c.out.Close() +} + func (c *ioCore) clone() *ioCore { return &ioCore{ LevelEnabler: c.LevelEnabler, diff --git a/zapcore/sampler_test.go b/zapcore/sampler_test.go index 71db0f9bd..f0a8328f2 100644 --- a/zapcore/sampler_test.go +++ b/zapcore/sampler_test.go @@ -151,6 +151,7 @@ func (c *countingCore) Write(Entry, []Field) error { func (c *countingCore) With([]Field) Core { return c } func (*countingCore) Enabled(Level) bool { return true } func (*countingCore) Sync() error { return nil } +func (*countingCore) Close() error { return nil } func TestSamplerConcurrent(t *testing.T) { const ( diff --git a/zapcore/tee.go b/zapcore/tee.go index 07a32eef9..93ac0e7dc 100644 --- a/zapcore/tee.go +++ b/zapcore/tee.go @@ -79,3 +79,11 @@ func (mc multiCore) Sync() error { } return err } + +func (mc multiCore) Close() error { + var err error + for i := range mc { + err = multierr.Append(err, mc[i].Close()) + } + return err +} diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index a31f1a765..97b2a17b5 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -22,6 +22,7 @@ package zapcore import ( "bufio" + "context" "io" "sync" "time" @@ -33,12 +34,13 @@ import ( // that *os.File (and thus, os.Stderr and os.Stdout) implement WriteSyncer. type WriteSyncer interface { io.Writer + Close() error Sync() error } // AddSync converts an io.Writer to a WriteSyncer. It attempts to be // intelligent: if the concrete type of the io.Writer implements WriteSyncer, -// we'll use the existing Sync method. If it doesn't, we'll add a no-op Sync. +// we'll use the existing Sync and Close method. If it doesn't, we'll add a no-op Sync and Close. func AddSync(w io.Writer) WriteSyncer { switch w := w.(type) { case WriteSyncer: @@ -77,9 +79,17 @@ func (s *lockedWriteSyncer) Sync() error { return err } +func (s *lockedWriteSyncer) Close() error { + s.Lock() + err := s.ws.Close() + s.Unlock() + return err +} + type bufferWriterSyncer struct { - bufferWriter *bufio.Writer ws WriteSyncer + bufferWriter *bufio.Writer + cancel context.CancelFunc } // defaultBufferSize sizes the buffer associated with each WriterSync. @@ -107,18 +117,24 @@ func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) WriteSy flushInterval = defaultFlushInterval } + ctx, cancel := context.WithCancel(context.Background()) + // bufio is not goroutine safe, so add lock writer here ws = Lock(&bufferWriterSyncer{ bufferWriter: bufio.NewWriterSize(ws, bufferSize), + cancel: cancel, }) // flush buffer every interval // we do not need exit this goroutine explicitly go func() { - for range time.NewTicker(flushInterval).C { + select { + case <-time.NewTicker(flushInterval).C: if err := ws.Sync(); err != nil { return } + case <-ctx.Done(): + return } }() @@ -126,7 +142,6 @@ func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) WriteSy } func (s *bufferWriterSyncer) Write(bs []byte) (int, error) { - // there are some logic internal for bufio.Writer here: // 1. when the buffer is enough, data would not be flushed. // 2. when the buffer is not enough, data would be flushed as soon as the buffer fills up. @@ -146,6 +161,11 @@ func (s *bufferWriterSyncer) Sync() error { return s.bufferWriter.Flush() } +func (s *bufferWriterSyncer) Close() error { + s.cancel() + return s.Sync() +} + type writerWrapper struct { io.Writer } @@ -154,6 +174,10 @@ func (w writerWrapper) Sync() error { return nil } +func (w writerWrapper) Close() error { + return nil +} + type multiWriteSyncer []WriteSyncer // NewMultiWriteSyncer creates a WriteSyncer that duplicates its writes @@ -192,3 +216,7 @@ func (ws multiWriteSyncer) Sync() error { } return err } + +func (ws multiWriteSyncer) Close() error { + return ws.Sync() +} diff --git a/zapcore/write_syncer_test.go b/zapcore/write_syncer_test.go index 257a0eb74..33f93643a 100644 --- a/zapcore/write_syncer_test.go +++ b/zapcore/write_syncer_test.go @@ -85,6 +85,15 @@ func TestBufferWriter(t *testing.T) { requireWriteWorks(t, ws) assert.Equal(t, "foo", buf.String(), "Unexpected log string") }) + + t.Run("cancel context", func(t *testing.T) { + buf := &bytes.Buffer{} + ws := Buffer(Buffer(AddSync(buf), 0, 0), 0, 0) + requireWriteWorks(t, ws) + assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") + assert.NoError(t, ws.Close(), "Unexpected error calling a no-op Sync method.") + assert.Equal(t, "foo", buf.String(), "Unexpected log string") + }) } func TestNewMultiWriteSyncerWorksForSingleWriter(t *testing.T) { diff --git a/zaptest/logger.go b/zaptest/logger.go index 1e2451c26..f95304e4f 100644 --- a/zaptest/logger.go +++ b/zaptest/logger.go @@ -138,3 +138,7 @@ func (w testingWriter) Write(p []byte) (n int, err error) { func (w testingWriter) Sync() error { return nil } + +func (w testingWriter) Close() error { + return nil +} diff --git a/zaptest/observer/observer.go b/zaptest/observer/observer.go index 78f5be45d..f698fa96e 100644 --- a/zaptest/observer/observer.go +++ b/zaptest/observer/observer.go @@ -165,3 +165,7 @@ func (co *contextObserver) Write(ent zapcore.Entry, fields []zapcore.Field) erro func (co *contextObserver) Sync() error { return nil } + +func (co *contextObserver) Close() error { + return nil +} From ba42206bbf746ab82562f2d0e4c1bcdb2fb3e96f Mon Sep 17 00:00:00 2001 From: hnlq715 Date: Tue, 25 Feb 2020 11:12:41 +0800 Subject: [PATCH 06/54] improve --- zapcore/write_syncer_test.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/zapcore/write_syncer_test.go b/zapcore/write_syncer_test.go index 33f93643a..eb16465c7 100644 --- a/zapcore/write_syncer_test.go +++ b/zapcore/write_syncer_test.go @@ -68,7 +68,7 @@ func TestAddSyncWriter(t *testing.T) { func TestBufferWriter(t *testing.T) { // If we pass a plain io.Writer, make sure that we still get a WriteSyncer // with a no-op Sync. - t.Run("default", func(t *testing.T) { + t.Run("sync", func(t *testing.T) { buf := &bytes.Buffer{} ws := Buffer(Buffer(AddSync(buf), 0, 0), 0, 0) requireWriteWorks(t, ws) @@ -77,23 +77,24 @@ func TestBufferWriter(t *testing.T) { assert.Equal(t, "foo", buf.String(), "Unexpected log string") }) - t.Run("small buffer", func(t *testing.T) { + t.Run("close", func(t *testing.T) { buf := &bytes.Buffer{} - ws := Buffer(Buffer(AddSync(buf), 5, 0), 5, 0) + ws := Buffer(Buffer(AddSync(buf), 0, 0), 0, 0) requireWriteWorks(t, ws) assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") - requireWriteWorks(t, ws) + assert.NoError(t, ws.Close(), "Unexpected error calling a no-op Sync method.") assert.Equal(t, "foo", buf.String(), "Unexpected log string") }) - t.Run("cancel context", func(t *testing.T) { + t.Run("small buffer", func(t *testing.T) { buf := &bytes.Buffer{} - ws := Buffer(Buffer(AddSync(buf), 0, 0), 0, 0) + ws := Buffer(Buffer(AddSync(buf), 5, 0), 5, 0) requireWriteWorks(t, ws) assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") - assert.NoError(t, ws.Close(), "Unexpected error calling a no-op Sync method.") + requireWriteWorks(t, ws) assert.Equal(t, "foo", buf.String(), "Unexpected log string") }) + } func TestNewMultiWriteSyncerWorksForSingleWriter(t *testing.T) { From 069d7b9ec75a2c5978bd624c3c8b36fdba9eab7f Mon Sep 17 00:00:00 2001 From: hnlq715 Date: Tue, 25 Feb 2020 11:13:18 +0800 Subject: [PATCH 07/54] fix spell --- zapcore/write_syncer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zapcore/write_syncer_test.go b/zapcore/write_syncer_test.go index eb16465c7..3f068b0c0 100644 --- a/zapcore/write_syncer_test.go +++ b/zapcore/write_syncer_test.go @@ -82,7 +82,7 @@ func TestBufferWriter(t *testing.T) { ws := Buffer(Buffer(AddSync(buf), 0, 0), 0, 0) requireWriteWorks(t, ws) assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") - assert.NoError(t, ws.Close(), "Unexpected error calling a no-op Sync method.") + assert.NoError(t, ws.Close(), "Unexpected error calling a no-op Close method.") assert.Equal(t, "foo", buf.String(), "Unexpected log string") }) From 60350a25ce5cf836c85b87a34a17c87dfaa1faff Mon Sep 17 00:00:00 2001 From: hnlq715 Date: Tue, 3 Mar 2020 15:48:21 +0800 Subject: [PATCH 08/54] improve cancel logic --- internal/ztest/writer.go | 7 ------- logger.go | 6 ------ sink.go | 2 ++ writer_test.go | 4 ---- zapcore/core.go | 7 ------- zapcore/sampler_test.go | 1 - zapcore/tee.go | 8 -------- zapcore/write_syncer.go | 33 ++++++++++++------------------ zapcore/write_syncer_bench_test.go | 6 ++++-- zapcore/write_syncer_test.go | 25 +++++++++++++++++----- 10 files changed, 39 insertions(+), 60 deletions(-) diff --git a/internal/ztest/writer.go b/internal/ztest/writer.go index 2c1a3fb89..9fdd5805e 100644 --- a/internal/ztest/writer.go +++ b/internal/ztest/writer.go @@ -45,13 +45,6 @@ func (s *Syncer) Sync() error { return s.err } -// Close records that it was called, then returns the user-supplied error (if -// any). -func (s *Syncer) Close() error { - s.called = true - return s.err -} - // Called reports whether the Sync method was called. func (s *Syncer) Called() bool { return s.called diff --git a/logger.go b/logger.go index 1f25c9ef9..ea484aed1 100644 --- a/logger.go +++ b/logger.go @@ -244,12 +244,6 @@ func (log *Logger) Sync() error { return log.core.Sync() } -// Close calls the underlying Core's Close method, flushing any buffered log -// entries. Applications should take care to call Close before exiting. -func (log *Logger) Close() error { - return log.core.Close() -} - // Core returns the Logger's underlying zapcore.Core. func (log *Logger) Core() zapcore.Core { return log.core diff --git a/sink.go b/sink.go index ba9172b17..df46fa87a 100644 --- a/sink.go +++ b/sink.go @@ -23,6 +23,7 @@ package zap import ( "errors" "fmt" + "io" "net/url" "os" "strings" @@ -54,6 +55,7 @@ func resetSinkRegistry() { // Sink defines the interface to write to and close logger destinations. type Sink interface { zapcore.WriteSyncer + io.Closer } type nopCloserSink struct{ zapcore.WriteSyncer } diff --git a/writer_test.go b/writer_test.go index 94e90da76..b9b5389ca 100644 --- a/writer_test.go +++ b/writer_test.go @@ -152,10 +152,6 @@ func (w *testWriter) Sync() error { return nil } -func (w *testWriter) Close() error { - return nil -} - func TestOpenWithErroringSinkFactory(t *testing.T) { defer resetSinkRegistry() diff --git a/zapcore/core.go b/zapcore/core.go index 98abc6399..a1ef8b034 100644 --- a/zapcore/core.go +++ b/zapcore/core.go @@ -42,8 +42,6 @@ type Core interface { Write(Entry, []Field) error // Sync flushes buffered logs (if any). Sync() error - // Close cleans all WriterSync resources (if any). - Close() error } type nopCore struct{} @@ -55,7 +53,6 @@ func (n nopCore) With([]Field) Core { return n } func (nopCore) Check(_ Entry, ce *CheckedEntry) *CheckedEntry { return ce } func (nopCore) Write(Entry, []Field) error { return nil } func (nopCore) Sync() error { return nil } -func (nopCore) Close() error { return nil } // NewCore creates a Core that writes logs to a WriteSyncer. func NewCore(enc Encoder, ws WriteSyncer, enab LevelEnabler) Core { @@ -107,10 +104,6 @@ func (c *ioCore) Sync() error { return c.out.Sync() } -func (c *ioCore) Close() error { - return c.out.Close() -} - func (c *ioCore) clone() *ioCore { return &ioCore{ LevelEnabler: c.LevelEnabler, diff --git a/zapcore/sampler_test.go b/zapcore/sampler_test.go index f0a8328f2..71db0f9bd 100644 --- a/zapcore/sampler_test.go +++ b/zapcore/sampler_test.go @@ -151,7 +151,6 @@ func (c *countingCore) Write(Entry, []Field) error { func (c *countingCore) With([]Field) Core { return c } func (*countingCore) Enabled(Level) bool { return true } func (*countingCore) Sync() error { return nil } -func (*countingCore) Close() error { return nil } func TestSamplerConcurrent(t *testing.T) { const ( diff --git a/zapcore/tee.go b/zapcore/tee.go index 93ac0e7dc..07a32eef9 100644 --- a/zapcore/tee.go +++ b/zapcore/tee.go @@ -79,11 +79,3 @@ func (mc multiCore) Sync() error { } return err } - -func (mc multiCore) Close() error { - var err error - for i := range mc { - err = multierr.Append(err, mc[i].Close()) - } - return err -} diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index 97b2a17b5..a8a824933 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -34,7 +34,6 @@ import ( // that *os.File (and thus, os.Stderr and os.Stdout) implement WriteSyncer. type WriteSyncer interface { io.Writer - Close() error Sync() error } @@ -79,17 +78,9 @@ func (s *lockedWriteSyncer) Sync() error { return err } -func (s *lockedWriteSyncer) Close() error { - s.Lock() - err := s.ws.Close() - s.Unlock() - return err -} - type bufferWriterSyncer struct { ws WriteSyncer bufferWriter *bufio.Writer - cancel context.CancelFunc } // defaultBufferSize sizes the buffer associated with each WriterSync. @@ -98,14 +89,24 @@ const defaultBufferSize = 256 * 1024 // defaultFlushInterval means the default flush interval const defaultFlushInterval = 30 * time.Second +// CancelFunc should be called when the caller exits to clean up buffers. +type CancelFunc func() error + // Buffer wraps a WriteSyncer in a buffer to improve performance, // if bufferSize = 0, we set it to defaultBufferSize // if flushInterval = 0, we set it to defaultFlushInterval -func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) WriteSyncer { +func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (WriteSyncer, CancelFunc) { + ctx, cancel := context.WithCancel(context.Background()) + + cancelfunc := func() error { + cancel() + return ws.Sync() + } + if lws, ok := ws.(*lockedWriteSyncer); ok { if _, ok := lws.ws.(*bufferWriterSyncer); ok { // no need to layer on another buffer - return ws + return ws, cancelfunc } } @@ -117,12 +118,9 @@ func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) WriteSy flushInterval = defaultFlushInterval } - ctx, cancel := context.WithCancel(context.Background()) - // bufio is not goroutine safe, so add lock writer here ws = Lock(&bufferWriterSyncer{ bufferWriter: bufio.NewWriterSize(ws, bufferSize), - cancel: cancel, }) // flush buffer every interval @@ -138,7 +136,7 @@ func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) WriteSy } }() - return ws + return ws, cancelfunc } func (s *bufferWriterSyncer) Write(bs []byte) (int, error) { @@ -161,11 +159,6 @@ func (s *bufferWriterSyncer) Sync() error { return s.bufferWriter.Flush() } -func (s *bufferWriterSyncer) Close() error { - s.cancel() - return s.Sync() -} - type writerWrapper struct { io.Writer } diff --git a/zapcore/write_syncer_bench_test.go b/zapcore/write_syncer_bench_test.go index 65626802e..dc5d40ea1 100644 --- a/zapcore/write_syncer_bench_test.go +++ b/zapcore/write_syncer_bench_test.go @@ -57,12 +57,13 @@ func BenchmarkMultiWriteSyncer(b *testing.B) { }) }) b.Run("4 discarder with buffer", func(b *testing.B) { - w := Buffer(NewMultiWriteSyncer( + w, cancel := Buffer(NewMultiWriteSyncer( &ztest.Discarder{}, &ztest.Discarder{}, &ztest.Discarder{}, &ztest.Discarder{}, ), 0, 0) + defer cancel() b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { @@ -93,7 +94,8 @@ func BenchmarkWriteSyncer(b *testing.B) { defer file.Close() defer os.Remove(file.Name()) - w := Buffer(AddSync(file), 0, 0) + w, cancel := Buffer(AddSync(file), 0, 0) + defer cancel() b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { diff --git a/zapcore/write_syncer_test.go b/zapcore/write_syncer_test.go index 3f068b0c0..0fdf85e8a 100644 --- a/zapcore/write_syncer_test.go +++ b/zapcore/write_syncer_test.go @@ -70,25 +70,40 @@ func TestBufferWriter(t *testing.T) { // with a no-op Sync. t.Run("sync", func(t *testing.T) { buf := &bytes.Buffer{} - ws := Buffer(Buffer(AddSync(buf), 0, 0), 0, 0) + ws, cancel := Buffer(AddSync(buf), 0, 0) + defer cancel() requireWriteWorks(t, ws) assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") assert.NoError(t, ws.Sync(), "Unexpected error calling a no-op Sync method.") assert.Equal(t, "foo", buf.String(), "Unexpected log string") }) - t.Run("close", func(t *testing.T) { + t.Run("1 cancel", func(t *testing.T) { buf := &bytes.Buffer{} - ws := Buffer(Buffer(AddSync(buf), 0, 0), 0, 0) + ws, cancel := Buffer(AddSync(buf), 0, 0) requireWriteWorks(t, ws) assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") - assert.NoError(t, ws.Close(), "Unexpected error calling a no-op Close method.") + cancel() + assert.Equal(t, "foo", buf.String(), "Unexpected log string") + }) + + t.Run("2 cancel", func(t *testing.T) { + buf := &bytes.Buffer{} + bufsync, cancel1 := Buffer(AddSync(buf), 0, 0) + ws, cancel2 := Buffer(bufsync, 0, 0) + requireWriteWorks(t, ws) + assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") + cancel2() + cancel1() assert.Equal(t, "foo", buf.String(), "Unexpected log string") }) t.Run("small buffer", func(t *testing.T) { buf := &bytes.Buffer{} - ws := Buffer(Buffer(AddSync(buf), 5, 0), 5, 0) + bufsync, cancel1 := Buffer(AddSync(buf), 5, 0) + ws, cancel2 := Buffer(bufsync, 5, 0) + defer cancel1() + defer cancel2() requireWriteWorks(t, ws) assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") requireWriteWorks(t, ws) From 1c240178e48a99c8951942c46370d5525ff7c65d Mon Sep 17 00:00:00 2001 From: hnlq715 Date: Tue, 3 Mar 2020 15:50:44 +0800 Subject: [PATCH 09/54] improve --- zapcore/write_syncer.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index a8a824933..a30e8954d 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -39,7 +39,7 @@ type WriteSyncer interface { // AddSync converts an io.Writer to a WriteSyncer. It attempts to be // intelligent: if the concrete type of the io.Writer implements WriteSyncer, -// we'll use the existing Sync and Close method. If it doesn't, we'll add a no-op Sync and Close. +// we'll use the existing Sync method. If it doesn't, we'll add a no-op Sync. func AddSync(w io.Writer) WriteSyncer { switch w := w.(type) { case WriteSyncer: @@ -209,7 +209,3 @@ func (ws multiWriteSyncer) Sync() error { } return err } - -func (ws multiWriteSyncer) Close() error { - return ws.Sync() -} From 6ab72db4cbc8106f53be5d2d8c8b1ec539a7ac6d Mon Sep 17 00:00:00 2001 From: hnlq715 Date: Tue, 3 Mar 2020 15:51:44 +0800 Subject: [PATCH 10/54] remove close --- zapcore/write_syncer.go | 4 ---- zaptest/logger.go | 4 ---- zaptest/observer/observer.go | 4 ---- 3 files changed, 12 deletions(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index a30e8954d..fb2a96df9 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -167,10 +167,6 @@ func (w writerWrapper) Sync() error { return nil } -func (w writerWrapper) Close() error { - return nil -} - type multiWriteSyncer []WriteSyncer // NewMultiWriteSyncer creates a WriteSyncer that duplicates its writes diff --git a/zaptest/logger.go b/zaptest/logger.go index f95304e4f..1e2451c26 100644 --- a/zaptest/logger.go +++ b/zaptest/logger.go @@ -138,7 +138,3 @@ func (w testingWriter) Write(p []byte) (n int, err error) { func (w testingWriter) Sync() error { return nil } - -func (w testingWriter) Close() error { - return nil -} diff --git a/zaptest/observer/observer.go b/zaptest/observer/observer.go index f698fa96e..78f5be45d 100644 --- a/zaptest/observer/observer.go +++ b/zaptest/observer/observer.go @@ -165,7 +165,3 @@ func (co *contextObserver) Write(ent zapcore.Entry, fields []zapcore.Field) erro func (co *contextObserver) Sync() error { return nil } - -func (co *contextObserver) Close() error { - return nil -} From bfd7a9412d4b96e71b3c802143250c101ceb6d73 Mon Sep 17 00:00:00 2001 From: liqi Date: Fri, 29 May 2020 16:08:24 +0800 Subject: [PATCH 11/54] rename cancel to close and keep syncing --- zapcore/write_syncer.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index fb2a96df9..85dd5fe7c 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -89,16 +89,16 @@ const defaultBufferSize = 256 * 1024 // defaultFlushInterval means the default flush interval const defaultFlushInterval = 30 * time.Second -// CancelFunc should be called when the caller exits to clean up buffers. -type CancelFunc func() error +// CloseFunc should be called when the caller exits to clean up buffers. +type CloseFunc func() error // Buffer wraps a WriteSyncer in a buffer to improve performance, // if bufferSize = 0, we set it to defaultBufferSize // if flushInterval = 0, we set it to defaultFlushInterval -func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (WriteSyncer, CancelFunc) { +func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (WriteSyncer, CloseFunc) { ctx, cancel := context.WithCancel(context.Background()) - cancelfunc := func() error { + closefunc := func() error { cancel() return ws.Sync() } @@ -106,7 +106,7 @@ func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (WriteS if lws, ok := ws.(*lockedWriteSyncer); ok { if _, ok := lws.ws.(*bufferWriterSyncer); ok { // no need to layer on another buffer - return ws, cancelfunc + return ws, closefunc } } @@ -128,15 +128,15 @@ func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (WriteS go func() { select { case <-time.NewTicker(flushInterval).C: - if err := ws.Sync(); err != nil { - return - } + // the background goroutine just keep syncing + // until the close func is called. + _ = ws.Sync() case <-ctx.Done(): return } }() - return ws, cancelfunc + return ws, closefunc } func (s *bufferWriterSyncer) Write(bs []byte) (int, error) { From 66b7878df1cde1177248a3052395d246969abfad Mon Sep 17 00:00:00 2001 From: liqi Date: Mon, 1 Jun 2020 09:30:48 +0800 Subject: [PATCH 12/54] fix lint --- zapcore/write_syncer.go | 1 - 1 file changed, 1 deletion(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index 85dd5fe7c..fbe64c0c4 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -79,7 +79,6 @@ func (s *lockedWriteSyncer) Sync() error { } type bufferWriterSyncer struct { - ws WriteSyncer bufferWriter *bufio.Writer } From 5e98dbb858b49880eba8771d3f7a1a102388a2d4 Mon Sep 17 00:00:00 2001 From: liqi Date: Mon, 1 Jun 2020 10:44:30 +0800 Subject: [PATCH 13/54] 100% test coverage --- zapcore/write_syncer.go | 6 +++++- zapcore/write_syncer_test.go | 24 ++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index fbe64c0c4..b896d0359 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -80,6 +80,7 @@ func (s *lockedWriteSyncer) Sync() error { type bufferWriterSyncer struct { bufferWriter *bufio.Writer + ticker *time.Ticker } // defaultBufferSize sizes the buffer associated with each WriterSync. @@ -117,16 +118,19 @@ func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (WriteS flushInterval = defaultFlushInterval } + ticker := time.NewTicker(flushInterval) + // bufio is not goroutine safe, so add lock writer here ws = Lock(&bufferWriterSyncer{ bufferWriter: bufio.NewWriterSize(ws, bufferSize), + ticker: ticker, }) // flush buffer every interval // we do not need exit this goroutine explicitly go func() { select { - case <-time.NewTicker(flushInterval).C: + case <-ticker.C: // the background goroutine just keep syncing // until the close func is called. _ = ws.Sync() diff --git a/zapcore/write_syncer_test.go b/zapcore/write_syncer_test.go index 0fdf85e8a..832acee63 100644 --- a/zapcore/write_syncer_test.go +++ b/zapcore/write_syncer_test.go @@ -24,6 +24,7 @@ import ( "bytes" "errors" "testing" + "time" "io" @@ -37,6 +38,10 @@ type writeSyncSpy struct { ztest.Syncer } +type errorWriter struct{} + +func (*errorWriter) Write([]byte) (int, error) { return 0, errors.New("unimplemented") } + func requireWriteWorks(t testing.TB, ws WriteSyncer) { n, err := ws.Write([]byte("foo")) require.NoError(t, err, "Unexpected error writing to WriteSyncer.") @@ -110,6 +115,25 @@ func TestBufferWriter(t *testing.T) { assert.Equal(t, "foo", buf.String(), "Unexpected log string") }) + t.Run("flush error", func(t *testing.T) { + ws, cancel := Buffer(AddSync(&errorWriter{}), 4, time.Nanosecond) + n, err := ws.Write([]byte("foo")) + require.NoError(t, err, "Unexpected error writing to WriteSyncer.") + require.Equal(t, 3, n, "Wrote an unexpected number of bytes.") + ws.Write([]byte("foo")) + assert.NotNil(t, cancel()) + }) + + t.Run("flush timer", func(t *testing.T) { + buf := &bytes.Buffer{} + ws, _ := Buffer(AddSync(buf), 6, time.Microsecond) + requireWriteWorks(t, ws) + assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") + ticker := ws.(*lockedWriteSyncer).ws.(*bufferWriterSyncer).ticker + <-ticker.C + <-ticker.C + assert.Equal(t, "foo", buf.String(), "Unexpected log string") + }) } func TestNewMultiWriteSyncerWorksForSingleWriter(t *testing.T) { From 5980ec9a831674b3394a860d409d0bff437059c5 Mon Sep 17 00:00:00 2001 From: liqi Date: Mon, 1 Jun 2020 10:46:28 +0800 Subject: [PATCH 14/54] improve comment --- zapcore/write_syncer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index b896d0359..fc10505e5 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -127,7 +127,7 @@ func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (WriteS }) // flush buffer every interval - // we do not need exit this goroutine explicitly + // we only exit this goroutine after closefunc called explicitly go func() { select { case <-ticker.C: From 7f5913c4a9dde9bcfcb730843fd1c031528e8268 Mon Sep 17 00:00:00 2001 From: liqi Date: Mon, 1 Jun 2020 10:47:10 +0800 Subject: [PATCH 15/54] improve comment --- zapcore/write_syncer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index fc10505e5..007906c0d 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -127,7 +127,7 @@ func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (WriteS }) // flush buffer every interval - // we only exit this goroutine after closefunc called explicitly + // we do not need to exit this goroutine until closefunc called explicitly go func() { select { case <-ticker.C: From 07b0e3a18b5893acd4032196409f99d2014871c3 Mon Sep 17 00:00:00 2001 From: liqi Date: Mon, 1 Jun 2020 11:09:39 +0800 Subject: [PATCH 16/54] fix test error --- zapcore/write_syncer_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/zapcore/write_syncer_test.go b/zapcore/write_syncer_test.go index 832acee63..050c5715a 100644 --- a/zapcore/write_syncer_test.go +++ b/zapcore/write_syncer_test.go @@ -129,9 +129,7 @@ func TestBufferWriter(t *testing.T) { ws, _ := Buffer(AddSync(buf), 6, time.Microsecond) requireWriteWorks(t, ws) assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") - ticker := ws.(*lockedWriteSyncer).ws.(*bufferWriterSyncer).ticker - <-ticker.C - <-ticker.C + ztest.Sleep(10 * time.Millisecond) assert.Equal(t, "foo", buf.String(), "Unexpected log string") }) } From dfdb0c2601544f6a548e888741774d613fc476cb Mon Sep 17 00:00:00 2001 From: liqi Date: Mon, 1 Jun 2020 11:42:45 +0800 Subject: [PATCH 17/54] fix race condition in test case --- zapcore/write_syncer.go | 14 +++++++++++--- zapcore/write_syncer_test.go | 6 +++++- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index 007906c0d..6598e4927 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -79,6 +79,7 @@ func (s *lockedWriteSyncer) Sync() error { } type bufferWriterSyncer struct { + sync.Mutex bufferWriter *bufio.Writer ticker *time.Ticker } @@ -120,11 +121,10 @@ func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (WriteS ticker := time.NewTicker(flushInterval) - // bufio is not goroutine safe, so add lock writer here - ws = Lock(&bufferWriterSyncer{ + ws = &bufferWriterSyncer{ bufferWriter: bufio.NewWriterSize(ws, bufferSize), ticker: ticker, - }) + } // flush buffer every interval // we do not need to exit this goroutine until closefunc called explicitly @@ -143,6 +143,10 @@ func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (WriteS } func (s *bufferWriterSyncer) Write(bs []byte) (int, error) { + // bufio is not goroutine safe, so add lock writer here + s.Lock() + defer s.Unlock() + // there are some logic internal for bufio.Writer here: // 1. when the buffer is enough, data would not be flushed. // 2. when the buffer is not enough, data would be flushed as soon as the buffer fills up. @@ -159,6 +163,10 @@ func (s *bufferWriterSyncer) Write(bs []byte) (int, error) { } func (s *bufferWriterSyncer) Sync() error { + // bufio is not goroutine safe, so add lock writer here + s.Lock() + defer s.Unlock() + return s.bufferWriter.Flush() } diff --git a/zapcore/write_syncer_test.go b/zapcore/write_syncer_test.go index 050c5715a..7c0ef41dc 100644 --- a/zapcore/write_syncer_test.go +++ b/zapcore/write_syncer_test.go @@ -112,6 +112,8 @@ func TestBufferWriter(t *testing.T) { requireWriteWorks(t, ws) assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") requireWriteWorks(t, ws) + assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") + requireWriteWorks(t, ws) assert.Equal(t, "foo", buf.String(), "Unexpected log string") }) @@ -128,8 +130,10 @@ func TestBufferWriter(t *testing.T) { buf := &bytes.Buffer{} ws, _ := Buffer(AddSync(buf), 6, time.Microsecond) requireWriteWorks(t, ws) - assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") ztest.Sleep(10 * time.Millisecond) + bws := ws.(*bufferWriterSyncer) + bws.Lock() + defer bws.Unlock() assert.Equal(t, "foo", buf.String(), "Unexpected log string") }) } From c8b997915418e9c82b8ee2379158b91d4c2d0a13 Mon Sep 17 00:00:00 2001 From: liqi Date: Mon, 1 Jun 2020 11:56:01 +0800 Subject: [PATCH 18/54] 100% test coverage --- zapcore/write_syncer.go | 8 +++----- zapcore/write_syncer_test.go | 2 -- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index 6598e4927..033b3707d 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -104,11 +104,9 @@ func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (WriteS return ws.Sync() } - if lws, ok := ws.(*lockedWriteSyncer); ok { - if _, ok := lws.ws.(*bufferWriterSyncer); ok { - // no need to layer on another buffer - return ws, closefunc - } + if _, ok := ws.(*bufferWriterSyncer); ok { + // no need to layer on another buffer + return ws, closefunc } if bufferSize == 0 { diff --git a/zapcore/write_syncer_test.go b/zapcore/write_syncer_test.go index 7c0ef41dc..1bdbc1283 100644 --- a/zapcore/write_syncer_test.go +++ b/zapcore/write_syncer_test.go @@ -112,8 +112,6 @@ func TestBufferWriter(t *testing.T) { requireWriteWorks(t, ws) assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") requireWriteWorks(t, ws) - assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") - requireWriteWorks(t, ws) assert.Equal(t, "foo", buf.String(), "Unexpected log string") }) From 6d9018f32552ba2686fc66d11311d6a3d466b172 Mon Sep 17 00:00:00 2001 From: liqi Date: Fri, 5 Jun 2020 08:47:02 +0800 Subject: [PATCH 19/54] add loop and fix typo --- zapcore/write_syncer.go | 16 ++++++++------ zapcore/write_syncer_bench_test.go | 8 +++---- zapcore/write_syncer_test.go | 35 +++++++++++++++--------------- 3 files changed, 31 insertions(+), 28 deletions(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index 033b3707d..738187afa 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -127,13 +127,15 @@ func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (WriteS // flush buffer every interval // we do not need to exit this goroutine until closefunc called explicitly go func() { - select { - case <-ticker.C: - // the background goroutine just keep syncing - // until the close func is called. - _ = ws.Sync() - case <-ctx.Done(): - return + for { + select { + case <-ticker.C: + // the background goroutine just keep syncing + // until the close func is called. + _ = ws.Sync() + case <-ctx.Done(): + return + } } }() diff --git a/zapcore/write_syncer_bench_test.go b/zapcore/write_syncer_bench_test.go index dc5d40ea1..2f25a0844 100644 --- a/zapcore/write_syncer_bench_test.go +++ b/zapcore/write_syncer_bench_test.go @@ -57,13 +57,13 @@ func BenchmarkMultiWriteSyncer(b *testing.B) { }) }) b.Run("4 discarder with buffer", func(b *testing.B) { - w, cancel := Buffer(NewMultiWriteSyncer( + w, close := Buffer(NewMultiWriteSyncer( &ztest.Discarder{}, &ztest.Discarder{}, &ztest.Discarder{}, &ztest.Discarder{}, ), 0, 0) - defer cancel() + defer close() b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { @@ -94,8 +94,8 @@ func BenchmarkWriteSyncer(b *testing.B) { defer file.Close() defer os.Remove(file.Name()) - w, cancel := Buffer(AddSync(file), 0, 0) - defer cancel() + w, close := Buffer(AddSync(file), 0, 0) + defer close() b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { diff --git a/zapcore/write_syncer_test.go b/zapcore/write_syncer_test.go index 1bdbc1283..68266765c 100644 --- a/zapcore/write_syncer_test.go +++ b/zapcore/write_syncer_test.go @@ -75,40 +75,40 @@ func TestBufferWriter(t *testing.T) { // with a no-op Sync. t.Run("sync", func(t *testing.T) { buf := &bytes.Buffer{} - ws, cancel := Buffer(AddSync(buf), 0, 0) - defer cancel() + ws, close := Buffer(AddSync(buf), 0, 0) + defer close() requireWriteWorks(t, ws) assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") assert.NoError(t, ws.Sync(), "Unexpected error calling a no-op Sync method.") assert.Equal(t, "foo", buf.String(), "Unexpected log string") }) - t.Run("1 cancel", func(t *testing.T) { + t.Run("1 close", func(t *testing.T) { buf := &bytes.Buffer{} - ws, cancel := Buffer(AddSync(buf), 0, 0) + ws, close := Buffer(AddSync(buf), 0, 0) requireWriteWorks(t, ws) assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") - cancel() + close() assert.Equal(t, "foo", buf.String(), "Unexpected log string") }) - t.Run("2 cancel", func(t *testing.T) { + t.Run("2 close", func(t *testing.T) { buf := &bytes.Buffer{} - bufsync, cancel1 := Buffer(AddSync(buf), 0, 0) - ws, cancel2 := Buffer(bufsync, 0, 0) + bufsync, close1 := Buffer(AddSync(buf), 0, 0) + ws, close2 := Buffer(bufsync, 0, 0) requireWriteWorks(t, ws) assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") - cancel2() - cancel1() + close2() + close1() assert.Equal(t, "foo", buf.String(), "Unexpected log string") }) t.Run("small buffer", func(t *testing.T) { buf := &bytes.Buffer{} - bufsync, cancel1 := Buffer(AddSync(buf), 5, 0) - ws, cancel2 := Buffer(bufsync, 5, 0) - defer cancel1() - defer cancel2() + bufsync, close1 := Buffer(AddSync(buf), 5, 0) + ws, close2 := Buffer(bufsync, 5, 0) + defer close1() + defer close2() requireWriteWorks(t, ws) assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") requireWriteWorks(t, ws) @@ -116,17 +116,18 @@ func TestBufferWriter(t *testing.T) { }) t.Run("flush error", func(t *testing.T) { - ws, cancel := Buffer(AddSync(&errorWriter{}), 4, time.Nanosecond) + ws, close := Buffer(AddSync(&errorWriter{}), 4, time.Nanosecond) n, err := ws.Write([]byte("foo")) require.NoError(t, err, "Unexpected error writing to WriteSyncer.") require.Equal(t, 3, n, "Wrote an unexpected number of bytes.") ws.Write([]byte("foo")) - assert.NotNil(t, cancel()) + assert.NotNil(t, close()) }) t.Run("flush timer", func(t *testing.T) { buf := &bytes.Buffer{} - ws, _ := Buffer(AddSync(buf), 6, time.Microsecond) + ws, close := Buffer(AddSync(buf), 6, time.Microsecond) + defer close() requireWriteWorks(t, ws) ztest.Sleep(10 * time.Millisecond) bws := ws.(*bufferWriterSyncer) From f61fe13351c9aabaa7a0a3d30b8e3a4d098297db Mon Sep 17 00:00:00 2001 From: Sophos Date: Tue, 9 Jun 2020 09:45:37 +0800 Subject: [PATCH 20/54] Update zapcore/write_syncer.go Co-authored-by: Prashant Varanasi --- zapcore/write_syncer.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index 738187afa..386efe1c4 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -153,8 +153,7 @@ func (s *bufferWriterSyncer) Write(bs []byte) (int, error) { // this would lead to log spliting, which is not acceptable for log collector // so we need to flush bufferWriter before writing the data into bufferWriter if len(bs) > s.bufferWriter.Available() && s.bufferWriter.Buffered() > 0 { - err := s.bufferWriter.Flush() - if err != nil { + if err := s.bufferWriter.Flush(); err != nil { return 0, err } } From 7954bcf51f303141c7aa5012d48b02eaea97954b Mon Sep 17 00:00:00 2001 From: liqi Date: Tue, 9 Jun 2020 09:47:15 +0800 Subject: [PATCH 21/54] improve test --- zapcore/write_syncer_bench_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/zapcore/write_syncer_bench_test.go b/zapcore/write_syncer_bench_test.go index 2f25a0844..f16fc3530 100644 --- a/zapcore/write_syncer_bench_test.go +++ b/zapcore/write_syncer_bench_test.go @@ -75,8 +75,8 @@ func BenchmarkMultiWriteSyncer(b *testing.B) { func BenchmarkWriteSyncer(b *testing.B) { b.Run("write file with no buffer", func(b *testing.B) { - file, err := ioutil.TempFile(".", "*") - assert.Nil(b, err) + file, err := ioutil.TempFile("", "log") + assert.NoError(b, err) defer file.Close() defer os.Remove(file.Name()) @@ -89,8 +89,8 @@ func BenchmarkWriteSyncer(b *testing.B) { }) }) b.Run("write file with buffer", func(b *testing.B) { - file, err := ioutil.TempFile(".", "*") - assert.Nil(b, err) + file, err := ioutil.TempFile("", "log") + assert.NoError(b, err) defer file.Close() defer os.Remove(file.Name()) From 6e6e6f6c2fb4219c05072ea4bd69f5ccd3d11c1a Mon Sep 17 00:00:00 2001 From: liqi Date: Tue, 9 Jun 2020 09:54:56 +0800 Subject: [PATCH 22/54] validate loop logic in test case --- zapcore/write_syncer_test.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/zapcore/write_syncer_test.go b/zapcore/write_syncer_test.go index 68266765c..a6fe477d4 100644 --- a/zapcore/write_syncer_test.go +++ b/zapcore/write_syncer_test.go @@ -132,8 +132,15 @@ func TestBufferWriter(t *testing.T) { ztest.Sleep(10 * time.Millisecond) bws := ws.(*bufferWriterSyncer) bws.Lock() - defer bws.Unlock() assert.Equal(t, "foo", buf.String(), "Unexpected log string") + bws.Unlock() + + // flush twice to validate loop logic + requireWriteWorks(t, ws) + ztest.Sleep(10 * time.Millisecond) + bws.Lock() + assert.Equal(t, "foofoo", buf.String(), "Unexpected log string") + bws.Unlock() }) } From 1e7da7f742f7bf19a04313ee56775a2185f4888f Mon Sep 17 00:00:00 2001 From: liqi Date: Tue, 9 Jun 2020 10:00:15 +0800 Subject: [PATCH 23/54] group default config --- zapcore/write_syncer.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index 386efe1c4..45a1d9395 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -84,11 +84,13 @@ type bufferWriterSyncer struct { ticker *time.Ticker } -// defaultBufferSize sizes the buffer associated with each WriterSync. -const defaultBufferSize = 256 * 1024 +const ( + // defaultBufferSize sizes the buffer associated with each WriterSync. + defaultBufferSize = 256 * 1024 -// defaultFlushInterval means the default flush interval -const defaultFlushInterval = 30 * time.Second + // defaultFlushInterval means the default flush interval + defaultFlushInterval = 30 * time.Second +) // CloseFunc should be called when the caller exits to clean up buffers. type CloseFunc func() error From c427af244c97ee5d00cddd624c9afc26920144f7 Mon Sep 17 00:00:00 2001 From: liqi Date: Tue, 23 Jun 2020 10:25:36 +0800 Subject: [PATCH 24/54] improve close logic --- zapcore/write_syncer.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index 45a1d9395..a10bd47f0 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -99,18 +99,13 @@ type CloseFunc func() error // if bufferSize = 0, we set it to defaultBufferSize // if flushInterval = 0, we set it to defaultFlushInterval func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (WriteSyncer, CloseFunc) { - ctx, cancel := context.WithCancel(context.Background()) - - closefunc := func() error { - cancel() - return ws.Sync() - } - if _, ok := ws.(*bufferWriterSyncer); ok { // no need to layer on another buffer - return ws, closefunc + return ws, func() error { return nil } } + ctx, cancel := context.WithCancel(context.Background()) + if bufferSize == 0 { bufferSize = defaultBufferSize } @@ -141,6 +136,11 @@ func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (WriteS } }() + closefunc := func() error { + cancel() + return ws.Sync() + } + return ws, closefunc } From 1d00db7310b287dc9462277ea7f68f28d1db3bac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= Date: Fri, 9 Oct 2020 19:21:51 +0800 Subject: [PATCH 25/54] Update zapcore/write_syncer.go Co-authored-by: Abhinav Gupta --- zapcore/write_syncer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index a10bd47f0..d90bb76ea 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -80,6 +80,7 @@ func (s *lockedWriteSyncer) Sync() error { type bufferWriterSyncer struct { sync.Mutex + bufferWriter *bufio.Writer ticker *time.Ticker } From a17596e3a043d3b48abb895487877a2fe47a6d7d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= Date: Fri, 9 Oct 2020 19:23:00 +0800 Subject: [PATCH 26/54] Update zapcore/write_syncer_test.go Co-authored-by: Abhinav Gupta --- zapcore/write_syncer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zapcore/write_syncer_test.go b/zapcore/write_syncer_test.go index a6fe477d4..b6684c1cc 100644 --- a/zapcore/write_syncer_test.go +++ b/zapcore/write_syncer_test.go @@ -78,7 +78,7 @@ func TestBufferWriter(t *testing.T) { ws, close := Buffer(AddSync(buf), 0, 0) defer close() requireWriteWorks(t, ws) - assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") + assert.Empty(t, buf.String(), "Unexpected log calling a no-op Write method.") assert.NoError(t, ws.Sync(), "Unexpected error calling a no-op Sync method.") assert.Equal(t, "foo", buf.String(), "Unexpected log string") }) From ccd3608cf28b2b570560d42879e169a46a4b7320 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= Date: Fri, 9 Oct 2020 19:23:22 +0800 Subject: [PATCH 27/54] Update zapcore/write_syncer_test.go Co-authored-by: Abhinav Gupta --- zapcore/write_syncer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zapcore/write_syncer_test.go b/zapcore/write_syncer_test.go index b6684c1cc..f0fb01720 100644 --- a/zapcore/write_syncer_test.go +++ b/zapcore/write_syncer_test.go @@ -87,7 +87,7 @@ func TestBufferWriter(t *testing.T) { buf := &bytes.Buffer{} ws, close := Buffer(AddSync(buf), 0, 0) requireWriteWorks(t, ws) - assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") + assert.Empty(t, buf.String(), "Unexpected log calling a no-op Write method.") close() assert.Equal(t, "foo", buf.String(), "Unexpected log string") }) From 13cdf32bc69155845d0e530704aba5f349a563e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= Date: Fri, 9 Oct 2020 19:23:34 +0800 Subject: [PATCH 28/54] Update zapcore/write_syncer_test.go Co-authored-by: Abhinav Gupta --- zapcore/write_syncer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zapcore/write_syncer_test.go b/zapcore/write_syncer_test.go index f0fb01720..b741a32de 100644 --- a/zapcore/write_syncer_test.go +++ b/zapcore/write_syncer_test.go @@ -83,7 +83,7 @@ func TestBufferWriter(t *testing.T) { assert.Equal(t, "foo", buf.String(), "Unexpected log string") }) - t.Run("1 close", func(t *testing.T) { + t.Run("close", func(t *testing.T) { buf := &bytes.Buffer{} ws, close := Buffer(AddSync(buf), 0, 0) requireWriteWorks(t, ws) From a2a235bfbdc63004c999d74b4bcde3d297cd1d84 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= Date: Fri, 9 Oct 2020 19:23:47 +0800 Subject: [PATCH 29/54] Update zapcore/write_syncer_test.go Co-authored-by: Abhinav Gupta --- zapcore/write_syncer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zapcore/write_syncer_test.go b/zapcore/write_syncer_test.go index b741a32de..a1b3206bf 100644 --- a/zapcore/write_syncer_test.go +++ b/zapcore/write_syncer_test.go @@ -92,7 +92,7 @@ func TestBufferWriter(t *testing.T) { assert.Equal(t, "foo", buf.String(), "Unexpected log string") }) - t.Run("2 close", func(t *testing.T) { + t.Run("wrap twice", func(t *testing.T) { buf := &bytes.Buffer{} bufsync, close1 := Buffer(AddSync(buf), 0, 0) ws, close2 := Buffer(bufsync, 0, 0) From 0477c9daa68bcbdae90558739e331c16b6e9a986 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= Date: Fri, 9 Oct 2020 19:24:14 +0800 Subject: [PATCH 30/54] Update zapcore/write_syncer_test.go Co-authored-by: Abhinav Gupta --- zapcore/write_syncer_test.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/zapcore/write_syncer_test.go b/zapcore/write_syncer_test.go index a1b3206bf..6d79f6d7b 100644 --- a/zapcore/write_syncer_test.go +++ b/zapcore/write_syncer_test.go @@ -105,10 +105,8 @@ func TestBufferWriter(t *testing.T) { t.Run("small buffer", func(t *testing.T) { buf := &bytes.Buffer{} - bufsync, close1 := Buffer(AddSync(buf), 5, 0) - ws, close2 := Buffer(bufsync, 5, 0) - defer close1() - defer close2() + ws, close := Buffer(AddSync(buf), 5, 0) + defer close() requireWriteWorks(t, ws) assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") requireWriteWorks(t, ws) From 16a21477ac1aa6bd7823c52ef3602539228191de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= Date: Fri, 9 Oct 2020 19:24:24 +0800 Subject: [PATCH 31/54] Update zapcore/write_syncer_test.go Co-authored-by: Abhinav Gupta --- zapcore/write_syncer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zapcore/write_syncer_test.go b/zapcore/write_syncer_test.go index 6d79f6d7b..f09b23627 100644 --- a/zapcore/write_syncer_test.go +++ b/zapcore/write_syncer_test.go @@ -119,7 +119,7 @@ func TestBufferWriter(t *testing.T) { require.NoError(t, err, "Unexpected error writing to WriteSyncer.") require.Equal(t, 3, n, "Wrote an unexpected number of bytes.") ws.Write([]byte("foo")) - assert.NotNil(t, close()) + assert.Error(t, close(), "Expected close to fail.") }) t.Run("flush timer", func(t *testing.T) { From 761c8e8fc736367a5c9ea53cf3390ec163721641 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= Date: Tue, 13 Oct 2020 09:43:34 +0800 Subject: [PATCH 32/54] improve close logic and use goleak --- go.mod | 1 + go.sum | 4 ++++ zapcore/write_syncer.go | 23 +++++++++++++---------- zapcore/write_syncer_test.go | 3 +++ 4 files changed, 21 insertions(+), 10 deletions(-) diff --git a/go.mod b/go.mod index 6ef4db70e..66bfc742a 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/pkg/errors v0.8.1 github.com/stretchr/testify v1.4.0 go.uber.org/atomic v1.6.0 + go.uber.org/goleak v1.1.10 go.uber.org/multierr v1.5.0 golang.org/x/lint v0.0.0-20190930215403-16217165b5de gopkg.in/yaml.v2 v2.2.2 diff --git a/go.sum b/go.sum index 99cdb93ea..5a3bbea55 100644 --- a/go.sum +++ b/go.sum @@ -22,6 +22,8 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0= +go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4= @@ -44,6 +46,8 @@ golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c h1:IGkKhmfzcztjm6gYkykvu/N golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191108193012-7d206e10da11 h1:Yq9t9jnGoR+dBuitxdo9l6Q7xh/zOyNnYUtDKaQ3x0E= +golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index d90bb76ea..747154a88 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -22,7 +22,6 @@ package zapcore import ( "bufio" - "context" "io" "sync" "time" @@ -80,6 +79,7 @@ func (s *lockedWriteSyncer) Sync() error { type bufferWriterSyncer struct { sync.Mutex + stop chan struct{} bufferWriter *bufio.Writer ticker *time.Ticker @@ -96,7 +96,7 @@ const ( // CloseFunc should be called when the caller exits to clean up buffers. type CloseFunc func() error -// Buffer wraps a WriteSyncer in a buffer to improve performance, +// Buffer wraps a WriteSyncer in a buffer to improve performance // if bufferSize = 0, we set it to defaultBufferSize // if flushInterval = 0, we set it to defaultFlushInterval func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (WriteSyncer, CloseFunc) { @@ -105,8 +105,6 @@ func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (WriteS return ws, func() error { return nil } } - ctx, cancel := context.WithCancel(context.Background()) - if bufferSize == 0 { bufferSize = defaultBufferSize } @@ -117,7 +115,8 @@ func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (WriteS ticker := time.NewTicker(flushInterval) - ws = &bufferWriterSyncer{ + bws := &bufferWriterSyncer{ + stop: make(chan struct{}), bufferWriter: bufio.NewWriterSize(ws, bufferSize), ticker: ticker, } @@ -130,21 +129,24 @@ func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (WriteS case <-ticker.C: // the background goroutine just keep syncing // until the close func is called. - _ = ws.Sync() - case <-ctx.Done(): + _ = bws.Sync() + case <-bws.stop: return } } }() closefunc := func() error { - cancel() - return ws.Sync() + bws.stop <- struct{}{} + + return bws.Sync() } - return ws, closefunc + return bws, closefunc } +// Write writes log data into buffer syncer directly, multiple Write calls will be batched, +// and log data will be flushed to disk when the buffer is full or periodically. func (s *bufferWriterSyncer) Write(bs []byte) (int, error) { // bufio is not goroutine safe, so add lock writer here s.Lock() @@ -164,6 +166,7 @@ func (s *bufferWriterSyncer) Write(bs []byte) (int, error) { return s.bufferWriter.Write(bs) } +// Sync flushes buffered log data into disk directly. func (s *bufferWriterSyncer) Sync() error { // bufio is not goroutine safe, so add lock writer here s.Lock() diff --git a/zapcore/write_syncer_test.go b/zapcore/write_syncer_test.go index f09b23627..f41703943 100644 --- a/zapcore/write_syncer_test.go +++ b/zapcore/write_syncer_test.go @@ -30,6 +30,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/goleak" "go.uber.org/zap/internal/ztest" ) @@ -71,6 +72,8 @@ func TestAddSyncWriter(t *testing.T) { } func TestBufferWriter(t *testing.T) { + goleak.VerifyNone(t) + // If we pass a plain io.Writer, make sure that we still get a WriteSyncer // with a no-op Sync. t.Run("sync", func(t *testing.T) { From a2123c9e114773f9064e30eab2c66d9af1f31f4d Mon Sep 17 00:00:00 2001 From: Abhinav Gupta Date: Mon, 2 Nov 2020 10:10:53 -0800 Subject: [PATCH 33/54] Drop `type CloseFunc` We'd like to avoid introducing a new top-level type if we can. Delete the CloseFunc type in favor of returning a naked `func() error`. --- zapcore/write_syncer.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index 747154a88..173c8178d 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -93,13 +93,11 @@ const ( defaultFlushInterval = 30 * time.Second ) -// CloseFunc should be called when the caller exits to clean up buffers. -type CloseFunc func() error // Buffer wraps a WriteSyncer in a buffer to improve performance // if bufferSize = 0, we set it to defaultBufferSize // if flushInterval = 0, we set it to defaultFlushInterval -func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (WriteSyncer, CloseFunc) { +func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (_ WriteSyncer, close func() error) { if _, ok := ws.(*bufferWriterSyncer); ok { // no need to layer on another buffer return ws, func() error { return nil } @@ -136,13 +134,10 @@ func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (WriteS } }() - closefunc := func() error { + return bws, func() error { bws.stop <- struct{}{} - return bws.Sync() } - - return bws, closefunc } // Write writes log data into buffer syncer directly, multiple Write calls will be batched, From 1957050b7ca7d28353cc9f1d95ec64330e98a7be Mon Sep 17 00:00:00 2001 From: Abhinav Gupta Date: Mon, 2 Nov 2020 10:11:39 -0800 Subject: [PATCH 34/54] doc: Rewrite Buffer documentation --- zapcore/write_syncer.go | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index 173c8178d..d1cf53956 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -93,10 +93,22 @@ const ( defaultFlushInterval = 30 * time.Second ) - -// Buffer wraps a WriteSyncer in a buffer to improve performance -// if bufferSize = 0, we set it to defaultBufferSize -// if flushInterval = 0, we set it to defaultFlushInterval +// Buffer wraps a WriteSyncer to buffer its output. The returned WriteSyncer +// flushes its output as the buffer fills up, or at the provided interval, +// whichever comes first. +// +// Call the returned function to finish using the WriteSyncer and flush +// remaining bytes. +// +// func main() { +// // ... +// ws, closeWS := zapcore.Buffer(ws, 0, 0) +// defer closeWS() +// // ... +// } +// +// The buffer size defaults to 256 kB if set to zero. +// The flush interval defaults to 30 seconds if set to zero. func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (_ WriteSyncer, close func() error) { if _, ok := ws.(*bufferWriterSyncer); ok { // no need to layer on another buffer From 371117f2f796ff45cd745303c1029f073f91901e Mon Sep 17 00:00:00 2001 From: Abhinav Gupta Date: Mon, 2 Nov 2020 10:11:51 -0800 Subject: [PATCH 35/54] Prefix default{BufferSize, FlushInterval} with _ Per https://github.com/uber-go/guide/blob/master/style.md#prefix-unexported-globals-with-_ --- zapcore/write_syncer.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index d1cf53956..9b252552b 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -86,11 +86,11 @@ type bufferWriterSyncer struct { } const ( - // defaultBufferSize sizes the buffer associated with each WriterSync. - defaultBufferSize = 256 * 1024 + // _defaultBufferSize sizes the buffer associated with each WriterSync. + _defaultBufferSize = 256 * 1024 // 256 kB - // defaultFlushInterval means the default flush interval - defaultFlushInterval = 30 * time.Second + // _defaultFlushInterval means the default flush interval + _defaultFlushInterval = 30 * time.Second ) // Buffer wraps a WriteSyncer to buffer its output. The returned WriteSyncer @@ -116,11 +116,11 @@ func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (_ Writ } if bufferSize == 0 { - bufferSize = defaultBufferSize + bufferSize = _defaultBufferSize } if flushInterval == 0 { - flushInterval = defaultFlushInterval + flushInterval = _defaultFlushInterval } ticker := time.NewTicker(flushInterval) From 22a242742e2ba2a630718b9d6853076e447b029c Mon Sep 17 00:00:00 2001 From: Abhinav Gupta Date: Mon, 2 Nov 2020 10:14:23 -0800 Subject: [PATCH 36/54] Buffer/close: return a bound method Instead of returning an anonmyous function, return a bound method. The bound method is named `close`, not `Close` to avoid accidentally exposing an `io.Closer` interface. --- zapcore/write_syncer.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index 9b252552b..899bd33b0 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -146,10 +146,7 @@ func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (_ Writ } }() - return bws, func() error { - bws.stop <- struct{}{} - return bws.Sync() - } + return bws, bws.close } // Write writes log data into buffer syncer directly, multiple Write calls will be batched, @@ -182,6 +179,13 @@ func (s *bufferWriterSyncer) Sync() error { return s.bufferWriter.Flush() } +// Close closes the buffer, cleans up background goroutines, and flushes +// remaining, unwritten data. +func (s *bufferWriterSyncer) close() error { + s.stop <- struct{}{} + return s.Sync() +} + type writerWrapper struct { io.Writer } From c0f68a91412075ac0b693aa0dd678299499a35ec Mon Sep 17 00:00:00 2001 From: Abhinav Gupta Date: Mon, 2 Nov 2020 10:15:36 -0800 Subject: [PATCH 37/54] Buffer/close: Close the channel instead of posting For the channel that signals end of the goroutine, we should close it instead of posting a single value to it. --- zapcore/write_syncer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index 899bd33b0..866e7857f 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -182,7 +182,7 @@ func (s *bufferWriterSyncer) Sync() error { // Close closes the buffer, cleans up background goroutines, and flushes // remaining, unwritten data. func (s *bufferWriterSyncer) close() error { - s.stop <- struct{}{} + close(s.stop) return s.Sync() } From f57c09b05e5c213c1f2fa4aee4135781c06e0145 Mon Sep 17 00:00:00 2001 From: Abhinav Gupta Date: Mon, 2 Nov 2020 10:19:17 -0800 Subject: [PATCH 38/54] _default{BufferSize, FlushInterval}: docs --- zapcore/write_syncer.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index 866e7857f..30f875d99 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -86,10 +86,11 @@ type bufferWriterSyncer struct { } const ( - // _defaultBufferSize sizes the buffer associated with each WriterSync. + // _defaultBufferSize specifies the default size used by Buffer. _defaultBufferSize = 256 * 1024 // 256 kB - // _defaultFlushInterval means the default flush interval + // _defaultFlushInterval specifies the default flush interval for + // Buffer. _defaultFlushInterval = 30 * time.Second ) From 70795cf4f5186540997d526c3270052ea7a79c28 Mon Sep 17 00:00:00 2001 From: Abhinav Gupta Date: Mon, 2 Nov 2020 10:22:25 -0800 Subject: [PATCH 39/54] buffer: Move loop into a method Move the flush loop into its own method instead of spawning it in a closure. --- zapcore/write_syncer.go | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index 30f875d99..5d69cc70b 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -132,20 +132,7 @@ func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (_ Writ ticker: ticker, } - // flush buffer every interval - // we do not need to exit this goroutine until closefunc called explicitly - go func() { - for { - select { - case <-ticker.C: - // the background goroutine just keep syncing - // until the close func is called. - _ = bws.Sync() - case <-bws.stop: - return - } - } - }() + go bws.flushLoop() return bws, bws.close } @@ -180,7 +167,20 @@ func (s *bufferWriterSyncer) Sync() error { return s.bufferWriter.Flush() } -// Close closes the buffer, cleans up background goroutines, and flushes +// flushLoop flushes the buffer at the configured interval until Close is +// called. +func (s *bufferWriterSyncer) flushLoop() { + for { + select { + case <-s.ticker.C: + _ = s.Sync() + case <-s.stop: + return + } + } +} + +// close closes the buffer, cleans up background goroutines, and flushes // remaining, unwritten data. func (s *bufferWriterSyncer) close() error { close(s.stop) From d61c899430cfec9f8f39e93d6e002ec1283678c6 Mon Sep 17 00:00:00 2001 From: Abhinav Gupta Date: Mon, 2 Nov 2020 10:23:14 -0800 Subject: [PATCH 40/54] buffer/close: stop the ticker On close, stop the ticker from posting ticks to the channel. --- zapcore/write_syncer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index 5d69cc70b..12f230fc8 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -183,6 +183,7 @@ func (s *bufferWriterSyncer) flushLoop() { // close closes the buffer, cleans up background goroutines, and flushes // remaining, unwritten data. func (s *bufferWriterSyncer) close() error { + s.ticker.Stop() close(s.stop) return s.Sync() } From 20fc7a8a9be78c1bd996e9ebd1a724fe4cc49f6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= Date: Fri, 6 Nov 2020 15:03:48 +0800 Subject: [PATCH 41/54] Update zapcore/write_syncer_test.go Co-authored-by: Prashant Varanasi --- zapcore/write_syncer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zapcore/write_syncer_test.go b/zapcore/write_syncer_test.go index f41703943..0f8fd9129 100644 --- a/zapcore/write_syncer_test.go +++ b/zapcore/write_syncer_test.go @@ -72,7 +72,7 @@ func TestAddSyncWriter(t *testing.T) { } func TestBufferWriter(t *testing.T) { - goleak.VerifyNone(t) + defer goleak.VerifyNone(t) // If we pass a plain io.Writer, make sure that we still get a WriteSyncer // with a no-op Sync. From 319e6fefbe2f3848fb6a27e51f03f8c820fdfdce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= Date: Fri, 6 Nov 2020 15:15:36 +0800 Subject: [PATCH 42/54] Update zapcore/write_syncer.go Co-authored-by: Prashant Varanasi --- zapcore/write_syncer.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index 12f230fc8..49cd80018 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -124,12 +124,10 @@ func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (_ Writ flushInterval = _defaultFlushInterval } - ticker := time.NewTicker(flushInterval) - bws := &bufferWriterSyncer{ stop: make(chan struct{}), bufferWriter: bufio.NewWriterSize(ws, bufferSize), - ticker: ticker, + ticker: time.NewTicker(flushInterval), } go bws.flushLoop() From d29379614dfde5d0a9882b99164dca5597df494c Mon Sep 17 00:00:00 2001 From: liqi Date: Fri, 6 Nov 2020 15:16:07 +0800 Subject: [PATCH 43/54] improve code style --- zapcore/write_syncer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index 12f230fc8..9bbe73e2b 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -79,8 +79,8 @@ func (s *lockedWriteSyncer) Sync() error { type bufferWriterSyncer struct { sync.Mutex - stop chan struct{} + stop chan struct{} bufferWriter *bufio.Writer ticker *time.Ticker } From 9613897c1cb587c5665e36ec244fb0ae049b4699 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= Date: Fri, 6 Nov 2020 15:16:52 +0800 Subject: [PATCH 44/54] Update zapcore/write_syncer.go Co-authored-by: Prashant Varanasi --- zapcore/write_syncer.go | 1 - 1 file changed, 1 deletion(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index cad561ecb..9145b8451 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -138,7 +138,6 @@ func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (_ Writ // Write writes log data into buffer syncer directly, multiple Write calls will be batched, // and log data will be flushed to disk when the buffer is full or periodically. func (s *bufferWriterSyncer) Write(bs []byte) (int, error) { - // bufio is not goroutine safe, so add lock writer here s.Lock() defer s.Unlock() From 5160827d4a25cf48a819fe27148b32a399597dc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= Date: Fri, 6 Nov 2020 15:17:11 +0800 Subject: [PATCH 45/54] Update zapcore/write_syncer.go Co-authored-by: Prashant Varanasi --- zapcore/write_syncer.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index 9145b8451..d0c023803 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -141,11 +141,9 @@ func (s *bufferWriterSyncer) Write(bs []byte) (int, error) { s.Lock() defer s.Unlock() - // there are some logic internal for bufio.Writer here: - // 1. when the buffer is enough, data would not be flushed. - // 2. when the buffer is not enough, data would be flushed as soon as the buffer fills up. - // this would lead to log spliting, which is not acceptable for log collector - // so we need to flush bufferWriter before writing the data into bufferWriter + // To avoid partial writes from being flushed, we manually flush the existing buffer if: + // * The current write doesn't fit into the buffer fully, and + // * The buffer is not empty (since bufio will not split large writes when the buffer is empty) if len(bs) > s.bufferWriter.Available() && s.bufferWriter.Buffered() > 0 { if err := s.bufferWriter.Flush(); err != nil { return 0, err From 37a503ba20557540f85e83c3b7408565848310ff Mon Sep 17 00:00:00 2001 From: liqi Date: Fri, 6 Nov 2020 15:23:08 +0800 Subject: [PATCH 46/54] add comment --- zapcore/write_syncer.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index d0c023803..56806a70c 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -168,6 +168,9 @@ func (s *bufferWriterSyncer) flushLoop() { for { select { case <-s.ticker.C: + // we just simply ignore error here + // because the underlying bufio writer stores any errors + // and we return any error from Sync() as part of the close _ = s.Sync() case <-s.stop: return From bea28632a81eb19d97459ab73acc8eb5e41fdc5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= Date: Fri, 6 Nov 2020 15:25:35 +0800 Subject: [PATCH 47/54] Update zapcore/write_syncer.go Co-authored-by: Prashant Varanasi --- zapcore/write_syncer.go | 1 - 1 file changed, 1 deletion(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index 56806a70c..dcb02472e 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -155,7 +155,6 @@ func (s *bufferWriterSyncer) Write(bs []byte) (int, error) { // Sync flushes buffered log data into disk directly. func (s *bufferWriterSyncer) Sync() error { - // bufio is not goroutine safe, so add lock writer here s.Lock() defer s.Unlock() From c971509bf2a263f0c4a346579bc6d7e765e2f4e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A4=A7=E5=8F=AF?= Date: Fri, 6 Nov 2020 15:31:53 +0800 Subject: [PATCH 48/54] Update zapcore/write_syncer.go Co-authored-by: Abhinav Gupta --- zapcore/write_syncer.go | 1 + 1 file changed, 1 insertion(+) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index dcb02472e..84816a0c2 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -80,6 +80,7 @@ func (s *lockedWriteSyncer) Sync() error { type bufferWriterSyncer struct { sync.Mutex + stop chan struct{} bufferWriter *bufio.Writer ticker *time.Ticker From 8c2cdfe869dad4abd9f6935f640e83a116d6b08b Mon Sep 17 00:00:00 2001 From: liqi Date: Fri, 6 Nov 2020 15:32:34 +0800 Subject: [PATCH 49/54] update --- zapcore/write_syncer.go | 1 - 1 file changed, 1 deletion(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index 84816a0c2..dcb02472e 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -80,7 +80,6 @@ func (s *lockedWriteSyncer) Sync() error { type bufferWriterSyncer struct { sync.Mutex - stop chan struct{} bufferWriter *bufio.Writer ticker *time.Ticker From 1fe111763d44155aea1d36bdc26484b51e9edffe Mon Sep 17 00:00:00 2001 From: Prashant Varanasi Date: Tue, 5 Jan 2021 15:27:59 -0800 Subject: [PATCH 50/54] Remove Lock/Unlock from tests --- zapcore/write_syncer_test.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/zapcore/write_syncer_test.go b/zapcore/write_syncer_test.go index 0f8fd9129..5fa1fe687 100644 --- a/zapcore/write_syncer_test.go +++ b/zapcore/write_syncer_test.go @@ -131,17 +131,12 @@ func TestBufferWriter(t *testing.T) { defer close() requireWriteWorks(t, ws) ztest.Sleep(10 * time.Millisecond) - bws := ws.(*bufferWriterSyncer) - bws.Lock() assert.Equal(t, "foo", buf.String(), "Unexpected log string") - bws.Unlock() // flush twice to validate loop logic requireWriteWorks(t, ws) ztest.Sleep(10 * time.Millisecond) - bws.Lock() assert.Equal(t, "foofoo", buf.String(), "Unexpected log string") - bws.Unlock() }) } From 0cd98f3bb5f38677eb6889ed9c3ae693e1170acf Mon Sep 17 00:00:00 2001 From: liqi Date: Wed, 6 Jan 2021 15:46:05 +0800 Subject: [PATCH 51/54] remove double buffer check --- zapcore/write_syncer.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index dcb02472e..8b56c70ea 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -111,11 +111,6 @@ const ( // The buffer size defaults to 256 kB if set to zero. // The flush interval defaults to 30 seconds if set to zero. func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (_ WriteSyncer, close func() error) { - if _, ok := ws.(*bufferWriterSyncer); ok { - // no need to layer on another buffer - return ws, func() error { return nil } - } - if bufferSize == 0 { bufferSize = _defaultBufferSize } From fe04d616cddb79d09b8045682951b6faac7333ad Mon Sep 17 00:00:00 2001 From: liqi Date: Wed, 6 Jan 2021 15:47:13 +0800 Subject: [PATCH 52/54] set timer to zero --- zapcore/write_syncer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zapcore/write_syncer_test.go b/zapcore/write_syncer_test.go index 5fa1fe687..61c7a6790 100644 --- a/zapcore/write_syncer_test.go +++ b/zapcore/write_syncer_test.go @@ -117,7 +117,7 @@ func TestBufferWriter(t *testing.T) { }) t.Run("flush error", func(t *testing.T) { - ws, close := Buffer(AddSync(&errorWriter{}), 4, time.Nanosecond) + ws, close := Buffer(AddSync(&errorWriter{}), 4, 0) n, err := ws.Write([]byte("foo")) require.NoError(t, err, "Unexpected error writing to WriteSyncer.") require.Equal(t, 3, n, "Wrote an unexpected number of bytes.") From 23ca47dafc18f53c5ae0bd4605da4fb8ad56ce18 Mon Sep 17 00:00:00 2001 From: liqi Date: Wed, 6 Jan 2021 15:52:19 +0800 Subject: [PATCH 53/54] remove errorWriter --- zapcore/write_syncer_test.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/zapcore/write_syncer_test.go b/zapcore/write_syncer_test.go index 61c7a6790..98d62ff75 100644 --- a/zapcore/write_syncer_test.go +++ b/zapcore/write_syncer_test.go @@ -39,10 +39,6 @@ type writeSyncSpy struct { ztest.Syncer } -type errorWriter struct{} - -func (*errorWriter) Write([]byte) (int, error) { return 0, errors.New("unimplemented") } - func requireWriteWorks(t testing.TB, ws WriteSyncer) { n, err := ws.Write([]byte("foo")) require.NoError(t, err, "Unexpected error writing to WriteSyncer.") @@ -117,7 +113,7 @@ func TestBufferWriter(t *testing.T) { }) t.Run("flush error", func(t *testing.T) { - ws, close := Buffer(AddSync(&errorWriter{}), 4, 0) + ws, close := Buffer(AddSync(&ztest.FailWriter{}), 4, 0) n, err := ws.Write([]byte("foo")) require.NoError(t, err, "Unexpected error writing to WriteSyncer.") require.Equal(t, 3, n, "Wrote an unexpected number of bytes.") From 2b0cf17e6e9ea78e3446d5839968460a8b02ef13 Mon Sep 17 00:00:00 2001 From: liqi Date: Wed, 6 Jan 2021 16:59:24 +0800 Subject: [PATCH 54/54] add SyncBuffer --- internal/ztest/writer.go | 26 ++++++++++++++++++++++++++ zapcore/write_syncer_test.go | 2 +- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/internal/ztest/writer.go b/internal/ztest/writer.go index 9fdd5805e..b9a1928dc 100644 --- a/internal/ztest/writer.go +++ b/internal/ztest/writer.go @@ -25,6 +25,7 @@ import ( "errors" "io/ioutil" "strings" + "sync" ) // A Syncer is a spy for the Sync portion of zapcore.WriteSyncer. @@ -94,3 +95,28 @@ func (b *Buffer) Lines() []string { func (b *Buffer) Stripped() string { return strings.TrimRight(b.String(), "\n") } + +// SyncBuffer is an implementation of bytes.Buffer which is goroutine safe. +type SyncBuffer struct { + sync.RWMutex + + buf bytes.Buffer +} + +// Write appends the contents of p to the buffer, growing the buffer as +// needed. +func (b *SyncBuffer) Write(p []byte) (n int, err error) { + b.Lock() + defer b.Unlock() + + return b.buf.Write(p) +} + +// String returns the contents of the unread portion of the buffer +// as a string. +func (b *SyncBuffer) String() string { + b.RLock() + defer b.RUnlock() + + return b.buf.String() +} diff --git a/zapcore/write_syncer_test.go b/zapcore/write_syncer_test.go index 98d62ff75..e41ae2369 100644 --- a/zapcore/write_syncer_test.go +++ b/zapcore/write_syncer_test.go @@ -122,7 +122,7 @@ func TestBufferWriter(t *testing.T) { }) t.Run("flush timer", func(t *testing.T) { - buf := &bytes.Buffer{} + buf := &ztest.SyncBuffer{} ws, close := Buffer(AddSync(buf), 6, time.Microsecond) defer close() requireWriteWorks(t, ws)