Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add buffering wrapper around WriteSyncer #782

Closed
wants to merge 55 commits into from
Closed
Show file tree
Hide file tree
Changes from 50 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
5f22c26
add buffer sync
sysulq Feb 8, 2020
36d1a4f
support config bufferSize and flushInterval, improve logic
sysulq Feb 11, 2020
9f7be81
improve
sysulq Feb 13, 2020
c1f54ca
update comment
sysulq Feb 13, 2020
93832bc
WriterSyncer support Close method
sysulq Feb 25, 2020
ba42206
improve
sysulq Feb 25, 2020
069d7b9
fix spell
sysulq Feb 25, 2020
60350a2
improve cancel logic
sysulq Mar 3, 2020
1c24017
improve
sysulq Mar 3, 2020
6ab72db
remove close
sysulq Mar 3, 2020
bfd7a94
rename cancel to close and keep syncing
sysulq May 29, 2020
66b7878
fix lint
sysulq Jun 1, 2020
5e98dbb
100% test coverage
sysulq Jun 1, 2020
5980ec9
improve comment
sysulq Jun 1, 2020
7f5913c
improve comment
sysulq Jun 1, 2020
07b0e3a
fix test error
sysulq Jun 1, 2020
dfdb0c2
fix race condition in test case
sysulq Jun 1, 2020
c8b9979
100% test coverage
sysulq Jun 1, 2020
6d9018f
add loop and fix typo
sysulq Jun 5, 2020
f61fe13
Update zapcore/write_syncer.go
sysulq Jun 9, 2020
7954bcf
improve test
sysulq Jun 9, 2020
6e6e6f6
validate loop logic in test case
sysulq Jun 9, 2020
1e7da7f
group default config
sysulq Jun 9, 2020
c427af2
improve close logic
sysulq Jun 23, 2020
1d00db7
Update zapcore/write_syncer.go
sysulq Oct 9, 2020
a17596e
Update zapcore/write_syncer_test.go
sysulq Oct 9, 2020
ccd3608
Update zapcore/write_syncer_test.go
sysulq Oct 9, 2020
13cdf32
Update zapcore/write_syncer_test.go
sysulq Oct 9, 2020
a2a235b
Update zapcore/write_syncer_test.go
sysulq Oct 9, 2020
0477c9d
Update zapcore/write_syncer_test.go
sysulq Oct 9, 2020
16a2147
Update zapcore/write_syncer_test.go
sysulq Oct 9, 2020
761c8e8
improve close logic and use goleak
sysulq Oct 13, 2020
a2123c9
Drop `type CloseFunc`
abhinav Nov 2, 2020
1957050
doc: Rewrite Buffer documentation
abhinav Nov 2, 2020
371117f
Prefix default{BufferSize, FlushInterval} with _
abhinav Nov 2, 2020
22a2427
Buffer/close: return a bound method
abhinav Nov 2, 2020
c0f68a9
Buffer/close: Close the channel instead of posting
abhinav Nov 2, 2020
f57c09b
_default{BufferSize, FlushInterval}: docs
abhinav Nov 2, 2020
70795cf
buffer: Move loop into a method
abhinav Nov 2, 2020
d61c899
buffer/close: stop the ticker
abhinav Nov 2, 2020
20fc7a8
Update zapcore/write_syncer_test.go
sysulq Nov 6, 2020
319e6fe
Update zapcore/write_syncer.go
sysulq Nov 6, 2020
d293796
improve code style
sysulq Nov 6, 2020
d439793
Merge branch 'master' of https://github.com/hnlq715/zap
sysulq Nov 6, 2020
9613897
Update zapcore/write_syncer.go
sysulq Nov 6, 2020
5160827
Update zapcore/write_syncer.go
sysulq Nov 6, 2020
37a503b
add comment
sysulq Nov 6, 2020
bea2863
Update zapcore/write_syncer.go
sysulq Nov 6, 2020
c971509
Update zapcore/write_syncer.go
sysulq Nov 6, 2020
8c2cdfe
update
sysulq Nov 6, 2020
1fe1117
Remove Lock/Unlock from tests
prashantv Jan 5, 2021
0cd98f3
remove double buffer check
sysulq Jan 6, 2021
fe04d61
set timer to zero
sysulq Jan 6, 2021
23ca47d
remove errorWriter
sysulq Jan 6, 2021
2b0cf17
add SyncBuffer
sysulq Jan 6, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -6,6 +6,7 @@ require (
github.com/pkg/errors v0.8.1
github.com/stretchr/testify v1.4.0
go.uber.org/atomic v1.6.0
go.uber.org/goleak v1.1.10
go.uber.org/multierr v1.5.0
golang.org/x/lint v0.0.0-20190930215403-16217165b5de
gopkg.in/yaml.v2 v2.2.2
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Expand Up @@ -22,6 +22,8 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4=
Expand All @@ -44,6 +46,8 @@ golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c h1:IGkKhmfzcztjm6gYkykvu/N
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11 h1:Yq9t9jnGoR+dBuitxdo9l6Q7xh/zOyNnYUtDKaQ3x0E=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
110 changes: 110 additions & 0 deletions zapcore/write_syncer.go
Expand Up @@ -21,8 +21,10 @@
package zapcore

import (
"bufio"
"io"
"sync"
"time"

"go.uber.org/multierr"
)
Expand Down Expand Up @@ -75,6 +77,114 @@ func (s *lockedWriteSyncer) Sync() error {
return err
}

type bufferWriterSyncer struct {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we move this whole type and methods into a separate file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file already contains Lock syncer and MultiWrite syncer, I think it is a proper place for Buffer syncer :-)

sync.Mutex
sysulq marked this conversation as resolved.
Show resolved Hide resolved

stop chan struct{}
bufferWriter *bufio.Writer
ticker *time.Ticker
}

