diff --git a/go.mod b/go.mod index 6ef4db70e..66bfc742a 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/pkg/errors v0.8.1 github.com/stretchr/testify v1.4.0 go.uber.org/atomic v1.6.0 + go.uber.org/goleak v1.1.10 go.uber.org/multierr v1.5.0 golang.org/x/lint v0.0.0-20190930215403-16217165b5de gopkg.in/yaml.v2 v2.2.2 diff --git a/go.sum b/go.sum index 99cdb93ea..5a3bbea55 100644 --- a/go.sum +++ b/go.sum @@ -22,6 +22,8 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0= +go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4= @@ -44,6 +46,8 @@ golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c h1:IGkKhmfzcztjm6gYkykvu/N golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191108193012-7d206e10da11 h1:Yq9t9jnGoR+dBuitxdo9l6Q7xh/zOyNnYUtDKaQ3x0E= +golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/ztest/writer.go b/internal/ztest/writer.go index 9fdd5805e..b9a1928dc 100644 --- a/internal/ztest/writer.go +++ b/internal/ztest/writer.go @@ -25,6 +25,7 @@ import ( "errors" "io/ioutil" "strings" + "sync" ) // A Syncer is a spy for the Sync portion of zapcore.WriteSyncer. @@ -94,3 +95,28 @@ func (b *Buffer) Lines() []string { func (b *Buffer) Stripped() string { return strings.TrimRight(b.String(), "\n") } + +// SyncBuffer is an implementation of bytes.Buffer which is goroutine safe. +type SyncBuffer struct { + sync.RWMutex + + buf bytes.Buffer +} + +// Write appends the contents of p to the buffer, growing the buffer as +// needed. +func (b *SyncBuffer) Write(p []byte) (n int, err error) { + b.Lock() + defer b.Unlock() + + return b.buf.Write(p) +} + +// String returns the contents of the unread portion of the buffer +// as a string. +func (b *SyncBuffer) String() string { + b.RLock() + defer b.RUnlock() + + return b.buf.String() +} diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index 209e25fe2..8b56c70ea 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -21,8 +21,10 @@ package zapcore import ( + "bufio" "io" "sync" + "time" "go.uber.org/multierr" ) @@ -75,6 +77,109 @@ func (s *lockedWriteSyncer) Sync() error { return err } +type bufferWriterSyncer struct { + sync.Mutex + + stop chan struct{} + bufferWriter *bufio.Writer + ticker *time.Ticker +} + +const ( + // _defaultBufferSize specifies the default size used by Buffer. + _defaultBufferSize = 256 * 1024 // 256 kB + + // _defaultFlushInterval specifies the default flush interval for + // Buffer. + _defaultFlushInterval = 30 * time.Second +) + +// Buffer wraps a WriteSyncer to buffer its output. The returned WriteSyncer +// flushes its output as the buffer fills up, or at the provided interval, +// whichever comes first. +// +// Call the returned function to finish using the WriteSyncer and flush +// remaining bytes. +// +// func main() { +// // ... +// ws, closeWS := zapcore.Buffer(ws, 0, 0) +// defer closeWS() +// // ... +// } +// +// The buffer size defaults to 256 kB if set to zero. +// The flush interval defaults to 30 seconds if set to zero. +func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (_ WriteSyncer, close func() error) { + if bufferSize == 0 { + bufferSize = _defaultBufferSize + } + + if flushInterval == 0 { + flushInterval = _defaultFlushInterval + } + + bws := &bufferWriterSyncer{ + stop: make(chan struct{}), + bufferWriter: bufio.NewWriterSize(ws, bufferSize), + ticker: time.NewTicker(flushInterval), + } + + go bws.flushLoop() + + return bws, bws.close +} + +// Write writes log data into buffer syncer directly, multiple Write calls will be batched, +// and log data will be flushed to disk when the buffer is full or periodically. +func (s *bufferWriterSyncer) Write(bs []byte) (int, error) { + s.Lock() + defer s.Unlock() + + // To avoid partial writes from being flushed, we manually flush the existing buffer if: + // * The current write doesn't fit into the buffer fully, and + // * The buffer is not empty (since bufio will not split large writes when the buffer is empty) + if len(bs) > s.bufferWriter.Available() && s.bufferWriter.Buffered() > 0 { + if err := s.bufferWriter.Flush(); err != nil { + return 0, err + } + } + + return s.bufferWriter.Write(bs) +} + +// Sync flushes buffered log data into disk directly. +func (s *bufferWriterSyncer) Sync() error { + s.Lock() + defer s.Unlock() + + return s.bufferWriter.Flush() +} + +// flushLoop flushes the buffer at the configured interval until Close is +// called. +func (s *bufferWriterSyncer) flushLoop() { + for { + select { + case <-s.ticker.C: + // we just simply ignore error here + // because the underlying bufio writer stores any errors + // and we return any error from Sync() as part of the close + _ = s.Sync() + case <-s.stop: + return + } + } +} + +// close closes the buffer, cleans up background goroutines, and flushes +// remaining, unwritten data. +func (s *bufferWriterSyncer) close() error { + s.ticker.Stop() + close(s.stop) + return s.Sync() +} + type writerWrapper struct { io.Writer } diff --git a/zapcore/write_syncer_bench_test.go b/zapcore/write_syncer_bench_test.go index 0209d0f61..f16fc3530 100644 --- a/zapcore/write_syncer_bench_test.go +++ b/zapcore/write_syncer_bench_test.go @@ -21,13 +21,16 @@ package zapcore import ( + "io/ioutil" + "os" "testing" + "github.com/stretchr/testify/assert" "go.uber.org/zap/internal/ztest" ) func BenchmarkMultiWriteSyncer(b *testing.B) { - b.Run("2", func(b *testing.B) { + b.Run("2 discarder", func(b *testing.B) { w := NewMultiWriteSyncer( &ztest.Discarder{}, &ztest.Discarder{}, @@ -39,7 +42,7 @@ func BenchmarkMultiWriteSyncer(b *testing.B) { } }) }) - b.Run("4", func(b *testing.B) { + b.Run("4 discarder", func(b *testing.B) { w := NewMultiWriteSyncer( &ztest.Discarder{}, &ztest.Discarder{}, @@ -53,4 +56,51 @@ func BenchmarkMultiWriteSyncer(b *testing.B) { } }) }) + b.Run("4 discarder with buffer", func(b *testing.B) { + w, close := Buffer(NewMultiWriteSyncer( + &ztest.Discarder{}, + &ztest.Discarder{}, + &ztest.Discarder{}, + &ztest.Discarder{}, + ), 0, 0) + defer close() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + w.Write([]byte("foobarbazbabble")) + } + }) + }) +} + +func BenchmarkWriteSyncer(b *testing.B) { + b.Run("write file with no buffer", func(b *testing.B) { + file, err := ioutil.TempFile("", "log") + assert.NoError(b, err) + defer file.Close() + defer os.Remove(file.Name()) + + w := AddSync(file) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + w.Write([]byte("foobarbazbabble")) + } + }) + }) + b.Run("write file with buffer", func(b *testing.B) { + file, err := ioutil.TempFile("", "log") + assert.NoError(b, err) + defer file.Close() + defer os.Remove(file.Name()) + + w, close := Buffer(AddSync(file), 0, 0) + defer close() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + w.Write([]byte("foobarbazbabble")) + } + }) + }) } diff --git a/zapcore/write_syncer_test.go b/zapcore/write_syncer_test.go index 3ccb0af24..e41ae2369 100644 --- a/zapcore/write_syncer_test.go +++ b/zapcore/write_syncer_test.go @@ -24,11 +24,13 @@ import ( "bytes" "errors" "testing" + "time" "io" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/goleak" "go.uber.org/zap/internal/ztest" ) @@ -65,6 +67,75 @@ func TestAddSyncWriter(t *testing.T) { assert.NoError(t, ws.Sync(), "Unexpected error calling a no-op Sync method.") } +func TestBufferWriter(t *testing.T) { + defer goleak.VerifyNone(t) + + // If we pass a plain io.Writer, make sure that we still get a WriteSyncer + // with a no-op Sync. + t.Run("sync", func(t *testing.T) { + buf := &bytes.Buffer{} + ws, close := Buffer(AddSync(buf), 0, 0) + defer close() + requireWriteWorks(t, ws) + assert.Empty(t, buf.String(), "Unexpected log calling a no-op Write method.") + assert.NoError(t, ws.Sync(), "Unexpected error calling a no-op Sync method.") + assert.Equal(t, "foo", buf.String(), "Unexpected log string") + }) + + t.Run("close", func(t *testing.T) { + buf := &bytes.Buffer{} + ws, close := Buffer(AddSync(buf), 0, 0) + requireWriteWorks(t, ws) + assert.Empty(t, buf.String(), "Unexpected log calling a no-op Write method.") + close() + assert.Equal(t, "foo", buf.String(), "Unexpected log string") + }) + + t.Run("wrap twice", func(t *testing.T) { + buf := &bytes.Buffer{} + bufsync, close1 := Buffer(AddSync(buf), 0, 0) + ws, close2 := Buffer(bufsync, 0, 0) + requireWriteWorks(t, ws) + assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") + close2() + close1() + assert.Equal(t, "foo", buf.String(), "Unexpected log string") + }) + + t.Run("small buffer", func(t *testing.T) { + buf := &bytes.Buffer{} + ws, close := Buffer(AddSync(buf), 5, 0) + defer close() + requireWriteWorks(t, ws) + assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") + requireWriteWorks(t, ws) + assert.Equal(t, "foo", buf.String(), "Unexpected log string") + }) + + t.Run("flush error", func(t *testing.T) { + ws, close := Buffer(AddSync(&ztest.FailWriter{}), 4, 0) + n, err := ws.Write([]byte("foo")) + require.NoError(t, err, "Unexpected error writing to WriteSyncer.") + require.Equal(t, 3, n, "Wrote an unexpected number of bytes.") + ws.Write([]byte("foo")) + assert.Error(t, close(), "Expected close to fail.") + }) + + t.Run("flush timer", func(t *testing.T) { + buf := &ztest.SyncBuffer{} + ws, close := Buffer(AddSync(buf), 6, time.Microsecond) + defer close() + requireWriteWorks(t, ws) + ztest.Sleep(10 * time.Millisecond) + assert.Equal(t, "foo", buf.String(), "Unexpected log string") + + // flush twice to validate loop logic + requireWriteWorks(t, ws) + ztest.Sleep(10 * time.Millisecond) + assert.Equal(t, "foofoo", buf.String(), "Unexpected log string") + }) +} + func TestNewMultiWriteSyncerWorksForSingleWriter(t *testing.T) { w := &ztest.Buffer{}