Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add BufferedWriteSyncer #1

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions go.sum
Expand Up @@ -3,8 +3,11 @@ github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZx
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1 h1:VkoXIwSboBpnk99O/KFauAEILuNHv5DVFKZMBN/gUgw=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand All @@ -22,20 +25,27 @@ go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11 h1:Yq9t9jnGoR+dBuitxdo9l6Q7xh/zOyNnYUtDKaQ3x0E=
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
Expand Down
129 changes: 129 additions & 0 deletions zapcore/buffered_write_syncer.go
@@ -0,0 +1,129 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package zapcore

import (
"bufio"
"sync"
"time"
)

type BufferedWriteSyncer struct {
WriteSyncer
sync.Mutex
Comment on lines +30 to +31
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI, anything you embed is part of the public API. That's probably not desirable for the mutex, but we can do it for the WriteSyncer if we want to.

https://github.com/uber-go/guide/blob/master/style.md#avoid-embedding-types-in-public-structs


Size int
FlushInterval time.Duration
Clock Clock

// unexported fields for state
size int
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused field. can delete.

writer *bufio.Writer
ticker *time.Ticker
stop chan struct{}
initialized bool
}

const (
// _defaultBufferSize specifies the default size used by Buffer.
_defaultBufferSize = 256 * 1024 // 256 kB

// _defaultFlushInterval specifies the default flush interval for
// Buffer.
_defaultFlushInterval = 30 * time.Second
)

func (s *BufferedWriteSyncer) init() {
size := s.Size
if size == 0 {
size = _defaultBufferSize
}

flushInterval := s.FlushInterval
if flushInterval == 0 {
flushInterval = _defaultFlushInterval
}

if s.Clock != nil {
s.ticker = s.Clock.NewTicker(flushInterval)
} else {
s.ticker = time.NewTicker(flushInterval)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should use a default clock implementation if unset rather than if-else on using the clock versus using the system.

So the systemClock should be available in zapcore to be used.

}

s.writer = bufio.NewWriterSize(s.WriteSyncer, size)
s.stop = make(chan struct{})
s.initialized = true
go s.flushLoop()
}

// Write writes log data into buffer syncer directly, multiple Write calls will be batched,
// and log data will be flushed to disk when the buffer is full or periodically.
func (s *BufferedWriteSyncer) Write(bs []byte) (int, error) {
if !s.initialized {
s.init()
}

s.Lock()
defer s.Unlock()

Comment on lines +84 to +86
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're trying to make Write thread-safe, the initiallized check should also be thread safe.
The lock should be acquired before we change initialized.

// To avoid partial writes from being flushed, we manually flush the existing buffer if:
// * The current write doesn't fit into the buffer fully, and
// * The buffer is not empty (since bufio will not split large writes when the buffer is empty)
if len(bs) > s.writer.Available() && s.writer.Buffered() > 0 {
if err := s.writer.Flush(); err != nil {
return 0, err
}
}

return s.writer.Write(bs)
}

// Sync flushes buffered log data into disk directly.
func (s *BufferedWriteSyncer) Sync() error {
s.Lock()
defer s.Unlock()

return s.writer.Flush()
}

// flushLoop flushes the buffer at the configured interval until Close is
// called.
func (s *BufferedWriteSyncer) flushLoop() {
for {
select {
case <-s.ticker.C:
// we just simply ignore error here
// because the underlying bufio writer stores any errors
// and we return any error from Sync() as part of the close
_ = s.Sync()
case <-s.stop:
return
}
}
}

// Close closes the buffer, cleans up background goroutines, and flushes
// remaining, unwritten data.
func (s *BufferedWriteSyncer) Close() error {
s.ticker.Stop()
close(s.stop)
return s.Sync()
}
49 changes: 49 additions & 0 deletions zapcore/buffered_write_syncer_bench_test.go
@@ -0,0 +1,49 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package zapcore

import (
"io/ioutil"
"os"
"testing"

"github.com/stretchr/testify/assert"
)

func BenchmarkBufferedWriteSyncer(b *testing.B) {
b.Run("write file with buffer", func(b *testing.B) {
file, err := ioutil.TempFile("", "log")
assert.NoError(b, err)
defer file.Close()
defer os.Remove(file.Name())

w := &BufferedWriteSyncer{
WriteSyncer: AddSync(file),
}
defer w.Close()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
w.Write([]byte("foobarbazbabble"))
}
})
})
}
109 changes: 109 additions & 0 deletions zapcore/buffered_write_syncer_test.go
@@ -0,0 +1,109 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package zapcore

import (
"bytes"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"go.uber.org/zap/internal/ztest"
)