const (
// _defaultBufferSize specifies the default size used by Buffer.
_defaultBufferSize = 256 * 1024 // 256 kB

// _defaultFlushInterval specifies the default flush interval for
// Buffer.
_defaultFlushInterval = 30 * time.Second
)

// Buffer wraps a WriteSyncer to buffer its output. The returned WriteSyncer
// flushes its output as the buffer fills up, or at the provided interval,
// whichever comes first.
//
// Call the returned function to finish using the WriteSyncer and flush
// remaining bytes.
//
// func main() {
// // ...
// ws, closeWS := zapcore.Buffer(ws, 0, 0)
// defer closeWS()
// // ...
// }
//
// The buffer size defaults to 256 kB if set to zero.
// The flush interval defaults to 30 seconds if set to zero.
func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (_ WriteSyncer, close func() error) {
if _, ok := ws.(*bufferWriterSyncer); ok {
// no need to layer on another buffer
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the different bufferWriteSyncer may have different buffer sizes / flush intervals. we may end up skipping the layering, but could cause unexpected buffering/flushing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should not support mulitple buffer layer to keep it simple.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I missed this before. I think silently ignoring the call here would be
surprising. We could extract and wrap the underlying buffer, but that might be
unnecessarily messy.

We should just double wrap here. WriteSyncers should usually not be
constructed in super complicated logic where risk of unintentional double
wrapping is too high.

return ws, func() error { return nil }
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if _, ok := ws.(*bufferWriterSyncer); ok {
// no need to layer on another buffer
return ws, func() error { return nil }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, we can just double wrap this buffer syncer directly.

if bufferSize == 0 {
bufferSize = _defaultBufferSize
}

if flushInterval == 0 {
flushInterval = _defaultFlushInterval
}

bws := &bufferWriterSyncer{
stop: make(chan struct{}),
bufferWriter: bufio.NewWriterSize(ws, bufferSize),
ticker: time.NewTicker(flushInterval),
}

go bws.flushLoop()

return bws, bws.close
}

// Write writes log data into buffer syncer directly, multiple Write calls will be batched,
// and log data will be flushed to disk when the buffer is full or periodically.
func (s *bufferWriterSyncer) Write(bs []byte) (int, error) {
s.Lock()
defer s.Unlock()

sysulq marked this conversation as resolved.
Show resolved Hide resolved
// To avoid partial writes from being flushed, we manually flush the existing buffer if:
// * The current write doesn't fit into the buffer fully, and
// * The buffer is not empty (since bufio will not split large writes when the buffer is empty)
if len(bs) > s.bufferWriter.Available() && s.bufferWriter.Buffered() > 0 {
if err := s.bufferWriter.Flush(); err != nil {
return 0, err
}
}

return s.bufferWriter.Write(bs)
}

// Sync flushes buffered log data into disk directly.
func (s *bufferWriterSyncer) Sync() error {
s.Lock()
defer s.Unlock()

return s.bufferWriter.Flush()
sysulq marked this conversation as resolved.
Show resolved Hide resolved
}

// flushLoop flushes the buffer at the configured interval until Close is
// called.
func (s *bufferWriterSyncer) flushLoop() {
for {
select {
case <-s.ticker.C:
// we just simply ignore error here
// because the underlying bufio writer stores any errors
// and we return any error from Sync() as part of the close
_ = s.Sync()
sysulq marked this conversation as resolved.
Show resolved Hide resolved
case <-s.stop:
return
}
}
}

// close closes the buffer, cleans up background goroutines, and flushes
// remaining, unwritten data.
func (s *bufferWriterSyncer) close() error {
s.ticker.Stop()
close(s.stop)
return s.Sync()
}

type writerWrapper struct {
io.Writer
}
Expand Down
54 changes: 52 additions & 2 deletions zapcore/write_syncer_bench_test.go
Expand Up @@ -21,13 +21,16 @@
package zapcore

import (
"io/ioutil"
"os"
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/zap/internal/ztest"
)

func BenchmarkMultiWriteSyncer(b *testing.B) {
b.Run("2", func(b *testing.B) {
b.Run("2 discarder", func(b *testing.B) {
w := NewMultiWriteSyncer(
&ztest.Discarder{},
&ztest.Discarder{},
Expand All @@ -39,7 +42,7 @@ func BenchmarkMultiWriteSyncer(b *testing.B) {
}
})
})
b.Run("4", func(b *testing.B) {
b.Run("4 discarder", func(b *testing.B) {
w := NewMultiWriteSyncer(
&ztest.Discarder{},
&ztest.Discarder{},
Expand All @@ -53,4 +56,51 @@ func BenchmarkMultiWriteSyncer(b *testing.B) {
}
})
})
b.Run("4 discarder with buffer", func(b *testing.B) {
sysulq marked this conversation as resolved.
Show resolved Hide resolved
w, close := Buffer(NewMultiWriteSyncer(
&ztest.Discarder{},
&ztest.Discarder{},
&ztest.Discarder{},
&ztest.Discarder{},
), 0, 0)
defer close()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
w.Write([]byte("foobarbazbabble"))
}
})
})
}

func BenchmarkWriteSyncer(b *testing.B) {
b.Run("write file with no buffer", func(b *testing.B) {
file, err := ioutil.TempFile("", "log")
assert.NoError(b, err)
defer file.Close()
defer os.Remove(file.Name())

w := AddSync(file)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
w.Write([]byte("foobarbazbabble"))
}
})
})
b.Run("write file with buffer", func(b *testing.B) {
file, err := ioutil.TempFile("", "log")
assert.NoError(b, err)
defer file.Close()
defer os.Remove(file.Name())

w, close := Buffer(AddSync(file), 0, 0)
defer close()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
w.Write([]byte("foobarbazbabble"))
}
})
})
}
80 changes: 80 additions & 0 deletions zapcore/write_syncer_test.go
Expand Up @@ -24,11 +24,13 @@ import (
"bytes"
"errors"
"testing"
"time"

"io"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"go.uber.org/zap/internal/ztest"
)

