Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add buffered write syncer #952

Merged
merged 65 commits into from Jun 8, 2021
Merged
Show file tree
Hide file tree
Changes from 59 commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
c1e68d7
add buffer sync
sysulq Feb 8, 2020
df42521
support config bufferSize and flushInterval, improve logic
sysulq Feb 11, 2020
7f5e097
improve
sysulq Feb 13, 2020
7eec047
update comment
sysulq Feb 13, 2020
3c8ec7b
WriterSyncer support Close method
sysulq Feb 25, 2020
046355f
improve
sysulq Feb 25, 2020
afe872f
fix spell
sysulq Feb 25, 2020
cce50b1
improve cancel logic
sysulq Mar 3, 2020
ef808a2
improve
sysulq Mar 3, 2020
3af046a
remove close
sysulq Mar 3, 2020
e0a440d
rename cancel to close and keep syncing
sysulq May 29, 2020
e571e25
fix lint
sysulq Jun 1, 2020
4a25c9f
100% test coverage
sysulq Jun 1, 2020
99f4ea8
improve comment
sysulq Jun 1, 2020
223dd97
improve comment
sysulq Jun 1, 2020
dd3698e
fix test error
sysulq Jun 1, 2020
f3079de
fix race condition in test case
sysulq Jun 1, 2020
2c09dc4
100% test coverage
sysulq Jun 1, 2020
f0f2a28
add loop and fix typo
sysulq Jun 5, 2020
fb6efc3
Update zapcore/write_syncer.go
sysulq Jun 9, 2020
0977e02
improve test
sysulq Jun 9, 2020
65775e1
validate loop logic in test case
sysulq Jun 9, 2020
470b7fa
group default config
sysulq Jun 9, 2020
160e84d
improve close logic
sysulq Jun 23, 2020
ee04403
Update zapcore/write_syncer.go
sysulq Oct 9, 2020
80433c2
Update zapcore/write_syncer_test.go
sysulq Oct 9, 2020
9bda819
Update zapcore/write_syncer_test.go
sysulq Oct 9, 2020
dc370da
Update zapcore/write_syncer_test.go
sysulq Oct 9, 2020
2ecf1f8
Update zapcore/write_syncer_test.go
sysulq Oct 9, 2020
a522baf
Update zapcore/write_syncer_test.go
sysulq Oct 9, 2020
4273b8e
Update zapcore/write_syncer_test.go
sysulq Oct 9, 2020
d884534
improve close logic and use goleak
sysulq Oct 13, 2020
0392f30
Drop `type CloseFunc`
abhinav Nov 2, 2020
ddace4d
doc: Rewrite Buffer documentation
abhinav Nov 2, 2020
e96c0ee
Prefix default{BufferSize, FlushInterval} with _
abhinav Nov 2, 2020
570d2ff
Buffer/close: return a bound method
abhinav Nov 2, 2020
d1e244a
Buffer/close: Close the channel instead of posting
abhinav Nov 2, 2020
83f6331
_default{BufferSize, FlushInterval}: docs
abhinav Nov 2, 2020
ba0b65e
buffer: Move loop into a method
abhinav Nov 2, 2020
b8a0b28
buffer/close: stop the ticker
abhinav Nov 2, 2020
1c5daa9
Update zapcore/write_syncer_test.go
sysulq Nov 6, 2020
f74c558
improve code style
sysulq Nov 6, 2020
6f88240
Update zapcore/write_syncer.go
sysulq Nov 6, 2020
963142b
Update zapcore/write_syncer.go
sysulq Nov 6, 2020
f753926
Update zapcore/write_syncer.go
sysulq Nov 6, 2020
1db4692
add comment
sysulq Nov 6, 2020
545d164
Update zapcore/write_syncer.go
sysulq Nov 6, 2020
c777047
Update zapcore/write_syncer.go
sysulq Nov 6, 2020
1b0fcfd
update
sysulq Nov 6, 2020
02c9d3f
Remove Lock/Unlock from tests
prashantv Jan 5, 2021
4804188
remove double buffer check
sysulq Jan 6, 2021
4ca46e9
set timer to zero
sysulq Jan 6, 2021
b1a95ff
remove errorWriter
sysulq Jan 6, 2021
6d58a7d
add SyncBuffer
sysulq Jan 6, 2021
2312b39
Add buffered write syncer
moisesvega May 25, 2021
7c32a14
Set default Clock zapcore/buffered_write_syncer.go
moisesvega Jun 1, 2021
ffba68b
Use require instead assert zapcore/buffered_write_syncer_bench_test.go
moisesvega Jun 1, 2021
a3fbb8e
Update Close() comment, use filepath to create tmpDir and mutate Cloc…
moisesvega Jun 1, 2021
39f7316
Use ioutil for creating tmp file
moisesvega Jun 1, 2021
10b45db
Sync underlying writeSyncer
moisesvega Jun 4, 2021
0962b0f
Update default documentation and don't double lock
moisesvega Jun 4, 2021
7dedbc0
Add test case with lockedWriteSyncer
moisesvega Jun 4, 2021
f8f3a3d
Use empty assert.Empty
moisesvega Jun 8, 2021
acfd294
Use zapcore.Lock for BufferedWriteSyncer tests
moisesvega Jun 8, 2021
f32b796
Add asserts on close/remove in benchmark buffered write and add test …
moisesvega Jun 8, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
130 changes: 130 additions & 0 deletions zapcore/buffered_write_syncer.go
@@ -0,0 +1,130 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package zapcore