func TestBufferWriter(t *testing.T) {
defer goleak.VerifyNone(t)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not necessary if leak_test provides this via TestMain


// If we pass a plain io.Writer, make sure that we still get a WriteSyncer
// with a no-op Sync.
t.Run("sync", func(t *testing.T) {
buf := &bytes.Buffer{}
ws := &BufferedWriteSyncer{WriteSyncer: AddSync(buf)}

requireWriteWorks(t, ws)
assert.Empty(t, buf.String(), "Unexpected log calling a no-op Write method.")
assert.NoError(t, ws.Sync(), "Unexpected error calling a no-op Sync method.")
assert.Equal(t, "foo", buf.String(), "Unexpected log string")
assert.NoError(t, ws.Close())
})

t.Run("close", func(t *testing.T) {
buf := &bytes.Buffer{}
ws := &BufferedWriteSyncer{WriteSyncer: AddSync(buf)}
requireWriteWorks(t, ws)
assert.Empty(t, buf.String(), "Unexpected log calling a no-op Write method.")
assert.NoError(t, ws.Close())
assert.Equal(t, "foo", buf.String(), "Unexpected log string")
})

t.Run("wrap twice", func(t *testing.T) {
buf := &bytes.Buffer{}
bufsync := &BufferedWriteSyncer{WriteSyncer: AddSync(buf)}
ws := &BufferedWriteSyncer{WriteSyncer: bufsync}
requireWriteWorks(t, ws)
assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.")
assert.NoError(t, ws.Close())
assert.NoError(t, bufsync.Close())
assert.Equal(t, "foo", buf.String(), "Unexpected log string")
})

t.Run("small buffer", func(t *testing.T) {
buf := &bytes.Buffer{}
ws := &BufferedWriteSyncer{WriteSyncer: AddSync(buf), Size: 5}

requireWriteWorks(t, ws)
assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.")
requireWriteWorks(t, ws)
assert.Equal(t, "foo", buf.String(), "Unexpected log string")
assert.NoError(t, ws.Close())
})

t.Run("flush error", func(t *testing.T) {
ws := &BufferedWriteSyncer{WriteSyncer: &ztest.FailWriter{}, Size: 4}
n, err := ws.Write([]byte("foo"))
require.NoError(t, err, "Unexpected error writing to WriteSyncer.")
require.Equal(t, 3, n, "Wrote an unexpected number of bytes.")
ws.Write([]byte("foo"))
assert.Error(t, ws.Close(), "Expected close to fail.")
})

t.Run("flush timer", func(t *testing.T) {
buf := &bytes.Buffer{}
clock := newControlledClock()
ws := &BufferedWriteSyncer{
WriteSyncer: AddSync(buf),
Size: 6,
FlushInterval: time.Microsecond,
Clock: clock,
}
requireWriteWorks(t, ws)
clock.Add(10 * time.Millisecond)
assert.Equal(t, "foo", buf.String(), "Unexpected log string")

// flush twice to validate loop logic
requireWriteWorks(t, ws)
clock.Add(10 * time.Millisecond)
assert.Equal(t, "foofoo", buf.String(), "Unexpected log string")
assert.NoError(t, ws.Close())
})
}
41 changes: 39 additions & 2 deletions zapcore/write_syncer_bench_test.go
Expand Up @@ -21,13 +21,16 @@
package zapcore

import (
"io/ioutil"
"os"
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/zap/internal/ztest"
)

func BenchmarkMultiWriteSyncer(b *testing.B) {
b.Run("2", func(b *testing.B) {
b.Run("2 discarder", func(b *testing.B) {
w := NewMultiWriteSyncer(
&ztest.Discarder{},
&ztest.Discarder{},
Expand All @@ -39,7 +42,7 @@ func BenchmarkMultiWriteSyncer(b *testing.B) {
}
})
})
b.Run("4", func(b *testing.B) {
b.Run("4 discarder", func(b *testing.B) {
w := NewMultiWriteSyncer(
&ztest.Discarder{},
&ztest.Discarder{},
Expand All @@ -53,4 +56,38 @@ func BenchmarkMultiWriteSyncer(b *testing.B) {
}
})
})
b.Run("4 discarder with buffer", func(b *testing.B) {
w := &BufferedWriteSyncer{
WriteSyncer: NewMultiWriteSyncer(
&ztest.Discarder{},
&ztest.Discarder{},
&ztest.Discarder{},
&ztest.Discarder{},
),
}
defer w.Close()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
w.Write([]byte("foobarbazbabble"))
}
})
})
}

func BenchmarkWriteSyncer(b *testing.B) {
b.Run("write file with no buffer", func(b *testing.B) {
file, err := ioutil.TempFile("", "log")
assert.NoError(b, err)
defer file.Close()
defer os.Remove(file.Name())

w := AddSync(file)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
w.Write([]byte("foobarbazbabble"))
}
})
})
}