Skip to content

Commit

Permalink
improve
Browse files Browse the repository at this point in the history
  • Loading branch information
sysulq committed Feb 13, 2020
1 parent eea2b51 commit d00e5bc
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 8 deletions.
15 changes: 15 additions & 0 deletions zapcore/write_syncer.go
Expand Up @@ -91,6 +91,8 @@ const defaultBufferSize = 256 * 1024
const defaultFlushInterval = 30 * time.Second

// Buffer wraps a WriteSyncer in a buffer to improve performance,
// if bufferSize=0, we set it to defaultBufferSize
// if flushInterval=0, we set it to defaultFlushInterval
func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) WriteSyncer {
if lws, ok := ws.(*lockedWriteSyncer); ok {
if _, ok := lws.ws.(*bufferWriterSyncer); ok {
Expand Down Expand Up @@ -126,6 +128,19 @@ func Buffer(ws WriteSyncer, bufferSize int, flushInterval time.Duration) WriteSy
}

func (s *bufferWriterSyncer) Write(bs []byte) (int, error) {

// there are some logic internal for bufio.Writer here:
// 1. when the buffer is not enough, log would be written to disk directly
// 2. when the buffer is enough, log would not be flushed until the buffer is filled up
// this would lead to log spliting, which is not acceptable for log collector
// so we need to flush bufferWriter before writing the log into bufferWriter
if len(bs) > s.bufferWriter.Available() && s.bufferWriter.Buffered() > 0 {
err := s.bufferWriter.Flush()
if err != nil {
return 0, err
}
}

return s.bufferWriter.Write(bs)
}

Expand Down
25 changes: 17 additions & 8 deletions zapcore/write_syncer_test.go
Expand Up @@ -24,7 +24,6 @@ import (
"bytes"
"errors"
"testing"
"time"

"io"

Expand Down Expand Up @@ -69,13 +68,23 @@ func TestAddSyncWriter(t *testing.T) {
func TestBufferWriter(t *testing.T) {
// If we pass a plain io.Writer, make sure that we still get a WriteSyncer
// with a no-op Sync.
buf := &bytes.Buffer{}
ws := Buffer(Buffer(AddSync(buf), 0, time.Millisecond), 0, 0)
requireWriteWorks(t, ws)
assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.")
time.Sleep(2 * time.Millisecond)
// assert.NoError(t, ws.Sync(), "Unexpected error calling a no-op Sync method.")
assert.Equal(t, "foo", buf.String(), "Unexpected log string")
t.Run("default", func(t *testing.T) {
buf := &bytes.Buffer{}
ws := Buffer(Buffer(AddSync(buf), 0, 0), 0, 0)
requireWriteWorks(t, ws)
assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.")
assert.NoError(t, ws.Sync(), "Unexpected error calling a no-op Sync method.")
assert.Equal(t, "foo", buf.String(), "Unexpected log string")
})

t.Run("small buffer", func(t *testing.T) {
buf := &bytes.Buffer{}
ws := Buffer(Buffer(AddSync(buf), 5, 0), 5, 0)
requireWriteWorks(t, ws)
assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.")
requireWriteWorks(t, ws)
assert.Equal(t, "foo", buf.String(), "Unexpected log string")
})
}

func TestNewMultiWriteSyncerWorksForSingleWriter(t *testing.T) {
Expand Down

0 comments on commit d00e5bc

Please sign in to comment.