Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add buffering wrapper around WriteSyncer #782

Closed
wants to merge 55 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
5f22c26
add buffer sync
sysulq Feb 8, 2020
36d1a4f
support config bufferSize and flushInterval, improve logic
sysulq Feb 11, 2020
9f7be81
improve
sysulq Feb 13, 2020
c1f54ca
update comment
sysulq Feb 13, 2020
93832bc
WriterSyncer support Close method
sysulq Feb 25, 2020
ba42206
improve
sysulq Feb 25, 2020
069d7b9
fix spell
sysulq Feb 25, 2020
60350a2
improve cancel logic
sysulq Mar 3, 2020
1c24017
improve
sysulq Mar 3, 2020
6ab72db
remove close
sysulq Mar 3, 2020
bfd7a94
rename cancel to close and keep syncing
sysulq May 29, 2020
66b7878
fix lint
sysulq Jun 1, 2020
5e98dbb
100% test coverage
sysulq Jun 1, 2020
5980ec9
improve comment
sysulq Jun 1, 2020
7f5913c
improve comment
sysulq Jun 1, 2020
07b0e3a
fix test error
sysulq Jun 1, 2020
dfdb0c2
fix race condition in test case
sysulq Jun 1, 2020
c8b9979
100% test coverage
sysulq Jun 1, 2020
6d9018f
add loop and fix typo
sysulq Jun 5, 2020
f61fe13
Update zapcore/write_syncer.go
sysulq Jun 9, 2020
7954bcf
improve test
sysulq Jun 9, 2020
6e6e6f6
validate loop logic in test case
sysulq Jun 9, 2020
1e7da7f
group default config
sysulq Jun 9, 2020
c427af2
improve close logic
sysulq Jun 23, 2020
1d00db7
Update zapcore/write_syncer.go
sysulq Oct 9, 2020
a17596e
Update zapcore/write_syncer_test.go
sysulq Oct 9, 2020
ccd3608
Update zapcore/write_syncer_test.go
sysulq Oct 9, 2020
13cdf32
Update zapcore/write_syncer_test.go
sysulq Oct 9, 2020
a2a235b
Update zapcore/write_syncer_test.go
sysulq Oct 9, 2020
0477c9d
Update zapcore/write_syncer_test.go
sysulq Oct 9, 2020
16a2147
Update zapcore/write_syncer_test.go
sysulq Oct 9, 2020
761c8e8
improve close logic and use goleak
sysulq Oct 13, 2020
a2123c9
Drop `type CloseFunc`
abhinav Nov 2, 2020
1957050
doc: Rewrite Buffer documentation
abhinav Nov 2, 2020
371117f
Prefix default{BufferSize, FlushInterval} with _
abhinav Nov 2, 2020
22a2427
Buffer/close: return a bound method
abhinav Nov 2, 2020
c0f68a9
Buffer/close: Close the channel instead of posting
abhinav Nov 2, 2020
f57c09b
_default{BufferSize, FlushInterval}: docs
abhinav Nov 2, 2020
70795cf
buffer: Move loop into a method
abhinav Nov 2, 2020
d61c899
buffer/close: stop the ticker
abhinav Nov 2, 2020
20fc7a8
Update zapcore/write_syncer_test.go
sysulq Nov 6, 2020
319e6fe
Update zapcore/write_syncer.go
sysulq Nov 6, 2020
d293796
improve code style
sysulq Nov 6, 2020
d439793
Merge branch 'master' of https://github.com/hnlq715/zap
sysulq Nov 6, 2020
9613897
Update zapcore/write_syncer.go
sysulq Nov 6, 2020
5160827
Update zapcore/write_syncer.go
sysulq Nov 6, 2020
37a503b
add comment
sysulq Nov 6, 2020
bea2863
Update zapcore/write_syncer.go
sysulq Nov 6, 2020
c971509
Update zapcore/write_syncer.go
sysulq Nov 6, 2020
8c2cdfe
update
sysulq Nov 6, 2020
1fe1117
Remove Lock/Unlock from tests
prashantv Jan 5, 2021
0cd98f3
remove double buffer check
sysulq Jan 6, 2021
fe04d61
set timer to zero
sysulq Jan 6, 2021
23ca47d
remove errorWriter
sysulq Jan 6, 2021
2b0cf17
add SyncBuffer
sysulq Jan 6, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -6,6 +6,7 @@ require (
github.com/pkg/errors v0.8.1
github.com/stretchr/testify v1.4.0
go.uber.org/atomic v1.6.0
go.uber.org/goleak v1.1.10
go.uber.org/multierr v1.5.0
golang.org/x/lint v0.0.0-20190930215403-16217165b5de
gopkg.in/yaml.v2 v2.2.2
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Expand Up @@ -22,6 +22,8 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4=
Expand All @@ -44,6 +46,8 @@ golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c h1:IGkKhmfzcztjm6gYkykvu/N
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11 h1:Yq9t9jnGoR+dBuitxdo9l6Q7xh/zOyNnYUtDKaQ3x0E=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
26 changes: 26 additions & 0 deletions internal/ztest/writer.go
Expand Up @@ -25,6 +25,7 @@ import (
"errors"
"io/ioutil"
"strings"
"sync"
)

// A Syncer is a spy for the Sync portion of zapcore.WriteSyncer.
Expand Down Expand Up @@ -94,3 +95,28 @@ func (b *Buffer) Lines() []string {
func (b *Buffer) Stripped() string {
return strings.TrimRight(b.String(), "\n")
}

// SyncBuffer is an implementation of bytes.Buffer which is goroutine safe.
type SyncBuffer struct {
sync.RWMutex

buf bytes.Buffer
}

// Write appends the contents of p to the buffer, growing the buffer as
// needed.
func (b *SyncBuffer) Write(p []byte) (n int, err error) {
b.Lock()
defer b.Unlock()

return b.buf.Write(p)
}

// String returns the contents of the unread portion of the buffer
// as a string.
func (b *SyncBuffer) String() string {
b.RLock()
defer b.RUnlock()

return b.buf.String()
}
105 changes: 105 additions & 0 deletions zapcore/write_syncer.go
Expand Up @@ -21,8 +21,10 @@
package zapcore

import (
"bufio"
"io"
"sync"
"time"

"go.uber.org/multierr"
)
Expand Down Expand Up @@ -75,6 +77,109 @@ func (s *lockedWriteSyncer) Sync() error {
return err
}

type bufferWriterSyncer struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we move this whole type and methods into a separate file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file already contains Lock syncer and MultiWrite syncer, I think it is a proper place for Buffer syncer :-)

sync.Mutex
sysulq marked this conversation as resolved.
Show resolved Hide resolved

stop chan struct{}
bufferWriter *bufio.Writer
ticker *time.Ticker
}