Expand All @@ -37,6 +39,10 @@ type writeSyncSpy struct {
ztest.Syncer
}

type errorWriter struct{}

func (*errorWriter) Write([]byte) (int, error) { return 0, errors.New("unimplemented") }

func requireWriteWorks(t testing.TB, ws WriteSyncer) {
n, err := ws.Write([]byte("foo"))
require.NoError(t, err, "Unexpected error writing to WriteSyncer.")
Expand Down Expand Up @@ -65,6 +71,80 @@ func TestAddSyncWriter(t *testing.T) {
assert.NoError(t, ws.Sync(), "Unexpected error calling a no-op Sync method.")
}

func TestBufferWriter(t *testing.T) {
defer goleak.VerifyNone(t)

// If we pass a plain io.Writer, make sure that we still get a WriteSyncer
// with a no-op Sync.
t.Run("sync", func(t *testing.T) {
buf := &bytes.Buffer{}
ws, close := Buffer(AddSync(buf), 0, 0)
defer close()
requireWriteWorks(t, ws)
assert.Empty(t, buf.String(), "Unexpected log calling a no-op Write method.")
assert.NoError(t, ws.Sync(), "Unexpected error calling a no-op Sync method.")
assert.Equal(t, "foo", buf.String(), "Unexpected log string")
})

t.Run("close", func(t *testing.T) {
buf := &bytes.Buffer{}
ws, close := Buffer(AddSync(buf), 0, 0)
requireWriteWorks(t, ws)
assert.Empty(t, buf.String(), "Unexpected log calling a no-op Write method.")
close()
assert.Equal(t, "foo", buf.String(), "Unexpected log string")
})

t.Run("wrap twice", func(t *testing.T) {
buf := &bytes.Buffer{}
bufsync, close1 := Buffer(AddSync(buf), 0, 0)
ws, close2 := Buffer(bufsync, 0, 0)
requireWriteWorks(t, ws)
assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.")
close2()
close1()
assert.Equal(t, "foo", buf.String(), "Unexpected log string")
})

t.Run("small buffer", func(t *testing.T) {
buf := &bytes.Buffer{}
ws, close := Buffer(AddSync(buf), 5, 0)
defer close()
requireWriteWorks(t, ws)
assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.")
requireWriteWorks(t, ws)
assert.Equal(t, "foo", buf.String(), "Unexpected log string")
})

t.Run("flush error", func(t *testing.T) {
ws, close := Buffer(AddSync(&errorWriter{}), 4, time.Nanosecond)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why such a short flush timer here? we're relying on the buffer size limit to flush rather than the background timer I think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely, we can definitely set this timer to 0.

n, err := ws.Write([]byte("foo"))
require.NoError(t, err, "Unexpected error writing to WriteSyncer.")
require.Equal(t, 3, n, "Wrote an unexpected number of bytes.")
ws.Write([]byte("foo"))
assert.Error(t, close(), "Expected close to fail.")
})

t.Run("flush timer", func(t *testing.T) {
buf := &bytes.Buffer{}
ws, close := Buffer(AddSync(buf), 6, time.Microsecond)
defer close()
requireWriteWorks(t, ws)
ztest.Sleep(10 * time.Millisecond)
bws := ws.(*bufferWriterSyncer)
bws.Lock()
assert.Equal(t, "foo", buf.String(), "Unexpected log string")
bws.Unlock()

// flush twice to validate loop logic
requireWriteWorks(t, ws)
ztest.Sleep(10 * time.Millisecond)
bws.Lock()
assert.Equal(t, "foofoo", buf.String(), "Unexpected log string")
bws.Unlock()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is the lock necessary here? there should be nothing to flush, so the buf should not be mutated while we read this (and if it does get mutated, that's likely a real race issue to investigate)?

Copy link
Contributor Author

@sysulq sysulq Jan 6, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First, bytes.Buffer is not goroutine safe.
Because we write the buf in goroutine 20, and read the buf in goroutine 19, maybe this race log says all of it.
And it should only reach race condition in this test case, because we need to check what the buf contains by buf.String().

And we want to avoid test case fail, so add lock in this test case just works fine :-)

Read at 0x00c000117560 by goroutine 19:
  bytes.(*Buffer).String()
      /home/liqi/workspace/go/src/bytes/buffer.go:65 +0x411
  go.uber.org/zap/zapcore.TestBufferWriter.func6()
      /home/liqi/workspace/zap/zapcore/write_syncer_test.go:134 +0x1a4
  testing.tRunner()
      /home/liqi/workspace/go/src/testing/testing.go:1127 +0x202

Previous write at 0x00c000117560 by goroutine 20:
  bytes.(*Buffer).grow()
      /home/liqi/workspace/go/src/bytes/buffer.go:128 +0x484
  bytes.(*Buffer).Write()
      /home/liqi/workspace/go/src/bytes/buffer.go:172 +0x184
  go.uber.org/zap/zapcore.(*writerWrapper).Write()
      <autogenerated>:1 +0x87
  bufio.(*Writer).Flush()
      /home/liqi/workspace/go/src/bufio/bufio.go:607 +0x13c
  go.uber.org/zap/zapcore.(*bufferWriterSyncer).Sync()
      /home/liqi/workspace/zap/zapcore/write_syncer.go:161 +0xb8
  go.uber.org/zap/zapcore.(*bufferWriterSyncer).flushLoop()
      /home/liqi/workspace/zap/zapcore/write_syncer.go:173 +0x87

Goroutine 19 (running) created at:
  testing.(*T).Run()
      /home/liqi/workspace/go/src/testing/testing.go:1178 +0x796
  go.uber.org/zap/zapcore.TestBufferWriter()
      /home/liqi/workspace/zap/zapcore/write_syncer_test.go:128 +0x1cc
  testing.tRunner()
      /home/liqi/workspace/go/src/testing/testing.go:1127 +0x202

Goroutine 20 (running) created at:
  go.uber.org/zap/zapcore.Buffer()
      /home/liqi/workspace/zap/zapcore/write_syncer.go:133 +0x32b
  go.uber.org/zap/zapcore.TestBufferWriter.func6()
      /home/liqi/workspace/zap/zapcore/write_syncer_test.go:130 +0xde
  testing.tRunner()
      /home/liqi/workspace/go/src/testing/testing.go:1127 +0x202

})
}

func TestNewMultiWriteSyncerWorksForSingleWriter(t *testing.T) {
w := &ztest.Buffer{}

Expand Down