import (
"bufio"
"sync"
"time"
)

// A BufferedWriteSyncer is a WriteSyncer that can also flush any buffered data
// with the ability to change the buffer size, flush interval and Clock.
moisesvega marked this conversation as resolved.
Show resolved Hide resolved
// The default values are; Size 256kb, FlushInterval 30s.
type BufferedWriteSyncer struct {
WriteSyncer

Size int
FlushInterval time.Duration
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor nit: we should specify the defaults above the fields as we document the fields:

// Size specifies the maximum amount of data the writer will buffer before
// flushing. Defaults to 256 kB.

// FlushInterval specifies how often the writer should flush data if there
// have been no writes. Defaults to 30 seconds.

Clock Clock
moisesvega marked this conversation as resolved.
Show resolved Hide resolved

// unexported fields for state
mu sync.Mutex
writer *bufio.Writer
ticker *time.Ticker
stop chan struct{}
initialized bool
}

const (
// _defaultBufferSize specifies the default size used by Buffer.
_defaultBufferSize = 256 * 1024 // 256 kB

// _defaultFlushInterval specifies the default flush interval for
// Buffer.
_defaultFlushInterval = 30 * time.Second
)

func (s *BufferedWriteSyncer) loadConfig() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor naming: can we name this init or initialize because the bool is called initialize. Or we can name the bool "loaded" and call this "load", but I'm more in favor of init or initialize.

size := s.Size
if size == 0 {
size = _defaultBufferSize
}

flushInterval := s.FlushInterval
if flushInterval == 0 {
flushInterval = _defaultFlushInterval
}

if s.Clock == nil {
s.Clock = DefaultClock
}
s.ticker = s.Clock.NewTicker(flushInterval)

s.writer = bufio.NewWriterSize(s.WriteSyncer, size)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor optimization on the WriteSyncer:
since BufferedWriteSyncer is already thread-safe, we can optimize this a bit if s.WriteSyncer was wrapped with zapcore.Lock.

writer := s.WriteSyncer
if w, ok := writer.(*lockedWriteSyncer); ok {
  writer = w.ws // don't double lock
}
s.writer = bufio.NewWriterSize(writer, size)

s.stop = make(chan struct{})
s.initialized = true
go s.flushLoop()
abhinav marked this conversation as resolved.
Show resolved Hide resolved
}

// Write writes log data into buffer syncer directly, multiple Write calls will be batched,
// and log data will be flushed to disk when the buffer is full or periodically.
func (s *BufferedWriteSyncer) Write(bs []byte) (int, error) {
s.mu.Lock()
defer s.mu.Unlock()

if !s.initialized {
s.loadConfig()
}

// To avoid partial writes from being flushed, we manually flush the existing buffer if:
// * The current write doesn't fit into the buffer fully, and
// * The buffer is not empty (since bufio will not split large writes when the buffer is empty)
if len(bs) > s.writer.Available() && s.writer.Buffered() > 0 {
if err := s.writer.Flush(); err != nil {
return 0, err
}
}

return s.writer.Write(bs)
}