const (
// _defaultBufferSize specifies the default size used by Buffer.
_defaultBufferSize = 256 * 1024 // 256 kB

// _defaultFlushInterval specifies the default flush interval for
// Buffer.
_defaultFlushInterval = 30 * time.Second
)

// Buffer wraps a WriteSyncer to buffer its output. The returned WriteSyncer
// flushes its output as the buffer fills up, or at the provided interval,
// whichever comes first.
//
// Call the returned function to finish using the WriteSyncer and flush
// remaining bytes.
//
// func main() {
// // ...
// ws, closeWS := zapcore.Buffer(ws, 0, 0)
// defer closeWS()
// // ...
// }
//
// The buffer size defaults to 256 kB if set to zero.
// The flush interval defaults to 30 seconds if set to zero.
func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (_ WriteSyncer, close func() error) {
if bufferSize == 0 {
bufferSize = _defaultBufferSize
}

if flushInterval == 0 {
flushInterval = _defaultFlushInterval
}

bws := &bufferWriterSyncer{
stop: make(chan struct{}),
bufferWriter: bufio.NewWriterSize(ws, bufferSize),
ticker: time.NewTicker(flushInterval),
}

go bws.flushLoop()

return bws, bws.close
}

// Write writes log data into buffer syncer directly, multiple Write calls will be batched,
// and log data will be flushed to disk when the buffer is full or periodically.
func (s *bufferWriterSyncer) Write(bs []byte) (int, error) {
s.Lock()
defer s.Unlock()

sysulq marked this conversation as resolved.
Show resolved Hide resolved
// To avoid partial writes from being flushed, we manually flush the existing buffer if:
// * The current write doesn't fit into the buffer fully, and
// * The buffer is not empty (since bufio will not split large writes when the buffer is empty)
if len(bs) > s.bufferWriter.Available() && s.bufferWriter.Buffered() > 0 {
if err := s.bufferWriter.Flush(); err != nil {
return 0, err
}
}

return s.bufferWriter.Write(bs)
}

// Sync flushes buffered log data into disk directly.
func (s *bufferWriterSyncer) Sync() error {
s.Lock()
defer s.Unlock()

return s.bufferWriter.Flush()
sysulq marked this conversation as resolved.
Show resolved Hide resolved
}

// flushLoop flushes the buffer at the configured interval until Close is
// called.
func (s *bufferWriterSyncer) flushLoop() {
for {
select {
case <-s.ticker.C:
// we just simply ignore error here
// because the underlying bufio writer stores any errors
// and we return any error from Sync() as part of the close
_ = s.Sync()
sysulq marked this conversation as resolved.
Show resolved Hide resolved
case <-s.stop:
return
}
}
}

// close closes the buffer, cleans up background goroutines, and flushes
// remaining, unwritten data.
func (s *bufferWriterSyncer) close() error {
s.ticker.Stop()
close(s.stop)
return s.Sync()
}

type writerWrapper struct {
io.Writer
}
Expand Down
54 changes: 52 additions & 2 deletions zapcore/write_syncer_bench_test.go
Expand Up @@ -21,13 +21,16 @@
package zapcore

import (
"io/ioutil"
"os"
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/zap/internal/ztest"
)

