Skip to content

Commit

Permalink
Add buffered write syncer
Browse files Browse the repository at this point in the history
  • Loading branch information
moisesvega committed May 25, 2021
1 parent 7871b42 commit fc8195f
Show file tree
Hide file tree
Showing 9 changed files with 296 additions and 255 deletions.
10 changes: 0 additions & 10 deletions go.mod
Expand Up @@ -5,20 +5,10 @@ go 1.13
require (
github.com/benbjohnson/clock v1.1.0
github.com/pkg/errors v0.8.1
<<<<<<< HEAD
github.com/stretchr/testify v1.7.0
go.uber.org/atomic v1.7.0
go.uber.org/goleak v1.1.10
go.uber.org/multierr v1.6.0
gopkg.in/yaml.v2 v2.2.8
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
=======
github.com/stretchr/testify v1.4.0
go.uber.org/atomic v1.6.0
go.uber.org/goleak v1.1.10
go.uber.org/multierr v1.5.0
golang.org/x/lint v0.0.0-20190930215403-16217165b5de
gopkg.in/yaml.v2 v2.2.2
honnef.co/go/tools v0.0.1-2019.2.3
>>>>>>> 761c8e8 (improve close logic and use goleak)
)
19 changes: 0 additions & 19 deletions go.sum
Expand Up @@ -15,7 +15,6 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
<<<<<<< HEAD
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
Expand All @@ -24,16 +23,6 @@ go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
=======
go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
>>>>>>> 761c8e8 (improve close logic and use goleak)
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
Expand All @@ -43,14 +32,6 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
<<<<<<< HEAD
=======
golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c h1:IGkKhmfzcztjm6gYkykvu/NiS8kaqbCWAEWWAyf8J5U=
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs=
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
>>>>>>> 761c8e8 (improve close logic and use goleak)
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11 h1:Yq9t9jnGoR+dBuitxdo9l6Q7xh/zOyNnYUtDKaQ3x0E=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
26 changes: 0 additions & 26 deletions internal/ztest/writer.go
Expand Up @@ -25,7 +25,6 @@ import (
"errors"
"io/ioutil"
"strings"
"sync"
)

// A Syncer is a spy for the Sync portion of zapcore.WriteSyncer.
Expand Down Expand Up @@ -95,28 +94,3 @@ func (b *Buffer) Lines() []string {
func (b *Buffer) Stripped() string {
return strings.TrimRight(b.String(), "\n")
}

// SyncBuffer is an implementation of bytes.Buffer which is goroutine safe.
type SyncBuffer struct {
sync.RWMutex

buf bytes.Buffer
}

// Write appends the contents of p to the buffer, growing the buffer as
// needed.
func (b *SyncBuffer) Write(p []byte) (n int, err error) {
b.Lock()
defer b.Unlock()

return b.buf.Write(p)
}

// String returns the contents of the unread portion of the buffer
// as a string.
func (b *SyncBuffer) String() string {
b.RLock()
defer b.RUnlock()

return b.buf.String()
}
131 changes: 131 additions & 0 deletions zapcore/buffered_write_syncer.go
@@ -0,0 +1,131 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package zapcore

import (
"bufio"
"sync"
"time"
)

// A BufferedWriteSyncer is a WriteSyncer that can also flush any buffered data
// with the ability to change the buffer size, flush interval and Clock.
// The default values are; Size 256kb, FlushInterval 30s.
type BufferedWriteSyncer struct {
WriteSyncer

Size int
FlushInterval time.Duration
Clock Clock

// unexported fields for state
mu sync.Mutex
writer *bufio.Writer
ticker *time.Ticker
stop chan struct{}
initialized bool
}

const (
// _defaultBufferSize specifies the default size used by Buffer.
_defaultBufferSize = 256 * 1024 // 256 kB

// _defaultFlushInterval specifies the default flush interval for
// Buffer.
_defaultFlushInterval = 30 * time.Second
)

func (s *BufferedWriteSyncer) loadConfig() {
size := s.Size
if size == 0 {
size = _defaultBufferSize
}

flushInterval := s.FlushInterval
if flushInterval == 0 {
flushInterval = _defaultFlushInterval
}

if s.Clock != nil {
s.ticker = s.Clock.NewTicker(flushInterval)
} else {
s.ticker = DefaultClock.NewTicker(flushInterval)
}

s.writer = bufio.NewWriterSize(s.WriteSyncer, size)
s.stop = make(chan struct{})
s.initialized = true
go s.flushLoop()
}

// Write writes log data into buffer syncer directly, multiple Write calls will be batched,
// and log data will be flushed to disk when the buffer is full or periodically.
func (s *BufferedWriteSyncer) Write(bs []byte) (int, error) {
s.mu.Lock()
defer s.mu.Unlock()

if !s.initialized {
s.loadConfig()
}

// To avoid partial writes from being flushed, we manually flush the existing buffer if:
// * The current write doesn't fit into the buffer fully, and
// * The buffer is not empty (since bufio will not split large writes when the buffer is empty)
if len(bs) > s.writer.Available() && s.writer.Buffered() > 0 {
if err := s.writer.Flush(); err != nil {
return 0, err
}
}

return s.writer.Write(bs)
}

// Sync flushes buffered log data into disk directly.
func (s *BufferedWriteSyncer) Sync() error {
s.mu.Lock()
defer s.mu.Unlock()

return s.writer.Flush()
}

// flushLoop flushes the buffer at the configured interval until Close is
// called.
func (s *BufferedWriteSyncer) flushLoop() {
for {
select {
case <-s.ticker.C:
// we just simply ignore error here
// because the underlying bufio writer stores any errors
// and we return any error from Sync() as part of the close
_ = s.Sync()
case <-s.stop:
return
}
}
}

// Close closes the buffer, cleans up background goroutines, and flushes
// remaining, unwritten data.
func (s *BufferedWriteSyncer) Close() error {
s.ticker.Stop()
close(s.stop)
return s.Sync()
}
49 changes: 49 additions & 0 deletions zapcore/buffered_write_syncer_bench_test.go
@@ -0,0 +1,49 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package zapcore

import (
"io/ioutil"
"os"
"testing"

"github.com/stretchr/testify/assert"
)

func BenchmarkBufferedWriteSyncer(b *testing.B) {
b.Run("write file with buffer", func(b *testing.B) {
file, err := ioutil.TempFile("", "log")
assert.NoError(b, err)
defer file.Close()
defer os.Remove(file.Name())

w := &BufferedWriteSyncer{
WriteSyncer: AddSync(file),
}
defer w.Close()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
w.Write([]byte("foobarbazbabble"))
}
})
})
}
106 changes: 106 additions & 0 deletions zapcore/buffered_write_syncer_test.go
@@ -0,0 +1,106 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package zapcore

