Skip to content

Commit

Permalink
Merge pull request #608 from pjbgf/optimise-zlib-reader
Browse files: browse the repository at this point in the history
Optimise zlib reader and consolidate sync.Pools
  • Loading branch information
mcuadros committed Nov 7, 2022
2 parents 652bc83 + a2c309d commit f37bb58
Show file tree
Hide file tree
Showing 20 changed files with 417 additions and 161 deletions.
17 changes: 10 additions & 7 deletions plumbing/format/objfile/reader.go
@@ -1,13 +1,13 @@
package objfile

import (
"compress/zlib"
"errors"
"io"
"strconv"

"github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/plumbing/format/packfile"
"github.com/go-git/go-git/v5/utils/sync"
)

var (
Expand All @@ -20,20 +20,22 @@ var (
// Reader implements io.ReadCloser. Close should be called when finished with
// the Reader. Close will not close the underlying io.Reader.
//
// NOTE(review): this is a rendered diff hunk whose +/- markers were lost, so
// the field list appears twice. The first three fields are the pre-change
// declaration; the last four are the post-change declaration.
type Reader struct {
// Before this commit: zlib was held as an io.ReadCloser.
multi io.Reader
zlib io.ReadCloser
hasher plumbing.Hasher
// After this commit: zlib is a plain io.Reader, and zlibref keeps the
// sync.ZLibReader obtained in NewReader so that Close can pass it to
// sync.PutZlibReader.
multi io.Reader
zlib io.Reader
zlibref sync.ZLibReader
hasher plumbing.Hasher
}

// NewReader returns a new Reader reading from r.
//
// If the zlib reader cannot be set up, the error text is attached to
// packfile.ErrZLib via AddDetails and returned with a nil Reader.
//
// NOTE(review): this is a rendered diff hunk whose +/- markers were lost; old
// and new lines are interleaved and are labelled individually below.
func NewReader(r io.Reader) (*Reader, error) {
// Old line: construct a fresh reader with compress/zlib.
zlib, err := zlib.NewReader(r)
// New line: obtain the reader from the go-git utils/sync helper instead.
zlib, err := sync.GetZlibReader(r)
if err != nil {
return nil, packfile.ErrZLib.AddDetails(err.Error())
}

return &Reader{
// Old line: store the zlib reader itself.
zlib: zlib,
// New lines: read through the wrapped Reader field and keep the wrapper
// in zlibref for Close.
zlib: zlib.Reader,
zlibref: zlib,
}, nil
}

Expand Down Expand Up @@ -110,5 +112,6 @@ func (r *Reader) Hash() plumbing.Hash {
// Close releases any resources consumed by the Reader. Calling Close does not
// close the wrapped io.Reader originally passed to NewReader.
//
// NOTE(review): this is a rendered diff hunk whose +/- markers were lost; the
// first statement is the pre-change body and the last two statements are the
// post-change body.
func (r *Reader) Close() error {
// Old body: close the zlib reader and return its error.
return r.zlib.Close()
// New body: hand zlibref to sync.PutZlibReader (presumably returning it to
// a shared pool - confirm in utils/sync) and always report success.
sync.PutZlibReader(r.zlibref)
return nil
}
16 changes: 4 additions & 12 deletions plumbing/format/objfile/writer.go
Expand Up @@ -5,9 +5,9 @@ import (
"errors"
"io"
"strconv"
"sync"

"github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/utils/sync"
)

var (
Expand All @@ -21,7 +21,7 @@ type Writer struct {
raw io.Writer
hasher plumbing.Hasher
multi io.Writer
zlib io.WriteCloser
zlib *zlib.Writer

closed bool
pending int64 // number of unwritten bytes
Expand All @@ -32,21 +32,13 @@ type Writer struct {
// The returned Writer implements io.WriteCloser. Close should be called when
// finished with the Writer. Close will not close the underlying io.Writer.
//
// NOTE(review): this is a rendered diff hunk whose +/- markers were lost. The
// first two statements (and the blank line after them) are the pre-change
// code; the sync.GetZlibWriter call is the post-change replacement.
func NewWriter(w io.Writer) *Writer {
// Old lines: take a *zlib.Writer from the package-local zlibPool and point
// it at w.
zlib := zlibPool.Get().(*zlib.Writer)
zlib.Reset(w)

// New line: a single call to the go-git utils/sync helper replaces both
// steps above.
zlib := sync.GetZlibWriter(w)
return &Writer{
raw: w,
zlib: zlib,
}
}

// zlibPool is the package-local pool of *zlib.Writer values; New creates a
// writer with a nil destination, which callers are expected to Reset.
//
// NOTE(review): this whole declaration is a deleted hunk rendered without its
// "-" markers. In the post-change code NewWriter and Close call
// sync.GetZlibWriter / sync.PutZlibWriter instead.
var zlibPool = sync.Pool{
New: func() interface{} {
return zlib.NewWriter(nil)
},
}

// WriteHeader writes the type and the size and prepares to accept the object's
// contents. If an invalid t is provided, plumbing.ErrInvalidType is returned. If a
// negative size is provided, ErrNegativeSize is returned.
Expand Down Expand Up @@ -110,7 +102,7 @@ func (w *Writer) Hash() plumbing.Hash {
// Calling Close does not close the wrapped io.Writer originally passed to
// NewWriter.
func (w *Writer) Close() error {
defer zlibPool.Put(w.zlib)
defer sync.PutZlibWriter(w.zlib)
if err := w.zlib.Close(); err != nil {
return err
}
Expand Down
18 changes: 0 additions & 18 deletions plumbing/format/packfile/common.go
@@ -1,10 +1,7 @@
package packfile

import (
"bytes"
"compress/zlib"
"io"
"sync"

"github.com/go-git/go-git/v5/plumbing/storer"
"github.com/go-git/go-git/v5/utils/ioutil"
Expand Down Expand Up @@ -61,18 +58,3 @@ func WritePackfileToObjectStorage(

return err
}

// bufPool is the package-local pool of *bytes.Buffer values; New returns an
// empty buffer.
//
// NOTE(review): this declaration is a deleted hunk rendered without its "-"
// markers (the file header reports 0 additions and 18 deletions). Call sites
// elsewhere in this commit use sync.GetBytesBuffer / sync.PutBytesBuffer.
var bufPool = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(nil)
},
}

// zlibInitBytes is the byte sequence fed to zlib.NewReader when
// zlibReaderPool creates a new reader. It appears to be a minimal zlib stream
// (2-byte header, one empty stored block, Adler-32 of empty input) - confirm
// against RFC 1950 before reusing.
//
// NOTE(review): deleted hunk rendered without its "-" marker.
var zlibInitBytes = []byte{0x78, 0x9c, 0x01, 0x00, 0x00, 0xff, 0xff, 0x00, 0x00, 0x00, 0x01}

// zlibReaderPool is the package-local pool of zlib readers. New builds a
// reader over zlibInitBytes so that a usable reader exists before any real
// input is attached; the error from zlib.NewReader is discarded.
//
// NOTE(review): deleted hunk rendered without its "-" markers. asyncReader in
// packfile.go switches to sync.GetZlibReader / sync.PutZlibReader in this
// commit.
var zlibReaderPool = sync.Pool{
New: func() interface{} {
r, _ := zlib.NewReader(bytes.NewReader(zlibInitBytes))
return r
},
}
21 changes: 9 additions & 12 deletions plumbing/format/packfile/diff_delta.go
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/utils/ioutil"
"github.com/go-git/go-git/v5/utils/sync"
)

// See https://github.com/jelmer/dulwich/blob/master/dulwich/pack.py and
Expand Down Expand Up @@ -43,18 +44,16 @@ func getDelta(index *deltaIndex, base, target plumbing.EncodedObject) (o plumbin

defer ioutil.CheckClose(tr, &err)

bb := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(bb)
bb.Reset()
bb := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(bb)

_, err = bb.ReadFrom(br)
if err != nil {
return nil, err
}

tb := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(tb)
tb.Reset()
tb := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(tb)

_, err = tb.ReadFrom(tr)
if err != nil {
Expand All @@ -80,19 +79,17 @@ func DiffDelta(src, tgt []byte) []byte {
}

func diffDelta(index *deltaIndex, src []byte, tgt []byte) []byte {
buf := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(buf)
buf.Reset()
buf := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(buf)
buf.Write(deltaEncodeSize(len(src)))
buf.Write(deltaEncodeSize(len(tgt)))

if len(index.entries) == 0 {
index.init(src)
}

ibuf := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(ibuf)
ibuf.Reset()
ibuf := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(ibuf)
for i := 0; i < len(tgt); i++ {
offset, l := index.findMatch(src, tgt, i)

Expand Down
34 changes: 16 additions & 18 deletions plumbing/format/packfile/packfile.go
Expand Up @@ -2,7 +2,6 @@ package packfile

import (
"bytes"
"compress/zlib"
"fmt"
"io"
"os"
Expand All @@ -13,6 +12,7 @@ import (
"github.com/go-git/go-git/v5/plumbing/format/idxfile"
"github.com/go-git/go-git/v5/plumbing/storer"
"github.com/go-git/go-git/v5/utils/ioutil"
"github.com/go-git/go-git/v5/utils/sync"
)

var (
Expand Down Expand Up @@ -138,9 +138,8 @@ func (p *Packfile) getObjectSize(h *ObjectHeader) (int64, error) {
case plumbing.CommitObject, plumbing.TreeObject, plumbing.BlobObject, plumbing.TagObject:
return h.Length, nil
case plumbing.REFDeltaObject, plumbing.OFSDeltaObject:
buf := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(buf)
buf.Reset()
buf := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(buf)

if _, _, err := p.s.NextObject(buf); err != nil {
return 0, err
Expand Down Expand Up @@ -227,9 +226,9 @@ func (p *Packfile) getNextObject(h *ObjectHeader, hash plumbing.Hash) (plumbing.
// For delta objects we read the delta data and apply the small object
// optimization only if the expanded version of the object still meets
// the small object threshold condition.
buf := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(buf)
buf.Reset()
buf := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(buf)

if _, _, err := p.s.NextObject(buf); err != nil {
return nil, err
}
Expand Down Expand Up @@ -290,14 +289,13 @@ func (p *Packfile) getObjectContent(offset int64) (io.ReadCloser, error) {

func asyncReader(p *Packfile) (io.ReadCloser, error) {
reader := ioutil.NewReaderUsingReaderAt(p.file, p.s.r.offset)
zr := zlibReaderPool.Get().(io.ReadCloser)

if err := zr.(zlib.Resetter).Reset(reader, nil); err != nil {
zr, err := sync.GetZlibReader(reader)
if err != nil {
return nil, fmt.Errorf("zlib reset error: %s", err)
}

return ioutil.NewReadCloserWithCloser(zr, func() error {
zlibReaderPool.Put(zr)
return ioutil.NewReadCloserWithCloser(zr.Reader, func() error {
sync.PutZlibReader(zr)
return nil
}), nil

Expand Down Expand Up @@ -373,9 +371,9 @@ func (p *Packfile) fillRegularObjectContent(obj plumbing.EncodedObject) (err err
}

func (p *Packfile) fillREFDeltaObjectContent(obj plumbing.EncodedObject, ref plumbing.Hash) error {
buf := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(buf)
buf.Reset()
buf := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(buf)

_, _, err := p.s.NextObject(buf)
if err != nil {
return err
Expand Down Expand Up @@ -417,9 +415,9 @@ func (p *Packfile) fillREFDeltaObjectContentWithBuffer(obj plumbing.EncodedObjec
}

func (p *Packfile) fillOFSDeltaObjectContent(obj plumbing.EncodedObject, offset int64) error {
buf := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(buf)
buf.Reset()
buf := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(buf)

_, _, err := p.s.NextObject(buf)
if err != nil {
return err
Expand Down
19 changes: 11 additions & 8 deletions plumbing/format/packfile/parser.go
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/go-git/go-git/v5/plumbing/cache"
"github.com/go-git/go-git/v5/plumbing/storer"
"github.com/go-git/go-git/v5/utils/ioutil"
"github.com/go-git/go-git/v5/utils/sync"
)

var (
Expand Down Expand Up @@ -175,7 +176,8 @@ func (p *Parser) init() error {
}

func (p *Parser) indexObjects() error {
buf := new(bytes.Buffer)
buf := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(buf)

for i := uint32(0); i < p.count; i++ {
buf.Reset()
Expand Down Expand Up @@ -219,6 +221,7 @@ func (p *Parser) indexObjects() error {
ota = newBaseObject(oh.Offset, oh.Length, t)
}

buf.Grow(int(oh.Length))
_, crc, err := p.scanner.NextObject(buf)
if err != nil {
return err
Expand Down Expand Up @@ -264,7 +267,9 @@ func (p *Parser) indexObjects() error {
}

func (p *Parser) resolveDeltas() error {
buf := &bytes.Buffer{}
buf := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(buf)

for _, obj := range p.oi {
buf.Reset()
err := p.get(obj, buf)
Expand Down Expand Up @@ -346,9 +351,8 @@ func (p *Parser) get(o *objectInfo, buf *bytes.Buffer) (err error) {
}

if o.DiskType.IsDelta() {
b := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(b)
b.Reset()
b := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(b)
err := p.get(o.Parent, b)
if err != nil {
return err
Expand Down Expand Up @@ -382,9 +386,8 @@ func (p *Parser) resolveObject(
if !o.DiskType.IsDelta() {
return nil
}
buf := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(buf)
buf.Reset()
buf := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(buf)
err := p.readData(buf, o)
if err != nil {
return err
Expand Down
18 changes: 8 additions & 10 deletions plumbing/format/packfile/patch_delta.go
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/utils/ioutil"
"github.com/go-git/go-git/v5/utils/sync"
)

// See https://github.com/git/git/blob/49fa3dc76179e04b0833542fa52d0f287a4955ac/delta.h
Expand All @@ -34,29 +35,26 @@ func ApplyDelta(target, base plumbing.EncodedObject, delta []byte) (err error) {

defer ioutil.CheckClose(w, &err)

buf := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(buf)
buf.Reset()
buf := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(buf)
_, err = buf.ReadFrom(r)
if err != nil {
return err
}
src := buf.Bytes()

dst := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(dst)
dst.Reset()
dst := sync.GetBytesBuffer()
defer sync.PutBytesBuffer(dst)
err = patchDelta(dst, src, delta)
if err != nil {
return err
}

target.SetSize(int64(dst.Len()))

bufp := byteSlicePool.Get().(*[]byte)
b := *bufp
_, err = io.CopyBuffer(w, dst, b)
byteSlicePool.Put(bufp)
b := sync.GetByteSlice()
_, err = io.CopyBuffer(w, dst, *b)
sync.PutByteSlice(b)
return err
}

Expand Down

0 comments on commit f37bb58

Please sign in to comment.