// Sync flushes buffered log data into disk directly.
func (s *BufferedWriteSyncer) Sync() error {
moisesvega marked this conversation as resolved.
Show resolved Hide resolved
s.mu.Lock()
defer s.mu.Unlock()

return s.writer.Flush()
}

// flushLoop flushes the buffer at the configured interval until Close is
// called.
func (s *BufferedWriteSyncer) flushLoop() {
for {
select {
case <-s.ticker.C:
// we just simply ignore error here
// because the underlying bufio writer stores any errors
// and we return any error from Sync() as part of the close
_ = s.Sync()
case <-s.stop:
return
}
}
}

// Close closes the buffer, cleans up background goroutines, and flushes
// remaining, unwritten data. This will not close the underlying WriteSyncer.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The warning that this will not close the underlying WriteSyncer makes me wonder if Close is a little surprising. Would this be better named something else like Stop to make it very clear that it doesn't close the underlying writer?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reasonable. Renaming.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. Stop makes more sense in the context of this specific writer.

func (s *BufferedWriteSyncer) Close() error {
moisesvega marked this conversation as resolved.
Show resolved Hide resolved
s.ticker.Stop()
close(s.stop)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this will panic if we do it twice, do we want to explicitly call out that Close must only be called once, or put some protection here in case Close is called twice?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Going with a doc for now. Guarding against it is possible but seems unnecessary given that standard usage will probably be:

ws := zapcore.BufferedWriteSyncer{...}
defer ws.Stop()

(we do need to guard against Stop without Start, so going to add that too.)

return s.Sync()
Comment on lines +145 to +146
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we wait for the background goroutine to stop before returning here? It leads to:

  • no goroutine running after Close guaranteed
  • avoids odd cases where a Sync can happen after the Close returns

}
49 changes: 49 additions & 0 deletions zapcore/buffered_write_syncer_bench_test.go
@@ -0,0 +1,49 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package zapcore

import (
"io/ioutil"
"os"
"testing"

"github.com/stretchr/testify/require"
)

func BenchmarkBufferedWriteSyncer(b *testing.B) {
b.Run("write file with buffer", func(b *testing.B) {
file, err := ioutil.TempFile("", "log")
require.NoError(b, err)
shirchen marked this conversation as resolved.
Show resolved Hide resolved
defer file.Close()
shirchen marked this conversation as resolved.
Show resolved Hide resolved
defer os.Remove(file.Name())

w := &BufferedWriteSyncer{
WriteSyncer: AddSync(file),
}
defer w.Close()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
w.Write([]byte("foobarbazbabble"))
}
})
})
}
106 changes: 106 additions & 0 deletions zapcore/buffered_write_syncer_test.go
@@ -0,0 +1,106 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package zapcore

import (
"bytes"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/internal/ztest"
)

