New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add buffering wrapper around WriteSyncer #782
Changes from 40 commits
5f22c26
36d1a4f
9f7be81
c1f54ca
93832bc
ba42206
069d7b9
60350a2
1c24017
6ab72db
bfd7a94
66b7878
5e98dbb
5980ec9
7f5913c
07b0e3a
dfdb0c2
c8b9979
6d9018f
f61fe13
7954bcf
6e6e6f6
1e7da7f
c427af2
1d00db7
a17596e
ccd3608
13cdf32
a2a235b
0477c9d
16a2147
761c8e8
a2123c9
1957050
371117f
22a2427
c0f68a9
f57c09b
70795cf
d61c899
20fc7a8
319e6fe
d293796
d439793
9613897
5160827
37a503b
bea2863
c971509
8c2cdfe
1fe1117
0cd98f3
fe04d61
23ca47d
2b0cf17
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -21,8 +21,10 @@ | |||||||||
package zapcore | ||||||||||
|
||||||||||
import ( | ||||||||||
"bufio" | ||||||||||
"io" | ||||||||||
"sync" | ||||||||||
"time" | ||||||||||
|
||||||||||
"go.uber.org/multierr" | ||||||||||
) | ||||||||||
|
@@ -75,6 +77,117 @@ func (s *lockedWriteSyncer) Sync() error { | |||||||||
return err | ||||||||||
} | ||||||||||
|
||||||||||
type bufferWriterSyncer struct { | ||||||||||
sync.Mutex | ||||||||||
sysulq marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
stop chan struct{} | ||||||||||
|
||||||||||
bufferWriter *bufio.Writer | ||||||||||
ticker *time.Ticker | ||||||||||
} | ||||||||||
|
||||||||||
const ( | ||||||||||
// _defaultBufferSize specifies the default size used by Buffer. | ||||||||||
_defaultBufferSize = 256 * 1024 // 256 kB | ||||||||||
|
||||||||||
// _defaultFlushInterval specifies the default flush interval for | ||||||||||
// Buffer. | ||||||||||
_defaultFlushInterval = 30 * time.Second | ||||||||||
) | ||||||||||
|
||||||||||
// Buffer wraps a WriteSyncer to buffer its output. The returned WriteSyncer | ||||||||||
// flushes its output as the buffer fills up, or at the provided interval, | ||||||||||
// whichever comes first. | ||||||||||
// | ||||||||||
// Call the returned function to finish using the WriteSyncer and flush | ||||||||||
// remaining bytes. | ||||||||||
// | ||||||||||
// func main() { | ||||||||||
// // ... | ||||||||||
// ws, closeWS := zapcore.Buffer(ws, 0, 0) | ||||||||||
// defer closeWS() | ||||||||||
// // ... | ||||||||||
// } | ||||||||||
// | ||||||||||
// The buffer size defaults to 256 kB if set to zero. | ||||||||||
// The flush interval defaults to 30 seconds if set to zero. | ||||||||||
func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) (_ WriteSyncer, close func() error) { | ||||||||||
if _, ok := ws.(*bufferWriterSyncer); ok { | ||||||||||
// no need to layer on another buffer | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the different There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should not support mulitple buffer layer to keep it simple. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, I missed this before. I think silently ignoring the call here would be We should just double wrap here. WriteSyncers should usually not be |
||||||||||
return ws, func() error { return nil } | ||||||||||
} | ||||||||||
|
||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Alright, we can just double wrap this buffer syncer directly. |
||||||||||
if bufferSize == 0 { | ||||||||||
bufferSize = _defaultBufferSize | ||||||||||
} | ||||||||||
|
||||||||||
if flushInterval == 0 { | ||||||||||
flushInterval = _defaultFlushInterval | ||||||||||
} | ||||||||||
|
||||||||||
ticker := time.NewTicker(flushInterval) | ||||||||||
|
||||||||||
bws := &bufferWriterSyncer{ | ||||||||||
stop: make(chan struct{}), | ||||||||||
bufferWriter: bufio.NewWriterSize(ws, bufferSize), | ||||||||||
ticker: ticker, | ||||||||||
} | ||||||||||
sysulq marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
|
||||||||||
go bws.flushLoop() | ||||||||||
|
||||||||||
return bws, bws.close | ||||||||||
} | ||||||||||
|
||||||||||
// Write writes log data into buffer syncer directly, multiple Write calls will be batched, | ||||||||||
// and log data will be flushed to disk when the buffer is full or periodically. | ||||||||||
func (s *bufferWriterSyncer) Write(bs []byte) (int, error) { | ||||||||||
// bufio is not goroutine safe, so add lock writer here | ||||||||||
sysulq marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
s.Lock() | ||||||||||
defer s.Unlock() | ||||||||||
|
||||||||||
sysulq marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
// there are some logic internal for bufio.Writer here: | ||||||||||
// 1. when the buffer is enough, data would not be flushed. | ||||||||||
sysulq marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
// 2. when the buffer is not enough, data would be flushed as soon as the buffer fills up. | ||||||||||
// this would lead to log spliting, which is not acceptable for log collector | ||||||||||
// so we need to flush bufferWriter before writing the data into bufferWriter | ||||||||||
sysulq marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
if len(bs) > s.bufferWriter.Available() && s.bufferWriter.Buffered() > 0 { | ||||||||||
if err := s.bufferWriter.Flush(); err != nil { | ||||||||||
return 0, err | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
return s.bufferWriter.Write(bs) | ||||||||||
} | ||||||||||
|
||||||||||
// Sync flushes buffered log data into disk directly. | ||||||||||
func (s *bufferWriterSyncer) Sync() error { | ||||||||||
// bufio is not goroutine safe, so add lock writer here | ||||||||||
sysulq marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
s.Lock() | ||||||||||
defer s.Unlock() | ||||||||||
|
||||||||||
return s.bufferWriter.Flush() | ||||||||||
sysulq marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
} | ||||||||||
|
||||||||||
// flushLoop flushes the buffer at the configured interval until Close is | ||||||||||
// called. | ||||||||||
func (s *bufferWriterSyncer) flushLoop() { | ||||||||||
for { | ||||||||||
select { | ||||||||||
case <-s.ticker.C: | ||||||||||
_ = s.Sync() | ||||||||||
sysulq marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||
case <-s.stop: | ||||||||||
return | ||||||||||
} | ||||||||||
} | ||||||||||
} | ||||||||||
|
||||||||||
// close closes the buffer, cleans up background goroutines, and flushes | ||||||||||
// remaining, unwritten data. | ||||||||||
func (s *bufferWriterSyncer) close() error { | ||||||||||
s.ticker.Stop() | ||||||||||
close(s.stop) | ||||||||||
return s.Sync() | ||||||||||
} | ||||||||||
|
||||||||||
type writerWrapper struct { | ||||||||||
io.Writer | ||||||||||
} | ||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,11 +24,13 @@ import ( | |
"bytes" | ||
"errors" | ||
"testing" | ||
"time" | ||
|
||
"io" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"go.uber.org/goleak" | ||
"go.uber.org/zap/internal/ztest" | ||
) | ||
|
||
|
@@ -37,6 +39,10 @@ type writeSyncSpy struct { | |
ztest.Syncer | ||
} | ||
|
||
type errorWriter struct{} | ||
|
||
func (*errorWriter) Write([]byte) (int, error) { return 0, errors.New("unimplemented") } | ||
|
||
func requireWriteWorks(t testing.TB, ws WriteSyncer) { | ||
n, err := ws.Write([]byte("foo")) | ||
require.NoError(t, err, "Unexpected error writing to WriteSyncer.") | ||
|
@@ -65,6 +71,80 @@ func TestAddSyncWriter(t *testing.T) { | |
assert.NoError(t, ws.Sync(), "Unexpected error calling a no-op Sync method.") | ||
} | ||
|
||
func TestBufferWriter(t *testing.T) { | ||
goleak.VerifyNone(t) | ||
sysulq marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// If we pass a plain io.Writer, make sure that we still get a WriteSyncer | ||
// with a no-op Sync. | ||
t.Run("sync", func(t *testing.T) { | ||
buf := &bytes.Buffer{} | ||
ws, close := Buffer(AddSync(buf), 0, 0) | ||
defer close() | ||
requireWriteWorks(t, ws) | ||
assert.Empty(t, buf.String(), "Unexpected log calling a no-op Write method.") | ||
assert.NoError(t, ws.Sync(), "Unexpected error calling a no-op Sync method.") | ||
assert.Equal(t, "foo", buf.String(), "Unexpected log string") | ||
}) | ||
|
||
t.Run("close", func(t *testing.T) { | ||
buf := &bytes.Buffer{} | ||
ws, close := Buffer(AddSync(buf), 0, 0) | ||
requireWriteWorks(t, ws) | ||
assert.Empty(t, buf.String(), "Unexpected log calling a no-op Write method.") | ||
close() | ||
assert.Equal(t, "foo", buf.String(), "Unexpected log string") | ||
}) | ||
|
||
t.Run("wrap twice", func(t *testing.T) { | ||
buf := &bytes.Buffer{} | ||
bufsync, close1 := Buffer(AddSync(buf), 0, 0) | ||
ws, close2 := Buffer(bufsync, 0, 0) | ||
requireWriteWorks(t, ws) | ||
assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") | ||
close2() | ||
close1() | ||
assert.Equal(t, "foo", buf.String(), "Unexpected log string") | ||
}) | ||
|
||
t.Run("small buffer", func(t *testing.T) { | ||
buf := &bytes.Buffer{} | ||
ws, close := Buffer(AddSync(buf), 5, 0) | ||
defer close() | ||
requireWriteWorks(t, ws) | ||
assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.") | ||
requireWriteWorks(t, ws) | ||
assert.Equal(t, "foo", buf.String(), "Unexpected log string") | ||
}) | ||
|
||
t.Run("flush error", func(t *testing.T) { | ||
ws, close := Buffer(AddSync(&errorWriter{}), 4, time.Nanosecond) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why such a short flush timer here? we're relying on the buffer size limit to flush rather than the background timer I think? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Absolutely, we can definitely set this timer to 0. |
||
n, err := ws.Write([]byte("foo")) | ||
require.NoError(t, err, "Unexpected error writing to WriteSyncer.") | ||
require.Equal(t, 3, n, "Wrote an unexpected number of bytes.") | ||
ws.Write([]byte("foo")) | ||
assert.Error(t, close(), "Expected close to fail.") | ||
}) | ||
|
||
t.Run("flush timer", func(t *testing.T) { | ||
buf := &bytes.Buffer{} | ||
ws, close := Buffer(AddSync(buf), 6, time.Microsecond) | ||
defer close() | ||
requireWriteWorks(t, ws) | ||
ztest.Sleep(10 * time.Millisecond) | ||
bws := ws.(*bufferWriterSyncer) | ||
bws.Lock() | ||
assert.Equal(t, "foo", buf.String(), "Unexpected log string") | ||
bws.Unlock() | ||
|
||
// flush twice to validate loop logic | ||
requireWriteWorks(t, ws) | ||
ztest.Sleep(10 * time.Millisecond) | ||
bws.Lock() | ||
assert.Equal(t, "foofoo", buf.String(), "Unexpected log string") | ||
bws.Unlock() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is the lock necessary here? there should be nothing to flush, so the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. First, bytes.Buffer is not goroutine safe. And we want to avoid test case fail, so add lock in this test case just works fine :-)
|
||
}) | ||
} | ||
|
||
func TestNewMultiWriteSyncerWorksForSingleWriter(t *testing.T) { | ||
w := &ztest.Buffer{} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we move this whole type and methods into a separate file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This file already contains Lock syncer and MultiWrite syncer, I think it is a proper place for Buffer syncer :-)