Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimise zlib reader and consolidate sync.Pools #608

Merged
merged 4 commits into from Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
17 changes: 10 additions & 7 deletions plumbing/format/objfile/reader.go
@@ -1,13 +1,13 @@
package objfile

import (
"compress/zlib"
"errors"
"io"
"strconv"

"github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/plumbing/format/packfile"
"github.com/go-git/go-git/v5/utils/sync"
)

var (
Expand All @@ -20,20 +20,22 @@ var (
// Reader implements io.ReadCloser. Close should be called when finished with
// the Reader. Close will not close the underlying io.Reader.
type Reader struct {
multi io.Reader
zlib io.ReadCloser
hasher plumbing.Hasher
multi io.Reader
zlib io.Reader
zlibref sync.ZLibReader
hasher plumbing.Hasher
}

// NewReader returns a new Reader reading from r.
func NewReader(r io.Reader) (*Reader, error) {
zlib, err := zlib.NewReader(r)
zlib, err := sync.GetZlibReader(r)
if err != nil {
return nil, packfile.ErrZLib.AddDetails(err.Error())
}

return &Reader{
zlib: zlib,
zlib: zlib.Reader,
zlibref: zlib,
}, nil
}

Expand Down Expand Up @@ -110,5 +112,6 @@ func (r *Reader) Hash() plumbing.Hash {
// Close releases any resources consumed by the Reader. Calling Close does not
// close the wrapped io.Reader originally passed to NewReader.
func (r *Reader) Close() error {
return r.zlib.Close()
sync.PutZlibReader(r.zlibref)
return nil
}
7 changes: 5 additions & 2 deletions plumbing/format/objfile/writer.go
Expand Up @@ -7,6 +7,7 @@ import (
"strconv"

"github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/utils/sync"
)

var (
Expand All @@ -18,9 +19,9 @@ var (
// not close the underlying io.Writer.
type Writer struct {
raw io.Writer
zlib io.WriteCloser
hasher plumbing.Hasher
multi io.Writer
zlib *zlib.Writer

closed bool
pending int64 // number of unwritten bytes
Expand All @@ -31,9 +32,10 @@ type Writer struct {
// The returned Writer implements io.WriteCloser. Close should be called when
// finished with the Writer. Close will not close the underlying io.Writer.
func NewWriter(w io.Writer) *Writer {
zlib := sync.GetZlibWriter(w)
return &Writer{
raw: w,
zlib: zlib.NewWriter(w),
zlib: zlib,
}
}

Expand Down Expand Up @@ -100,6 +102,7 @@ func (w *Writer) Hash() plumbing.Hash {
// Calling Close does not close the wrapped io.Writer originally passed to
// NewWriter.
func (w *Writer) Close() error {
defer sync.PutZlibWriter(w.zlib)
if err := w.zlib.Close(); err != nil {
return err
}
Expand Down
18 changes: 0 additions & 18 deletions plumbing/format/packfile/common.go
@@ -1,10 +1,7 @@
package packfile

import (
"bytes"
"compress/zlib"
"io"
"sync"

"github.com/go-git/go-git/v5/plumbing/storer"
"github.com/go-git/go-git/v5/utils/ioutil"
Expand Down Expand Up @@ -61,18 +58,3 @@ func WritePackfileToObjectStorage(

return err
}

// bufPool recycles *bytes.Buffer values across the packfile package. New
// hands out an empty buffer; the pool itself never clears a buffer that is
// put back, so callers are expected to Reset what they obtain from Get.
var bufPool = sync.Pool{
	New: func() interface{} { return new(bytes.Buffer) },
}

// zlibInitBytes is a complete, empty zlib stream: the two-byte header
// (0x78 0x9c), a single final stored block of length zero, and the Adler-32
// checksum of no data. It exists only to give zlib.NewReader a valid header
// to parse when a pooled reader is first constructed.
var zlibInitBytes = []byte{0x78, 0x9c, 0x01, 0x00, 0x00, 0xff, 0xff, 0x00, 0x00, 0x00, 0x01}

// zlibReaderPool recycles zlib readers. New builds a reader over the
// placeholder stream in zlibInitBytes; values are stored as returned by
// zlib.NewReader, so callers type-assert what they Get from the pool.
var zlibReaderPool = sync.Pool{
	New: func() interface{} {
		seed := bytes.NewReader(zlibInitBytes)
		// The error is deliberately discarded: the seed is a fixed,
		// well-formed zlib stream, so header parsing cannot fail.
		zr, _ := zlib.NewReader(seed)
		return zr
	},
}
21 changes: 9 additions & 12 deletions plumbing/format/packfile/diff_delta.go
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/utils/ioutil"
"github.com/go-git/go-git/v5/utils/sync"
)

// See https://github.com/jelmer/dulwich/blob/master/dulwich/pack.py and
Expand Down Expand Up @@ -43,18 +44,16 @@ func getDelta(index *deltaIndex, base, target plumbing.EncodedObject) (o plumbin

defer ioutil.CheckClose(tr, &err)

bb := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(bb)
bb.Reset()
bb := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(bb)

_, err = bb.ReadFrom(br)
if err != nil {
return nil, err
}

tb := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(tb)
tb.Reset()
tb := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(tb)

_, err = tb.ReadFrom(tr)
if err != nil {
Expand All @@ -80,19 +79,17 @@ func DiffDelta(src, tgt []byte) []byte {
}

func diffDelta(index *deltaIndex, src []byte, tgt []byte) []byte {
buf := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(buf)
buf.Reset()
buf := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(buf)
buf.Write(deltaEncodeSize(len(src)))
buf.Write(deltaEncodeSize(len(tgt)))

if len(index.entries) == 0 {
index.init(src)
}

ibuf := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(ibuf)
ibuf.Reset()
ibuf := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(ibuf)
for i := 0; i < len(tgt); i++ {
offset, l := index.findMatch(src, tgt, i)

Expand Down
34 changes: 16 additions & 18 deletions plumbing/format/packfile/packfile.go
Expand Up @@ -2,7 +2,6 @@ package packfile

import (
"bytes"
"compress/zlib"
"fmt"
"io"
"os"
Expand All @@ -13,6 +12,7 @@ import (
"github.com/go-git/go-git/v5/plumbing/format/idxfile"
"github.com/go-git/go-git/v5/plumbing/storer"
"github.com/go-git/go-git/v5/utils/ioutil"
"github.com/go-git/go-git/v5/utils/sync"
)

var (
Expand Down Expand Up @@ -138,9 +138,8 @@ func (p *Packfile) getObjectSize(h *ObjectHeader) (int64, error) {
case plumbing.CommitObject, plumbing.TreeObject, plumbing.BlobObject, plumbing.TagObject:
return h.Length, nil
case plumbing.REFDeltaObject, plumbing.OFSDeltaObject:
buf := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(buf)
buf.Reset()
buf := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(buf)

if _, _, err := p.s.NextObject(buf); err != nil {
return 0, err
Expand Down Expand Up @@ -227,9 +226,9 @@ func (p *Packfile) getNextObject(h *ObjectHeader, hash plumbing.Hash) (plumbing.
// For delta objects we read the delta data and apply the small object
// optimization only if the expanded version of the object still meets
// the small object threshold condition.
buf := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(buf)
buf.Reset()
buf := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(buf)

if _, _, err := p.s.NextObject(buf); err != nil {
return nil, err
}
Expand Down Expand Up @@ -290,14 +289,13 @@ func (p *Packfile) getObjectContent(offset int64) (io.ReadCloser, error) {

func asyncReader(p *Packfile) (io.ReadCloser, error) {
reader := ioutil.NewReaderUsingReaderAt(p.file, p.s.r.offset)
zr := zlibReaderPool.Get().(io.ReadCloser)

if err := zr.(zlib.Resetter).Reset(reader, nil); err != nil {
zr, err := sync.GetZlibReader(reader)
if err != nil {
return nil, fmt.Errorf("zlib reset error: %s", err)
}

return ioutil.NewReadCloserWithCloser(zr, func() error {
zlibReaderPool.Put(zr)
return ioutil.NewReadCloserWithCloser(zr.Reader, func() error {
sync.PutZlibReader(zr)
return nil
}), nil

Expand Down Expand Up @@ -373,9 +371,9 @@ func (p *Packfile) fillRegularObjectContent(obj plumbing.EncodedObject) (err err
}

func (p *Packfile) fillREFDeltaObjectContent(obj plumbing.EncodedObject, ref plumbing.Hash) error {
buf := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(buf)
buf.Reset()
buf := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(buf)

_, _, err := p.s.NextObject(buf)
if err != nil {
return err
Expand Down Expand Up @@ -417,9 +415,9 @@ func (p *Packfile) fillREFDeltaObjectContentWithBuffer(obj plumbing.EncodedObjec
}

func (p *Packfile) fillOFSDeltaObjectContent(obj plumbing.EncodedObject, offset int64) error {
buf := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(buf)
buf.Reset()
buf := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(buf)

_, _, err := p.s.NextObject(buf)
if err != nil {
return err
Expand Down
19 changes: 11 additions & 8 deletions plumbing/format/packfile/parser.go
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/go-git/go-git/v5/plumbing/cache"
"github.com/go-git/go-git/v5/plumbing/storer"
"github.com/go-git/go-git/v5/utils/ioutil"
"github.com/go-git/go-git/v5/utils/sync"
)

var (
Expand Down Expand Up @@ -175,7 +176,8 @@ func (p *Parser) init() error {
}

func (p *Parser) indexObjects() error {
buf := new(bytes.Buffer)
buf := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(buf)

for i := uint32(0); i < p.count; i++ {
buf.Reset()
Expand Down Expand Up @@ -219,6 +221,7 @@ func (p *Parser) indexObjects() error {
ota = newBaseObject(oh.Offset, oh.Length, t)
}

buf.Grow(int(oh.Length))
_, crc, err := p.scanner.NextObject(buf)
if err != nil {
return err
Expand Down Expand Up @@ -264,7 +267,9 @@ func (p *Parser) indexObjects() error {
}

func (p *Parser) resolveDeltas() error {
buf := &bytes.Buffer{}
buf := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(buf)

for _, obj := range p.oi {
buf.Reset()
err := p.get(obj, buf)
Expand Down Expand Up @@ -346,9 +351,8 @@ func (p *Parser) get(o *objectInfo, buf *bytes.Buffer) (err error) {
}

if o.DiskType.IsDelta() {
b := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(b)
b.Reset()
b := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(b)
err := p.get(o.Parent, b)
if err != nil {
return err
Expand Down Expand Up @@ -382,9 +386,8 @@ func (p *Parser) resolveObject(
if !o.DiskType.IsDelta() {
return nil
}
buf := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(buf)
buf.Reset()
buf := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(buf)
err := p.readData(buf, o)
if err != nil {
return err
Expand Down
28 changes: 28 additions & 0 deletions plumbing/format/packfile/parser_test.go
Expand Up @@ -10,8 +10,10 @@ import (
fixtures "github.com/go-git/go-git-fixtures/v4"
"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/plumbing/cache"
"github.com/go-git/go-git/v5/plumbing/format/packfile"
"github.com/go-git/go-git/v5/plumbing/storer"
"github.com/go-git/go-git/v5/storage/filesystem"
. "gopkg.in/check.v1"
)

Expand Down Expand Up @@ -248,3 +250,29 @@ func BenchmarkParseBasic(b *testing.B) {
}
}
}

// BenchmarkParser measures Parser.Parse on the basic fixture packfile, using a
// storage built by filesystem.NewStorage over os.TempDir(). Only the Parse
// call runs with the timer started; building the scanner, storage and parser
// for each iteration happens while the timer is stopped.
func BenchmarkParser(b *testing.B) {
	f := fixtures.Basic().One()
	defer fixtures.Clean()

	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		// Per-iteration setup is excluded from the measurement.
		b.StopTimer()
		scanner := packfile.NewScanner(f.Packfile())
		fs := osfs.New(os.TempDir())
		storage := filesystem.NewStorage(fs, cache.NewObjectLRUDefault())

		parser, err := packfile.NewParserWithStorage(scanner, storage)
		if err != nil {
			// Stop immediately: the returned parser must not be used once
			// the constructor has reported an error.
			b.Fatal(err)
		}

		b.StartTimer()
		_, err = parser.Parse()

		b.StopTimer()
		if err != nil {
			// A failed parse invalidates the measurement; do not keep
			// repeating it for the remaining iterations.
			b.Fatal(err)
		}
	}
}