func BenchmarkMultiWriteSyncer(b *testing.B) {
b.Run("2", func(b *testing.B) {
b.Run("2 discarder", func(b *testing.B) {
w := NewMultiWriteSyncer(
&ztest.Discarder{},
&ztest.Discarder{},
Expand All @@ -39,7 +42,7 @@ func BenchmarkMultiWriteSyncer(b *testing.B) {
}
})
})
b.Run("4", func(b *testing.B) {
b.Run("4 discarder", func(b *testing.B) {
w := NewMultiWriteSyncer(
&ztest.Discarder{},
&ztest.Discarder{},
Expand All @@ -53,4 +56,51 @@ func BenchmarkMultiWriteSyncer(b *testing.B) {
}
})
})
b.Run("4 discarder with buffer", func(b *testing.B) {
sysulq marked this conversation as resolved.
Show resolved Hide resolved
w, close := Buffer(NewMultiWriteSyncer(
&ztest.Discarder{},
&ztest.Discarder{},
&ztest.Discarder{},
&ztest.Discarder{},
), 0, 0)
defer close()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
w.Write([]byte("foobarbazbabble"))
}
})
})
}

func BenchmarkWriteSyncer(b *testing.B) {
b.Run("write file with no buffer", func(b *testing.B) {
file, err := ioutil.TempFile("", "log")
assert.NoError(b, err)
defer file.Close()
defer os.Remove(file.Name())

w := AddSync(file)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
w.Write([]byte("foobarbazbabble"))
}
})
})
b.Run("write file with buffer", func(b *testing.B) {
file, err := ioutil.TempFile("", "log")
assert.NoError(b, err)
defer file.Close()
defer os.Remove(file.Name())

w, close := Buffer(AddSync(file), 0, 0)
defer close()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
w.Write([]byte("foobarbazbabble"))
}
})
})
}
71 changes: 71 additions & 0 deletions zapcore/write_syncer_test.go
Expand Up @@ -24,11 +24,13 @@ import (
"bytes"
"errors"
"testing"
"time"

"io"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"go.uber.org/zap/internal/ztest"
)

Expand Down Expand Up @@ -65,6 +67,75 @@ func TestAddSyncWriter(t *testing.T) {
assert.NoError(t, ws.Sync(), "Unexpected error calling a no-op Sync method.")
}

func TestBufferWriter(t *testing.T) {
defer goleak.VerifyNone(t)

// If we pass a plain io.Writer, make sure that we still get a WriteSyncer
// with a no-op Sync.
t.Run("sync", func(t *testing.T) {
buf := &bytes.Buffer{}
ws, close := Buffer(AddSync(buf), 0, 0)
defer close()
requireWriteWorks(t, ws)
assert.Empty(t, buf.String(), "Unexpected log calling a no-op Write method.")
assert.NoError(t, ws.Sync(), "Unexpected error calling a no-op Sync method.")
assert.Equal(t, "foo", buf.String(), "Unexpected log string")
})

t.Run("close", func(t *testing.T) {
buf := &bytes.Buffer{}
ws, close := Buffer(AddSync(buf), 0, 0)
requireWriteWorks(t, ws)
assert.Empty(t, buf.String(), "Unexpected log calling a no-op Write method.")
close()
assert.Equal(t, "foo", buf.String(), "Unexpected log string")
})

t.Run("wrap twice", func(t *testing.T) {
buf := &bytes.Buffer{}
bufsync, close1 := Buffer(AddSync(buf), 0, 0)
ws, close2 := Buffer(bufsync, 0, 0)
requireWriteWorks(t, ws)
assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.")
close2()
close1()
assert.Equal(t, "foo", buf.String(), "Unexpected log string")
})

t.Run("small buffer", func(t *testing.T) {
buf := &bytes.Buffer{}
ws, close := Buffer(AddSync(buf), 5, 0)
defer close()
requireWriteWorks(t, ws)
assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.")
requireWriteWorks(t, ws)
assert.Equal(t, "foo", buf.String(), "Unexpected log string")
})

t.Run("flush error", func(t *testing.T) {
ws, close := Buffer(AddSync(&ztest.FailWriter{}), 4, 0)
n, err := ws.Write([]byte("foo"))
require.NoError(t, err, "Unexpected error writing to WriteSyncer.")
require.Equal(t, 3, n, "Wrote an unexpected number of bytes.")
ws.Write([]byte("foo"))
assert.Error(t, close(), "Expected close to fail.")
})

t.Run("flush timer", func(t *testing.T) {
buf := &ztest.SyncBuffer{}
ws, close := Buffer(AddSync(buf), 6, time.Microsecond)
defer close()
requireWriteWorks(t, ws)
ztest.Sleep(10 * time.Millisecond)
assert.Equal(t, "foo", buf.String(), "Unexpected log string")

// flush twice to validate loop logic
requireWriteWorks(t, ws)
ztest.Sleep(10 * time.Millisecond)
assert.Equal(t, "foofoo", buf.String(), "Unexpected log string")
})
}

func TestNewMultiWriteSyncerWorksForSingleWriter(t *testing.T) {
w := &ztest.Buffer{}

Expand Down