func TestBufferWriter(t *testing.T) {
// If we pass a plain io.Writer, make sure that we still get a WriteSyncer
// with a no-op Sync.
t.Run("sync", func(t *testing.T) {
buf := &bytes.Buffer{}
ws := &BufferedWriteSyncer{WriteSyncer: AddSync(buf)}

requireWriteWorks(t, ws)
assert.Empty(t, buf.String(), "Unexpected log calling a no-op Write method.")
assert.NoError(t, ws.Sync(), "Unexpected error calling a no-op Sync method.")
assert.Equal(t, "foo", buf.String(), "Unexpected log string")
assert.NoError(t, ws.Close())
})

t.Run("close", func(t *testing.T) {
buf := &bytes.Buffer{}
ws := &BufferedWriteSyncer{WriteSyncer: AddSync(buf)}
requireWriteWorks(t, ws)
assert.Empty(t, buf.String(), "Unexpected log calling a no-op Write method.")
assert.NoError(t, ws.Close())
assert.Equal(t, "foo", buf.String(), "Unexpected log string")
})

t.Run("wrap twice", func(t *testing.T) {
buf := &bytes.Buffer{}
bufsync := &BufferedWriteSyncer{WriteSyncer: AddSync(buf)}
ws := &BufferedWriteSyncer{WriteSyncer: bufsync}
requireWriteWorks(t, ws)
assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.")
moisesvega marked this conversation as resolved.
Show resolved Hide resolved
assert.NoError(t, ws.Close())
assert.NoError(t, bufsync.Close())
assert.Equal(t, "foo", buf.String(), "Unexpected log string")
moisesvega marked this conversation as resolved.
Show resolved Hide resolved
})

t.Run("small buffer", func(t *testing.T) {
buf := &bytes.Buffer{}
ws := &BufferedWriteSyncer{WriteSyncer: AddSync(buf), Size: 5}

requireWriteWorks(t, ws)
assert.Equal(t, "", buf.String(), "Unexpected log calling a no-op Write method.")
requireWriteWorks(t, ws)
assert.Equal(t, "foo", buf.String(), "Unexpected log string")
assert.NoError(t, ws.Close())
})

t.Run("flush error", func(t *testing.T) {
ws := &BufferedWriteSyncer{WriteSyncer: &ztest.FailWriter{}, Size: 4}
n, err := ws.Write([]byte("foo"))
require.NoError(t, err, "Unexpected error writing to WriteSyncer.")
require.Equal(t, 3, n, "Wrote an unexpected number of bytes.")
ws.Write([]byte("foo"))
assert.Error(t, ws.Close(), "Expected close to fail.")
})

t.Run("flush timer", func(t *testing.T) {
buf := &bytes.Buffer{}
clock := newControlledClock()
ws := &BufferedWriteSyncer{
WriteSyncer: AddSync(buf),
Size: 6,
FlushInterval: time.Microsecond,
Clock: clock,
}
requireWriteWorks(t, ws)
clock.Add(10 * time.Millisecond)
assert.Equal(t, "foo", buf.String(), "Unexpected log string")

// flush twice to validate loop logic
requireWriteWorks(t, ws)
clock.Add(10 * time.Millisecond)
assert.Equal(t, "foofoo", buf.String(), "Unexpected log string")
assert.NoError(t, ws.Close())
})
}
41 changes: 39 additions & 2 deletions zapcore/write_syncer_bench_test.go
Expand Up @@ -21,13 +21,16 @@
package zapcore

import (
"io/ioutil"
"os"
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/zap/internal/ztest"
)

func BenchmarkMultiWriteSyncer(b *testing.B) {
b.Run("2", func(b *testing.B) {
b.Run("2 discarder", func(b *testing.B) {
w := NewMultiWriteSyncer(
&ztest.Discarder{},
&ztest.Discarder{},
Expand All @@ -39,7 +42,7 @@ func BenchmarkMultiWriteSyncer(b *testing.B) {
}
})
})
b.Run("4", func(b *testing.B) {
b.Run("4 discarder", func(b *testing.B) {
shirchen marked this conversation as resolved.
Show resolved Hide resolved
w := NewMultiWriteSyncer(
&ztest.Discarder{},
&ztest.Discarder{},
Expand All @@ -53,4 +56,38 @@ func BenchmarkMultiWriteSyncer(b *testing.B) {
}
})
})
b.Run("4 discarder with buffer", func(b *testing.B) {
w := &BufferedWriteSyncer{
WriteSyncer: NewMultiWriteSyncer(
&ztest.Discarder{},
&ztest.Discarder{},
&ztest.Discarder{},
&ztest.Discarder{},
),
}
defer w.Close()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
w.Write([]byte("foobarbazbabble"))
}
})
})
}

func BenchmarkWriteSyncer(b *testing.B) {
b.Run("write file with no buffer", func(b *testing.B) {
file, err := ioutil.TempFile("", "log")
assert.NoError(b, err)
shirchen marked this conversation as resolved.
Show resolved Hide resolved
defer file.Close()
defer os.Remove(file.Name())

w := AddSync(file)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
w.Write([]byte("foobarbazbabble"))
}
})
})
}
3 changes: 1 addition & 2 deletions zapcore/write_syncer_test.go
Expand Up @@ -23,9 +23,8 @@ package zapcore
import (
"bytes"
"errors"
"testing"

"io"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down