Skip to content

Commit

Permalink
Merge pull request #448 from kzys/buf-reset
Browse files Browse the repository at this point in the history
Reset bytes.Buffer before returning it to sync.Pool
  • Loading branch information
ktock committed Sep 8, 2021
2 parents be290fc + 0294e15 commit 1d81483
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 7 deletions.
11 changes: 8 additions & 3 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func NewDirectoryCache(directory string, config DirectoryCacheConfig) (BlobCache
}
dataCache = lrucache.New(maxEntry)
dataCache.OnEvicted = func(key string, value interface{}) {
value.(*bytes.Buffer).Reset()
bufPool.Put(value)
}
}
Expand Down Expand Up @@ -296,7 +297,6 @@ func (dc *directoryCache) Add(key string, opts ...Option) (Writer, error) {
}

b := dc.bufPool.Get().(*bytes.Buffer)
b.Reset()
memW := &writer{
WriteCloser: nopWriteCloser(io.Writer(b)),
commitFunc: func() error {
Expand All @@ -306,7 +306,7 @@ func (dc *directoryCache) Add(key string, opts ...Option) (Writer, error) {
}
cached, done, added := dc.cache.Add(key, b)
if !added {
dc.bufPool.Put(b) // already exists in the cache. abort it.
dc.putBuffer(b) // already exists in the cache. abort it.
}
commit := func() error {
defer done()
Expand All @@ -331,14 +331,19 @@ func (dc *directoryCache) Add(key string, opts ...Option) (Writer, error) {
abortFunc: func() error {
defer w.Close()
defer w.Abort()
dc.bufPool.Put(b) // abort it.
dc.putBuffer(b) // abort it.
return nil
},
}

return memW, nil
}

// putBuffer returns b to the directory cache's buffer pool, resetting it
// first so the next Get always yields an empty buffer.
func (dc *directoryCache) putBuffer(b *bytes.Buffer) {
	defer dc.bufPool.Put(b)
	b.Reset()
}

func (dc *directoryCache) Close() error {
dc.closedMu.Lock()
defer dc.closedMu.Unlock()
Expand Down
1 change: 1 addition & 0 deletions fs/layer/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ func newCache(root string, cacheType string, cfg config.Config) (cache.BlobCache
}
dCache, fCache := lrucache.New(maxDataEntry), lrucache.New(maxFdEntry)
dCache.OnEvicted = func(key string, value interface{}) {
value.(*bytes.Buffer).Reset()
bufPool.Put(value)
}
fCache.OnEvicted = func(key string, value interface{}) {
Expand Down
12 changes: 8 additions & 4 deletions fs/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,11 @@ func (gr *reader) cacheWithReader(ctx context.Context, currentDepth int, eg *err
return
}

// putBuffer hands b back to the reader's buffer pool. The buffer is
// reset beforehand so pooled buffers never carry stale contents.
func (gr *reader) putBuffer(b *bytes.Buffer) {
	defer gr.bufPool.Put(b)
	b.Reset()
}

type file struct {
name string
digest string
Expand Down Expand Up @@ -406,11 +411,10 @@ func (sf *file) ReadAt(p []byte, offset int64) (int, error) {

// Use a temporary buffer for aligning this chunk
b := sf.gr.bufPool.Get().(*bytes.Buffer)
b.Reset()
b.Grow(int(ce.ChunkSize))
ip := b.Bytes()[:ce.ChunkSize]
if _, err := sf.ra.ReadAt(ip, ce.ChunkOffset); err != nil && err != io.EOF {
sf.gr.bufPool.Put(b)
sf.gr.putBuffer(b)
return 0, errors.Wrap(err, "failed to read data")
}

Expand All @@ -421,7 +425,7 @@ func (sf *file) ReadAt(p []byte, offset int64) (int, error) {

// Verify this chunk
if err := sf.verify(ip, ce); err != nil {
sf.gr.bufPool.Put(b)
sf.gr.putBuffer(b)
return 0, errors.Wrap(err, "invalid chunk")
}

Expand All @@ -435,7 +439,7 @@ func (sf *file) ReadAt(p []byte, offset int64) (int, error) {
w.Close()
}
n := copy(p[nr:], ip[lowerDiscard:ce.ChunkSize-upperDiscard])
sf.gr.bufPool.Put(b)
sf.gr.putBuffer(b)
if int64(n) != expectedSize {
return 0, fmt.Errorf("unexpected final data size %d; want %d", n, expectedSize)
}
Expand Down

0 comments on commit 1d81483

Please sign in to comment.