Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reset bytes.Buffer before returning it to sync.Pool #448

Merged
merged 1 commit into from
Sep 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 8 additions & 3 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func NewDirectoryCache(directory string, config DirectoryCacheConfig) (BlobCache
}
dataCache = lrucache.New(maxEntry)
dataCache.OnEvicted = func(key string, value interface{}) {
value.(*bytes.Buffer).Reset()
bufPool.Put(value)
}
}
Expand Down Expand Up @@ -296,7 +297,6 @@ func (dc *directoryCache) Add(key string, opts ...Option) (Writer, error) {
}

b := dc.bufPool.Get().(*bytes.Buffer)
b.Reset()
memW := &writer{
WriteCloser: nopWriteCloser(io.Writer(b)),
commitFunc: func() error {
Expand All @@ -306,7 +306,7 @@ func (dc *directoryCache) Add(key string, opts ...Option) (Writer, error) {
}
cached, done, added := dc.cache.Add(key, b)
if !added {
dc.bufPool.Put(b) // already exists in the cache. abort it.
dc.putBuffer(b) // already exists in the cache. abort it.
}
commit := func() error {
defer done()
Expand All @@ -331,14 +331,19 @@ func (dc *directoryCache) Add(key string, opts ...Option) (Writer, error) {
abortFunc: func() error {
defer w.Close()
defer w.Abort()
dc.bufPool.Put(b) // abort it.
dc.putBuffer(b) // abort it.
return nil
},
}

return memW, nil
}

func (dc *directoryCache) putBuffer(b *bytes.Buffer) {
b.Reset()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that calling Reset() is the safest but may not be the most efficient solution.

Go's standard library has some more sophisticated examples.

https://github.com/golang/go/blob/9e7bc80b31088dc62faf4776ffdb1a2e27afa94e/src/os/dir_unix.go#L30-L36

https://github.com/golang/go/blob/acc2957bc9873ab7e65942045830a3dc0592eda2/src/net/http/server.go#L794-L850

dc.bufPool.Put(b)
}

func (dc *directoryCache) Close() error {
dc.closedMu.Lock()
defer dc.closedMu.Unlock()
Expand Down
1 change: 1 addition & 0 deletions fs/layer/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ func newCache(root string, cacheType string, cfg config.Config) (cache.BlobCache
}
dCache, fCache := lrucache.New(maxDataEntry), lrucache.New(maxFdEntry)
dCache.OnEvicted = func(key string, value interface{}) {
value.(*bytes.Buffer).Reset()
bufPool.Put(value)
}
fCache.OnEvicted = func(key string, value interface{}) {
Expand Down
12 changes: 8 additions & 4 deletions fs/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,11 @@ func (gr *reader) cacheWithReader(ctx context.Context, currentDepth int, eg *err
return
}

func (gr *reader) putBuffer(b *bytes.Buffer) {
b.Reset()
gr.bufPool.Put(b)
}

type file struct {
name string
digest string
Expand Down Expand Up @@ -406,11 +411,10 @@ func (sf *file) ReadAt(p []byte, offset int64) (int, error) {

// Use temporally buffer for aligning this chunk
b := sf.gr.bufPool.Get().(*bytes.Buffer)
b.Reset()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we remain Reset before use for safety?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, but would it have real effects? Get() returns either a new Buffer, which is safe to use according to https://cs.opensource.google/go/go/+/refs/tags/go1.17:src/bytes/buffer.go;l=449

In most cases, new(Buffer) (or just declaring a Buffer variable) is sufficient to initialize a Buffer.

or a buffer which is put by putBuffer().

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I think you are right. Merging.

b.Grow(int(ce.ChunkSize))
ip := b.Bytes()[:ce.ChunkSize]
if _, err := sf.ra.ReadAt(ip, ce.ChunkOffset); err != nil && err != io.EOF {
sf.gr.bufPool.Put(b)
sf.gr.putBuffer(b)
return 0, errors.Wrap(err, "failed to read data")
}

Expand All @@ -421,7 +425,7 @@ func (sf *file) ReadAt(p []byte, offset int64) (int, error) {

// Verify this chunk
if err := sf.verify(ip, ce); err != nil {
sf.gr.bufPool.Put(b)
sf.gr.putBuffer(b)
return 0, errors.Wrap(err, "invalid chunk")
}

Expand All @@ -435,7 +439,7 @@ func (sf *file) ReadAt(p []byte, offset int64) (int, error) {
w.Close()
}
n := copy(p[nr:], ip[lowerDiscard:ce.ChunkSize-upperDiscard])
sf.gr.bufPool.Put(b)
sf.gr.putBuffer(b)
if int64(n) != expectedSize {
return 0, fmt.Errorf("unexpected final data size %d; want %d", n, expectedSize)
}
Expand Down