import (
"bytes"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/internal/ztest"
)

func TestBufferWriter(t *testing.T) {
// If we pass a plain io.Writer, make sure that we still get a WriteSyncer
// with a no-op Sync.
t.Run("sync", func(t *testing.T) {
buf := &bytes.Buffer{}
ws := &BufferedWriteSyncer{WriteSyncer: AddSync(buf)}

requireWriteWorks(t, ws)
assert.Empty(t, buf.String(), "Unexpected log calling a no-op Write method.")
assert.NoError(t, ws.Sync(), "Unexpected error calling a no-op Sync method.")
assert.Equal(t, "foo", buf.String(), "Unexpected log string")
assert.NoError(t, ws.Close())
})

t.Run("close", func(t *testing.T) {
buf := &bytes.Buffer{}
ws := &BufferedWriteSyncer{WriteSyncer: AddSync(buf)}
requireWriteWorks(t, ws)
assert.Empty(t, buf.String(), "Unexpected log calling a no-op Write method.")
assert.NoError(t, ws.Close())
assert.Equal(t, "foo", buf.String(), "Unexpected log string")
})

t.Run("wrap twice", func(t *testing.T) {
buf := &bytes.Buffer{}
bufsync := &BufferedWriteSyncer{WriteSyncer: AddSync(buf)}
ws := &BufferedWriteSyncer{WriteSyncer: bufsync}
requireWriteWorks(t, ws)
assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.")
assert.NoError(t, ws.Close())
assert.NoError(t, bufsync.Close())
assert.Equal(t, "foo", buf.String(), "Unexpected log string")
})

t.Run("small buffer", func(t *testing.T) {
buf := &bytes.Buffer{}
ws := &BufferedWriteSyncer{WriteSyncer: AddSync(buf), Size: 5}

requireWriteWorks(t, ws)
assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.")
requireWriteWorks(t, ws)
assert.Equal(t, "foo", buf.String(), "Unexpected log string")
assert.NoError(t, ws.Close())
})

t.Run("flush error", func(t *testing.T) {
ws := &BufferedWriteSyncer{WriteSyncer: &ztest.FailWriter{}, Size: 4}
n, err := ws.Write([]byte("foo"))
require.NoError(t, err, "Unexpected error writing to WriteSyncer.")
require.Equal(t, 3, n, "Wrote an unexpected number of bytes.")
ws.Write([]byte("foo"))
assert.Error(t, ws.Close(), "Expected close to fail.")
})

t.Run("flush timer", func(t *testing.T) {
buf := &bytes.Buffer{}
clock := newControlledClock()
ws := &BufferedWriteSyncer{
WriteSyncer: AddSync(buf),
Size: 6,
FlushInterval: time.Microsecond,
Clock: clock,
}
requireWriteWorks(t, ws)
clock.Add(10 * time.Millisecond)
assert.Equal(t, "foo", buf.String(), "Unexpected log string")

// flush twice to validate loop logic
requireWriteWorks(t, ws)
clock.Add(10 * time.Millisecond)
assert.Equal(t, "foofoo", buf.String(), "Unexpected log string")
assert.NoError(t, ws.Close())
})
}

0 comments on commit fc8195f

Please sign in to comment.