From fc8195ff551a811950afda84bda5e4bf5c00cb73 Mon Sep 17 00:00:00 2001 From: Moises Vega Date: Tue, 25 May 2021 13:01:39 -0700 Subject: [PATCH] Add buffered write syncer --- go.mod | 10 -- go.sum | 19 --- internal/ztest/writer.go | 26 ---- zapcore/buffered_write_syncer.go | 131 ++++++++++++++++++++ zapcore/buffered_write_syncer_bench_test.go | 49 ++++++++ zapcore/buffered_write_syncer_test.go | 106 ++++++++++++++++ zapcore/write_syncer.go | 105 ---------------- zapcore/write_syncer_bench_test.go | 31 ++--- zapcore/write_syncer_test.go | 74 +---------- 9 files changed, 296 insertions(+), 255 deletions(-) create mode 100644 zapcore/buffered_write_syncer.go create mode 100644 zapcore/buffered_write_syncer_bench_test.go create mode 100644 zapcore/buffered_write_syncer_test.go diff --git a/go.mod b/go.mod index 2d927129e..9455c99cc 100644 --- a/go.mod +++ b/go.mod @@ -5,20 +5,10 @@ go 1.13 require ( github.com/benbjohnson/clock v1.1.0 github.com/pkg/errors v0.8.1 -<<<<<<< HEAD github.com/stretchr/testify v1.7.0 go.uber.org/atomic v1.7.0 go.uber.org/goleak v1.1.10 go.uber.org/multierr v1.6.0 gopkg.in/yaml.v2 v2.2.8 gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect -======= - github.com/stretchr/testify v1.4.0 - go.uber.org/atomic v1.6.0 - go.uber.org/goleak v1.1.10 - go.uber.org/multierr v1.5.0 - golang.org/x/lint v0.0.0-20190930215403-16217165b5de - gopkg.in/yaml.v2 v2.2.2 - honnef.co/go/tools v0.0.1-2019.2.3 ->>>>>>> 761c8e8 (improve close logic and use goleak) ) diff --git a/go.sum b/go.sum index f95667197..9031a6131 100644 --- a/go.sum +++ b/go.sum @@ -15,7 +15,6 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -<<<<<<< HEAD github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= @@ -24,16 +23,6 @@ go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= -======= -go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= -go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= -go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0= -go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= -go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A= -go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= -go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4= -go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= ->>>>>>> 761c8e8 (improve close logic and use goleak) golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= @@ -43,14 +32,6 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= -<<<<<<< HEAD -======= -golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= -golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c h1:IGkKhmfzcztjm6gYkykvu/NiS8kaqbCWAEWWAyf8J5U= -golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs= -golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= ->>>>>>> 761c8e8 (improve close logic and use goleak) golang.org/x/tools v0.0.0-20191108193012-7d206e10da11 h1:Yq9t9jnGoR+dBuitxdo9l6Q7xh/zOyNnYUtDKaQ3x0E= golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/ztest/writer.go b/internal/ztest/writer.go index b9a1928dc..9fdd5805e 100644 --- a/internal/ztest/writer.go +++ b/internal/ztest/writer.go @@ -25,7 +25,6 @@ import ( "errors" "io/ioutil" "strings" - "sync" ) // A Syncer is a spy for the Sync portion of zapcore.WriteSyncer. @@ -95,28 +94,3 @@ func (b *Buffer) Lines() []string { func (b *Buffer) Stripped() string { return strings.TrimRight(b.String(), "\n") } - -// SyncBuffer is an implementation of bytes.Buffer which is goroutine safe. -type SyncBuffer struct { - sync.RWMutex - - buf bytes.Buffer -} - -// Write appends the contents of p to the buffer, growing the buffer as -// needed. -func (b *SyncBuffer) Write(p []byte) (n int, err error) { - b.Lock() - defer b.Unlock() - - return b.buf.Write(p) -} - -// String returns the contents of the unread portion of the buffer -// as a string. -func (b *SyncBuffer) String() string { - b.RLock() - defer b.RUnlock() - - return b.buf.String() -} diff --git a/zapcore/buffered_write_syncer.go b/zapcore/buffered_write_syncer.go new file mode 100644 index 000000000..e336866c9 --- /dev/null +++ b/zapcore/buffered_write_syncer.go @@ -0,0 +1,131 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package zapcore + +import ( + "bufio" + "sync" + "time" +) + +// A BufferedWriteSyncer is a WriteSyncer that can also flush any buffered data +// with the ability to change the buffer size, flush interval and Clock. +// The default values are; Size 256kb, FlushInterval 30s. +type BufferedWriteSyncer struct { + WriteSyncer + + Size int + FlushInterval time.Duration + Clock Clock + + // unexported fields for state + mu sync.Mutex + writer *bufio.Writer + ticker *time.Ticker + stop chan struct{} + initialized bool +} + +const ( + // _defaultBufferSize specifies the default size used by Buffer. + _defaultBufferSize = 256 * 1024 // 256 kB + + // _defaultFlushInterval specifies the default flush interval for + // Buffer. + _defaultFlushInterval = 30 * time.Second +) + +func (s *BufferedWriteSyncer) loadConfig() { + size := s.Size + if size == 0 { + size = _defaultBufferSize + } + + flushInterval := s.FlushInterval + if flushInterval == 0 { + flushInterval = _defaultFlushInterval + } + + if s.Clock != nil { + s.ticker = s.Clock.NewTicker(flushInterval) + } else { + s.ticker = DefaultClock.NewTicker(flushInterval) + } + + s.writer = bufio.NewWriterSize(s.WriteSyncer, size) + s.stop = make(chan struct{}) + s.initialized = true + go s.flushLoop() +} + +// Write writes log data into buffer syncer directly, multiple Write calls will be batched, +// and log data will be flushed to disk when the buffer is full or periodically. +func (s *BufferedWriteSyncer) Write(bs []byte) (int, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if !s.initialized { + s.loadConfig() + } + + // To avoid partial writes from being flushed, we manually flush the existing buffer if: + // * The current write doesn't fit into the buffer fully, and + // * The buffer is not empty (since bufio will not split large writes when the buffer is empty) + if len(bs) > s.writer.Available() && s.writer.Buffered() > 0 { + if err := s.writer.Flush(); err != nil { + return 0, err + } + } + + return s.writer.Write(bs) +} + +// Sync flushes buffered log data into disk directly. +func (s *BufferedWriteSyncer) Sync() error { + s.mu.Lock() + defer s.mu.Unlock() + + return s.writer.Flush() +} + +// flushLoop flushes the buffer at the configured interval until Close is +// called. +func (s *BufferedWriteSyncer) flushLoop() { + for { + select { + case <-s.ticker.C: + // we just simply ignore error here + // because the underlying bufio writer stores any errors + // and we return any error from Sync() as part of the close + _ = s.Sync() + case <-s.stop: + return + } + } +} + +// Close closes the buffer, cleans up background goroutines, and flushes +// remaining, unwritten data. +func (s *BufferedWriteSyncer) Close() error { + s.ticker.Stop() + close(s.stop) + return s.Sync() +} diff --git a/zapcore/buffered_write_syncer_bench_test.go b/zapcore/buffered_write_syncer_bench_test.go new file mode 100644 index 000000000..df3177e4c --- /dev/null +++ b/zapcore/buffered_write_syncer_bench_test.go @@ -0,0 +1,49 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package zapcore + +import ( + "io/ioutil" + "os" + "testing" + + "github.com/stretchr/testify/assert" +) + +func BenchmarkBufferedWriteSyncer(b *testing.B) { + b.Run("write file with buffer", func(b *testing.B) { + file, err := ioutil.TempFile("", "log") + assert.NoError(b, err) + defer file.Close() + defer os.Remove(file.Name()) + + w := &BufferedWriteSyncer{ + WriteSyncer: AddSync(file), + } + defer w.Close() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + w.Write([]byte("foobarbazbabble")) + } + }) + }) +} diff --git a/zapcore/buffered_write_syncer_test.go b/zapcore/buffered_write_syncer_test.go new file mode 100644 index 000000000..55cfac2f0 --- /dev/null +++ b/zapcore/buffered_write_syncer_test.go @@ -0,0 +1,106 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package zapcore + +import ( + "bytes" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/internal/ztest" +) + +func TestBufferWriter(t *testing.T) { + // If we pass a plain io.Writer, make sure that we still get a WriteSyncer + // with a no-op Sync. + t.Run("sync", func(t *testing.T) { + buf := &bytes.Buffer{} + ws := &BufferedWriteSyncer{WriteSyncer: AddSync(buf)} + + requireWriteWorks(t, ws) + assert.Empty(t, buf.String(), "Unexpected log calling a no-op Write method.") + assert.NoError(t, ws.Sync(), "Unexpected error calling a no-op Sync method.") + assert.Equal(t, "foo", buf.String(), "Unexpected log string") + assert.NoError(t, ws.Close()) + }) + + t.Run("close", func(t *testing.T) { + buf := &bytes.Buffer{} + ws := &BufferedWriteSyncer{WriteSyncer: AddSync(buf)} + requireWriteWorks(t, ws) + assert.Empty(t, buf.String(), "Unexpected log calling a no-op Write method.") + assert.NoError(t, ws.Close()) + assert.Equal(t, "foo", buf.String(), "Unexpected log string") + }) + + t.Run("wrap twice", func(t *testing.T) { + buf := &bytes.Buffer{} + bufsync := &BufferedWriteSyncer{WriteSyncer: AddSync(buf)} + ws := &BufferedWriteSyncer{WriteSyncer: bufsync} + requireWriteWorks(t, ws) + assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") + assert.NoError(t, ws.Close()) + assert.NoError(t, bufsync.Close()) + assert.Equal(t, "foo", buf.String(), "Unexpected log string") + }) + + t.Run("small buffer", func(t *testing.T) { + buf := &bytes.Buffer{} + ws := &BufferedWriteSyncer{WriteSyncer: AddSync(buf), Size: 5} + + requireWriteWorks(t, ws) + assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") + requireWriteWorks(t, ws) + assert.Equal(t, "foo", buf.String(), "Unexpected log string") + assert.NoError(t, ws.Close()) + }) + + t.Run("flush error", func(t *testing.T) { + ws := &BufferedWriteSyncer{WriteSyncer: &ztest.FailWriter{}, Size: 4} + n, err := ws.Write([]byte("foo")) + require.NoError(t, err, "Unexpected error writing to WriteSyncer.") + require.Equal(t, 3, n, "Wrote an unexpected number of bytes.") + ws.Write([]byte("foo")) + assert.Error(t, ws.Close(), "Expected close to fail.") + }) + + t.Run("flush timer", func(t *testing.T) { + buf := &bytes.Buffer{} + clock := newControlledClock() + ws := &BufferedWriteSyncer{ + WriteSyncer: AddSync(buf), + Size: 6, + FlushInterval: time.Microsecond, + Clock: clock, + } + requireWriteWorks(t, ws) + clock.Add(10 * time.Millisecond) + assert.Equal(t, "foo", buf.String(), "Unexpected log string") + + // flush twice to validate loop logic + requireWriteWorks(t, ws) + clock.Add(10 * time.Millisecond) + assert.Equal(t, "foofoo", buf.String(), "Unexpected log string") + assert.NoError(t, ws.Close()) + }) +} diff --git a/zapcore/write_syncer.go b/zapcore/write_syncer.go index 2c99d3d37..d4a1af3d0 100644 --- a/zapcore/write_syncer.go +++ b/zapcore/write_syncer.go @@ -21,10 +21,8 @@ package zapcore import ( - "bufio" "io" "sync" - "time" "go.uber.org/multierr" ) @@ -77,109 +75,6 @@ func (s *lockedWriteSyncer) Sync() error { return err } -type bufferWriterSyncer struct { - sync.Mutex - - stop chan struct{} - bufferWriter *bufio.Writer - ticker *time.Ticker -} - -const ( - // _defaultBufferSize specifies the default size used by Buffer. - _defaultBufferSize = 256 * 1024 // 256 kB - - // _defaultFlushInterval specifies the default flush interval for - // Buffer. - _defaultFlushInterval = 30 * time.Second -) - -// Buffer wraps a WriteSyncer to buffer its output. The returned WriteSyncer -// flushes its output as the buffer fills up, or at the provided interval, -// whichever comes first. -// -// Call the returned function to finish using the WriteSyncer and flush -// remaining bytes. -// -// func main() { -// // ... -// ws, closeWS := zapcore.Buffer(ws, 0, 0) -// defer closeWS() -// // ... -// } -// -// The buffer size defaults to 256 kB if set to zero. -// The flush interval defaults to 30 seconds if set to zero. -func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (_ WriteSyncer, close func() error) { - if bufferSize == 0 { - bufferSize = _defaultBufferSize - } - - if flushInterval == 0 { - flushInterval = _defaultFlushInterval - } - - bws := &bufferWriterSyncer{ - stop: make(chan struct{}), - bufferWriter: bufio.NewWriterSize(ws, bufferSize), - ticker: time.NewTicker(flushInterval), - } - - go bws.flushLoop() - - return bws, bws.close -} - -// Write writes log data into buffer syncer directly, multiple Write calls will be batched, -// and log data will be flushed to disk when the buffer is full or periodically. -func (s *bufferWriterSyncer) Write(bs []byte) (int, error) { - s.Lock() - defer s.Unlock() - - // To avoid partial writes from being flushed, we manually flush the existing buffer if: - // * The current write doesn't fit into the buffer fully, and - // * The buffer is not empty (since bufio will not split large writes when the buffer is empty) - if len(bs) > s.bufferWriter.Available() && s.bufferWriter.Buffered() > 0 { - if err := s.bufferWriter.Flush(); err != nil { - return 0, err - } - } - - return s.bufferWriter.Write(bs) -} - -// Sync flushes buffered log data into disk directly. -func (s *bufferWriterSyncer) Sync() error { - s.Lock() - defer s.Unlock() - - return s.bufferWriter.Flush() -} - -// flushLoop flushes the buffer at the configured interval until Close is -// called. -func (s *bufferWriterSyncer) flushLoop() { - for { - select { - case <-s.ticker.C: - // we just simply ignore error here - // because the underlying bufio writer stores any errors - // and we return any error from Sync() as part of the close - _ = s.Sync() - case <-s.stop: - return - } - } -} - -// close closes the buffer, cleans up background goroutines, and flushes -// remaining, unwritten data. -func (s *bufferWriterSyncer) close() error { - s.ticker.Stop() - close(s.stop) - return s.Sync() -} - type writerWrapper struct { io.Writer } diff --git a/zapcore/write_syncer_bench_test.go b/zapcore/write_syncer_bench_test.go index f16fc3530..c3bcafe54 100644 --- a/zapcore/write_syncer_bench_test.go +++ b/zapcore/write_syncer_bench_test.go @@ -57,13 +57,15 @@ func BenchmarkMultiWriteSyncer(b *testing.B) { }) }) b.Run("4 discarder with buffer", func(b *testing.B) { - w, close := Buffer(NewMultiWriteSyncer( - &ztest.Discarder{}, - &ztest.Discarder{}, - &ztest.Discarder{}, - &ztest.Discarder{}, - ), 0, 0) - defer close() + w := &BufferedWriteSyncer{ + WriteSyncer: NewMultiWriteSyncer( + &ztest.Discarder{}, + &ztest.Discarder{}, + &ztest.Discarder{}, + &ztest.Discarder{}, + ), + } + defer w.Close() b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { @@ -88,19 +90,4 @@ func BenchmarkWriteSyncer(b *testing.B) { } }) }) - b.Run("write file with buffer", func(b *testing.B) { - file, err := ioutil.TempFile("", "log") - assert.NoError(b, err) - defer file.Close() - defer os.Remove(file.Name()) - - w, close := Buffer(AddSync(file), 0, 0) - defer close() - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - w.Write([]byte("foobarbazbabble")) - } - }) - }) } diff --git a/zapcore/write_syncer_test.go b/zapcore/write_syncer_test.go index e41ae2369..4748be7f5 100644 --- a/zapcore/write_syncer_test.go +++ b/zapcore/write_syncer_test.go @@ -23,14 +23,11 @@ package zapcore import ( "bytes" "errors" - "testing" - "time" - "io" + "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.uber.org/goleak" "go.uber.org/zap/internal/ztest" ) @@ -67,75 +64,6 @@ func TestAddSyncWriter(t *testing.T) { assert.NoError(t, ws.Sync(), "Unexpected error calling a no-op Sync method.") } -func TestBufferWriter(t *testing.T) { - defer goleak.VerifyNone(t) - - // If we pass a plain io.Writer, make sure that we still get a WriteSyncer - // with a no-op Sync. - t.Run("sync", func(t *testing.T) { - buf := &bytes.Buffer{} - ws, close := Buffer(AddSync(buf), 0, 0) - defer close() - requireWriteWorks(t, ws) - assert.Empty(t, buf.String(), "Unexpected log calling a no-op Write method.") - assert.NoError(t, ws.Sync(), "Unexpected error calling a no-op Sync method.") - assert.Equal(t, "foo", buf.String(), "Unexpected log string") - }) - - t.Run("close", func(t *testing.T) { - buf := &bytes.Buffer{} - ws, close := Buffer(AddSync(buf), 0, 0) - requireWriteWorks(t, ws) - assert.Empty(t, buf.String(), "Unexpected log calling a no-op Write method.") - close() - assert.Equal(t, "foo", buf.String(), "Unexpected log string") - }) - - t.Run("wrap twice", func(t *testing.T) { - buf := &bytes.Buffer{} - bufsync, close1 := Buffer(AddSync(buf), 0, 0) - ws, close2 := Buffer(bufsync, 0, 0) - requireWriteWorks(t, ws) - assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") - close2() - close1() - assert.Equal(t, "foo", buf.String(), "Unexpected log string") - }) - - t.Run("small buffer", func(t *testing.T) { - buf := &bytes.Buffer{} - ws, close := Buffer(AddSync(buf), 5, 0) - defer close() - requireWriteWorks(t, ws) - assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") - requireWriteWorks(t, ws) - assert.Equal(t, "foo", buf.String(), "Unexpected log string") - }) - - t.Run("flush error", func(t *testing.T) { - ws, close := Buffer(AddSync(&ztest.FailWriter{}), 4, 0) - n, err := ws.Write([]byte("foo")) - require.NoError(t, err, "Unexpected error writing to WriteSyncer.") - require.Equal(t, 3, n, "Wrote an unexpected number of bytes.") - ws.Write([]byte("foo")) - assert.Error(t, close(), "Expected close to fail.") - }) - - t.Run("flush timer", func(t *testing.T) { - buf := &ztest.SyncBuffer{} - ws, close := Buffer(AddSync(buf), 6, time.Microsecond) - defer close() - requireWriteWorks(t, ws) - ztest.Sleep(10 * time.Millisecond) - assert.Equal(t, "foo", buf.String(), "Unexpected log string") - - // flush twice to validate loop logic - requireWriteWorks(t, ws) - ztest.Sleep(10 * time.Millisecond) - assert.Equal(t, "foofoo", buf.String(), "Unexpected log string") - }) -} - func TestNewMultiWriteSyncerWorksForSingleWriter(t *testing.T) { w := &ztest.Buffer{}