From 123cdde6f2f6282cb779e03745d384833ac1265b Mon Sep 17 00:00:00 2001 From: Paulo Gomes Date: Wed, 26 Oct 2022 18:12:39 +0100 Subject: [PATCH 1/4] Use Sync.Pool pointers to optimise memory usage Signed-off-by: Paulo Gomes --- plumbing/format/objfile/writer.go | 15 +++++++++++-- plumbing/format/packfile/parser_test.go | 28 +++++++++++++++++++++++++ plumbing/format/packfile/patch_delta.go | 5 +++-- plumbing/format/packfile/scanner.go | 14 ++++++++----- storage/filesystem/object.go | 14 ++++++++++++- worktree.go | 8 ++++--- 6 files changed, 71 insertions(+), 13 deletions(-) diff --git a/plumbing/format/objfile/writer.go b/plumbing/format/objfile/writer.go index 2a96a4370..248f81bef 100644 --- a/plumbing/format/objfile/writer.go +++ b/plumbing/format/objfile/writer.go @@ -5,6 +5,7 @@ import ( "errors" "io" "strconv" + "sync" "github.com/go-git/go-git/v5/plumbing" ) @@ -18,9 +19,9 @@ var ( // not close the underlying io.Writer. type Writer struct { raw io.Writer - zlib io.WriteCloser hasher plumbing.Hasher multi io.Writer + zlib io.WriteCloser closed bool pending int64 // number of unwritten bytes @@ -31,12 +32,21 @@ type Writer struct { // The returned Writer implements io.WriteCloser. Close should be called when // finished with the Writer. Close will not close the underlying io.Writer. func NewWriter(w io.Writer) *Writer { + zlib := zlibPool.Get().(*zlib.Writer) + zlib.Reset(w) + return &Writer{ raw: w, - zlib: zlib.NewWriter(w), + zlib: zlib, } } +var zlibPool = sync.Pool{ + New: func() interface{} { + return zlib.NewWriter(nil) + }, +} + // WriteHeader writes the type and the size and prepares to accept the object's // contents. If an invalid t is provided, plumbing.ErrInvalidType is returned. If a // negative size is provided, ErrNegativeSize is returned. @@ -100,6 +110,7 @@ func (w *Writer) Hash() plumbing.Hash { // Calling Close does not close the wrapped io.Writer originally passed to // NewWriter. func (w *Writer) Close() error { + defer zlibPool.Put(w.zlib) if err := w.zlib.Close(); err != nil { return err } diff --git a/plumbing/format/packfile/parser_test.go b/plumbing/format/packfile/parser_test.go index 09f3f9704..651d05f3d 100644 --- a/plumbing/format/packfile/parser_test.go +++ b/plumbing/format/packfile/parser_test.go @@ -10,8 +10,10 @@ import ( fixtures "github.com/go-git/go-git-fixtures/v4" "github.com/go-git/go-git/v5" "github.com/go-git/go-git/v5/plumbing" + "github.com/go-git/go-git/v5/plumbing/cache" "github.com/go-git/go-git/v5/plumbing/format/packfile" "github.com/go-git/go-git/v5/plumbing/storer" + "github.com/go-git/go-git/v5/storage/filesystem" . 
"gopkg.in/check.v1" ) @@ -248,3 +250,29 @@ func BenchmarkParseBasic(b *testing.B) { } } } + +func BenchmarkParser(b *testing.B) { + f := fixtures.Basic().One() + defer fixtures.Clean() + + b.ResetTimer() + for n := 0; n < b.N; n++ { + b.StopTimer() + scanner := packfile.NewScanner(f.Packfile()) + fs := osfs.New(os.TempDir()) + storage := filesystem.NewStorage(fs, cache.NewObjectLRUDefault()) + + parser, err := packfile.NewParserWithStorage(scanner, storage) + if err != nil { + b.Error(err) + } + + b.StartTimer() + _, err = parser.Parse() + + b.StopTimer() + if err != nil { + b.Error(err) + } + } +} diff --git a/plumbing/format/packfile/patch_delta.go b/plumbing/format/packfile/patch_delta.go index 17da11e03..053466ddf 100644 --- a/plumbing/format/packfile/patch_delta.go +++ b/plumbing/format/packfile/patch_delta.go @@ -53,9 +53,10 @@ func ApplyDelta(target, base plumbing.EncodedObject, delta []byte) (err error) { target.SetSize(int64(dst.Len())) - b := byteSlicePool.Get().([]byte) + bufp := byteSlicePool.Get().(*[]byte) + b := *bufp _, err = io.CopyBuffer(w, dst, b) - byteSlicePool.Put(b) + byteSlicePool.Put(bufp) return err } diff --git a/plumbing/format/packfile/scanner.go b/plumbing/format/packfile/scanner.go index 45d480c04..b655594b7 100644 --- a/plumbing/format/packfile/scanner.go +++ b/plumbing/format/packfile/scanner.go @@ -346,15 +346,17 @@ func (s *Scanner) copyObject(w io.Writer) (n int64, err error) { } defer ioutil.CheckClose(zr, &err) - buf := byteSlicePool.Get().([]byte) + bufp := byteSlicePool.Get().(*[]byte) + buf := *bufp n, err = io.CopyBuffer(w, zr, buf) - byteSlicePool.Put(buf) + byteSlicePool.Put(bufp) return } var byteSlicePool = sync.Pool{ New: func() interface{} { - return make([]byte, 32*1024) + b := make([]byte, 32*1024) + return &b }, } @@ -387,9 +389,11 @@ func (s *Scanner) Checksum() (plumbing.Hash, error) { // Close reads the reader until io.EOF func (s *Scanner) Close() error { - buf := byteSlicePool.Get().([]byte) + bufp := byteSlicePool.Get().(*[]byte) + buf := *bufp _, err := io.CopyBuffer(stdioutil.Discard, s.r, buf) - byteSlicePool.Put(buf) + byteSlicePool.Put(bufp) + return err } diff --git a/storage/filesystem/object.go b/storage/filesystem/object.go index 5c91bcd69..21667fa5a 100644 --- a/storage/filesystem/object.go +++ b/storage/filesystem/object.go @@ -4,6 +4,7 @@ import ( "bytes" "io" "os" + "sync" "time" "github.com/go-git/go-git/v5/plumbing" @@ -419,10 +420,21 @@ func (s *ObjectStorage) getFromUnpacked(h plumbing.Hash) (obj plumbing.EncodedOb s.objectCache.Put(obj) - _, err = io.Copy(w, r) + bufp := copyBufferPool.Get().(*[]byte) + buf := *bufp + _, err = io.CopyBuffer(w, r, buf) + copyBufferPool.Put(bufp) + return obj, err } +var copyBufferPool = sync.Pool{ + New: func() interface{} { + b := make([]byte, 32*1024) + return &b + }, +} + // Get returns the object with the given hash, by searching for it in // the packfile. 
func (s *ObjectStorage) getFromPackfile(h plumbing.Hash, canBeDelta bool) ( diff --git a/worktree.go b/worktree.go index c974aed24..98116ca52 100644 --- a/worktree.go +++ b/worktree.go @@ -534,7 +534,8 @@ func (w *Worktree) checkoutChangeRegularFile(name string, var copyBufferPool = sync.Pool{ New: func() interface{} { - return make([]byte, 32*1024) + b := make([]byte, 32*1024) + return &b }, } @@ -561,9 +562,10 @@ func (w *Worktree) checkoutFile(f *object.File) (err error) { } defer ioutil.CheckClose(to, &err) - buf := copyBufferPool.Get().([]byte) + bufp := copyBufferPool.Get().(*[]byte) + buf := *bufp _, err = io.CopyBuffer(to, from, buf) - copyBufferPool.Put(buf) + copyBufferPool.Put(bufp) return } From ffa7e69efb8c4ba8d4e08ec4c65e49e2228fd88b Mon Sep 17 00:00:00 2001 From: Paulo Gomes Date: Mon, 7 Nov 2022 14:42:00 +0000 Subject: [PATCH 2/4] Optimise Reference.String() Decreases allocations and bytes per operation by using string builder with a predefined size. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit One additional allocation has been removed by using its own implementation of Strings(). The reason behind this was due to the fact the calls to .String() are more recurrent than .Strings() and the performance impact was worth the code duplication. Benchmark results: cpu: 11th Gen Intel(R) Core(TM) i7-11800H @ 2.30GHz name old time/op new time/op delta ReferenceStringSymbolic-16 140ns ± 4% 40ns ± 9% -71.19% (p=0.008 n=5+5) ReferenceStringHash-16 174ns ±14% 85ns ± 4% -51.13% (p=0.008 n=5+5) ReferenceStringInvalid-16 48.9ns ± 2% 1.5ns ± 3% -96.96% (p=0.008 n=5+5) name old alloc/op new alloc/op delta ReferenceStringSymbolic-16 88.0B ± 0% 32.0B ± 0% -63.64% (p=0.008 n=5+5) ReferenceStringHash-16 176B ± 0% 144B ± 0% -18.18% (p=0.008 n=5+5) ReferenceStringInvalid-16 0.00B 0.00B ~ (all equal) name old allocs/op new allocs/op delta ReferenceStringSymbolic-16 4.00 ± 0% 1.00 ± 0% -75.00% (p=0.008 n=5+5) ReferenceStringHash-16 5.00 ± 0% 3.00 ± 0% -40.00% (p=0.008 n=5+5) ReferenceStringInvalid-16 0.00 0.00 ~ (all equal) Signed-off-by: Paulo Gomes --- plumbing/reference.go | 19 +++++++++++++++++-- plumbing/reference_test.go | 24 +++++++++++++++++++++++- 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/plumbing/reference.go b/plumbing/reference.go index deb50676a..eef11e827 100644 --- a/plumbing/reference.go +++ b/plumbing/reference.go @@ -204,6 +204,21 @@ func (r *Reference) Strings() [2]string { } func (r *Reference) String() string { - s := r.Strings() - return fmt.Sprintf("%s %s", s[1], s[0]) + ref := "" + switch r.Type() { + case HashReference: + ref = r.Hash().String() + case SymbolicReference: + ref = symrefPrefix + r.Target().String() + default: + return "" + } + + name := r.Name().String() + var v strings.Builder + v.Grow(len(ref) + len(name) + 1) + v.WriteString(ref) + v.WriteString(" ") + v.WriteString(name) + return v.String() } diff --git a/plumbing/reference_test.go b/plumbing/reference_test.go index b3ccf5340..e69076ff6 100644 --- a/plumbing/reference_test.go +++ b/plumbing/reference_test.go @@ -1,6 +1,10 @@ package plumbing -import . "gopkg.in/check.v1" +import ( + "testing" + + . 
"gopkg.in/check.v1" +) type ReferenceSuite struct{} @@ -98,3 +102,21 @@ func (s *ReferenceSuite) TestIsTag(c *C) { r := ReferenceName("refs/tags/v3.1.") c.Assert(r.IsTag(), Equals, true) } + +func benchMarkReferenceString(r *Reference, b *testing.B) { + for n := 0; n < b.N; n++ { + r.String() + } +} + +func BenchmarkReferenceStringSymbolic(b *testing.B) { + benchMarkReferenceString(NewSymbolicReference("v3.1.1", "refs/tags/v3.1.1"), b) +} + +func BenchmarkReferenceStringHash(b *testing.B) { + benchMarkReferenceString(NewHashReference("v3.1.1", NewHash("6ecf0ef2c2dffb796033e5a02219af86ec6584e5")), b) +} + +func BenchmarkReferenceStringInvalid(b *testing.B) { + benchMarkReferenceString(&Reference{}, b) +} From 9490da0f86a12269abb2099e2ead1f20eec166d2 Mon Sep 17 00:00:00 2001 From: Paulo Gomes Date: Fri, 4 Nov 2022 12:44:40 +0000 Subject: [PATCH 3/4] Optimize zlib reader and consolidate sync.pools Expands on the optimisations from https://github.com/fluxcd/go-git/pull/5 and ensures that zlib reader does not need to recreate a deflate dictionary at every use. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The use of sync pools was consolidated into a new sync utils package. name old time/op new time/op delta Parser-16 7.51ms ± 3% 7.71ms ± 6% ~ (p=0.222 n=5+5) name old alloc/op new alloc/op delta Parser-16 4.65MB ± 3% 1.90MB ± 3% -59.06% (p=0.008 n=5+5) name old allocs/op new allocs/op delta Parser-16 3.48k ± 0% 3.32k ± 0% -4.57% (p=0.016 n=5+4) Signed-off-by: Paulo Gomes --- plumbing/format/objfile/reader.go | 17 +++--- plumbing/format/objfile/writer.go | 16 ++---- plumbing/format/packfile/common.go | 18 ------ plumbing/format/packfile/diff_delta.go | 21 +++---- plumbing/format/packfile/packfile.go | 34 ++++++------ plumbing/format/packfile/parser.go | 19 ++++--- plumbing/format/packfile/patch_delta.go | 18 +++--- plumbing/format/packfile/scanner.go | 54 ++++++++---------- plumbing/object/commit.go | 7 +-- plumbing/object/common.go | 12 ---- plumbing/object/tag.go | 8 +-- plumbing/object/tree.go | 8 +-- utils/sync/bufio.go | 29 ++++++++++ utils/sync/bufio_test.go | 26 +++++++++ utils/sync/bytes.go | 51 +++++++++++++++++ utils/sync/bytes_test.go | 49 ++++++++++++++++ utils/sync/zlib.go | 74 +++++++++++++++++++++++++ utils/sync/zlib_test.go | 74 +++++++++++++++++++++++++ worktree.go | 16 ++---- 19 files changed, 398 insertions(+), 153 deletions(-) delete mode 100644 plumbing/object/common.go create mode 100644 utils/sync/bufio.go create mode 100644 utils/sync/bufio_test.go create mode 100644 utils/sync/bytes.go create mode 100644 utils/sync/bytes_test.go create mode 100644 utils/sync/zlib.go create mode 100644 utils/sync/zlib_test.go diff --git a/plumbing/format/objfile/reader.go b/plumbing/format/objfile/reader.go index b6b2ca06d..d7932f4ea 100644 --- a/plumbing/format/objfile/reader.go +++ b/plumbing/format/objfile/reader.go @@ -1,13 +1,13 @@ package objfile import ( - "compress/zlib" "errors" "io" "strconv" "github.com/go-git/go-git/v5/plumbing" "github.com/go-git/go-git/v5/plumbing/format/packfile" + "github.com/go-git/go-git/v5/utils/sync" ) var ( @@ -20,20 +20,22 @@ var ( // Reader implements io.ReadCloser. Close should be called when finished with // the Reader. Close will not close the underlying io.Reader. type Reader struct { - multi io.Reader - zlib io.ReadCloser - hasher plumbing.Hasher + multi io.Reader + zlib io.Reader + zlibref sync.ZLibReader + hasher plumbing.Hasher } // NewReader returns a new Reader reading from r. 
func NewReader(r io.Reader) (*Reader, error) { - zlib, err := zlib.NewReader(r) + zlib, err := sync.GetZlibReader(r) if err != nil { return nil, packfile.ErrZLib.AddDetails(err.Error()) } return &Reader{ - zlib: zlib, + zlib: zlib.Reader, + zlibref: zlib, }, nil } @@ -110,5 +112,6 @@ func (r *Reader) Hash() plumbing.Hash { // Close releases any resources consumed by the Reader. Calling Close does not // close the wrapped io.Reader originally passed to NewReader. func (r *Reader) Close() error { - return r.zlib.Close() + sync.PutZlibReader(r.zlibref) + return nil } diff --git a/plumbing/format/objfile/writer.go b/plumbing/format/objfile/writer.go index 248f81bef..0d0f15492 100644 --- a/plumbing/format/objfile/writer.go +++ b/plumbing/format/objfile/writer.go @@ -5,9 +5,9 @@ import ( "errors" "io" "strconv" - "sync" "github.com/go-git/go-git/v5/plumbing" + "github.com/go-git/go-git/v5/utils/sync" ) var ( @@ -21,7 +21,7 @@ type Writer struct { raw io.Writer hasher plumbing.Hasher multi io.Writer - zlib io.WriteCloser + zlib *zlib.Writer closed bool pending int64 // number of unwritten bytes @@ -32,21 +32,13 @@ type Writer struct { // The returned Writer implements io.WriteCloser. Close should be called when // finished with the Writer. Close will not close the underlying io.Writer. func NewWriter(w io.Writer) *Writer { - zlib := zlibPool.Get().(*zlib.Writer) - zlib.Reset(w) - + zlib := sync.GetZlibWriter(w) return &Writer{ raw: w, zlib: zlib, } } -var zlibPool = sync.Pool{ - New: func() interface{} { - return zlib.NewWriter(nil) - }, -} - // WriteHeader writes the type and the size and prepares to accept the object's // contents. If an invalid t is provided, plumbing.ErrInvalidType is returned. If a // negative size is provided, ErrNegativeSize is returned. @@ -110,7 +102,7 @@ func (w *Writer) Hash() plumbing.Hash { // Calling Close does not close the wrapped io.Writer originally passed to // NewWriter. 
func (w *Writer) Close() error { - defer zlibPool.Put(w.zlib) + defer sync.PutZlibWriter(w.zlib) if err := w.zlib.Close(); err != nil { return err } diff --git a/plumbing/format/packfile/common.go b/plumbing/format/packfile/common.go index df423ad50..36c5ef5b8 100644 --- a/plumbing/format/packfile/common.go +++ b/plumbing/format/packfile/common.go @@ -1,10 +1,7 @@ package packfile import ( - "bytes" - "compress/zlib" "io" - "sync" "github.com/go-git/go-git/v5/plumbing/storer" "github.com/go-git/go-git/v5/utils/ioutil" @@ -61,18 +58,3 @@ func WritePackfileToObjectStorage( return err } - -var bufPool = sync.Pool{ - New: func() interface{} { - return bytes.NewBuffer(nil) - }, -} - -var zlibInitBytes = []byte{0x78, 0x9c, 0x01, 0x00, 0x00, 0xff, 0xff, 0x00, 0x00, 0x00, 0x01} - -var zlibReaderPool = sync.Pool{ - New: func() interface{} { - r, _ := zlib.NewReader(bytes.NewReader(zlibInitBytes)) - return r - }, -} diff --git a/plumbing/format/packfile/diff_delta.go b/plumbing/format/packfile/diff_delta.go index 1951b34ef..2c7a33581 100644 --- a/plumbing/format/packfile/diff_delta.go +++ b/plumbing/format/packfile/diff_delta.go @@ -5,6 +5,7 @@ import ( "github.com/go-git/go-git/v5/plumbing" "github.com/go-git/go-git/v5/utils/ioutil" + "github.com/go-git/go-git/v5/utils/sync" ) // See https://github.com/jelmer/dulwich/blob/master/dulwich/pack.py and @@ -43,18 +44,16 @@ func getDelta(index *deltaIndex, base, target plumbing.EncodedObject) (o plumbin defer ioutil.CheckClose(tr, &err) - bb := bufPool.Get().(*bytes.Buffer) - defer bufPool.Put(bb) - bb.Reset() + bb := sync.GetBytesBuffer() + defer sync.PutBytesBuffer(bb) _, err = bb.ReadFrom(br) if err != nil { return nil, err } - tb := bufPool.Get().(*bytes.Buffer) - defer bufPool.Put(tb) - tb.Reset() + tb := sync.GetBytesBuffer() + defer sync.PutBytesBuffer(tb) _, err = tb.ReadFrom(tr) if err != nil { @@ -80,9 +79,8 @@ func DiffDelta(src, tgt []byte) []byte { } func diffDelta(index *deltaIndex, src []byte, tgt []byte) []byte { - buf := bufPool.Get().(*bytes.Buffer) - defer bufPool.Put(buf) - buf.Reset() + buf := sync.GetBytesBuffer() + defer sync.PutBytesBuffer(buf) buf.Write(deltaEncodeSize(len(src))) buf.Write(deltaEncodeSize(len(tgt))) @@ -90,9 +88,8 @@ func diffDelta(index *deltaIndex, src []byte, tgt []byte) []byte { index.init(src) } - ibuf := bufPool.Get().(*bytes.Buffer) - defer bufPool.Put(ibuf) - ibuf.Reset() + ibuf := sync.GetBytesBuffer() + defer sync.PutBytesBuffer(ibuf) for i := 0; i < len(tgt); i++ { offset, l := index.findMatch(src, tgt, i) diff --git a/plumbing/format/packfile/packfile.go b/plumbing/format/packfile/packfile.go index 8dd6041d5..685270225 100644 --- a/plumbing/format/packfile/packfile.go +++ b/plumbing/format/packfile/packfile.go @@ -2,7 +2,6 @@ package packfile import ( "bytes" - "compress/zlib" "fmt" "io" "os" @@ -13,6 +12,7 @@ import ( "github.com/go-git/go-git/v5/plumbing/format/idxfile" "github.com/go-git/go-git/v5/plumbing/storer" "github.com/go-git/go-git/v5/utils/ioutil" + "github.com/go-git/go-git/v5/utils/sync" ) var ( @@ -138,9 +138,8 @@ func (p *Packfile) getObjectSize(h *ObjectHeader) (int64, error) { case plumbing.CommitObject, plumbing.TreeObject, plumbing.BlobObject, plumbing.TagObject: return h.Length, nil case plumbing.REFDeltaObject, plumbing.OFSDeltaObject: - buf := bufPool.Get().(*bytes.Buffer) - defer bufPool.Put(buf) - buf.Reset() + buf := sync.GetBytesBuffer() + defer sync.PutBytesBuffer(buf) if _, _, err := p.s.NextObject(buf); err != nil { return 0, err @@ -227,9 +226,9 @@ func (p *Packfile) 
getNextObject(h *ObjectHeader, hash plumbing.Hash) (plumbing. // For delta objects we read the delta data and apply the small object // optimization only if the expanded version of the object still meets // the small object threshold condition. - buf := bufPool.Get().(*bytes.Buffer) - defer bufPool.Put(buf) - buf.Reset() + buf := sync.GetBytesBuffer() + defer sync.PutBytesBuffer(buf) + if _, _, err := p.s.NextObject(buf); err != nil { return nil, err } @@ -290,14 +289,13 @@ func (p *Packfile) getObjectContent(offset int64) (io.ReadCloser, error) { func asyncReader(p *Packfile) (io.ReadCloser, error) { reader := ioutil.NewReaderUsingReaderAt(p.file, p.s.r.offset) - zr := zlibReaderPool.Get().(io.ReadCloser) - - if err := zr.(zlib.Resetter).Reset(reader, nil); err != nil { + zr, err := sync.GetZlibReader(reader) + if err != nil { return nil, fmt.Errorf("zlib reset error: %s", err) } - return ioutil.NewReadCloserWithCloser(zr, func() error { - zlibReaderPool.Put(zr) + return ioutil.NewReadCloserWithCloser(zr.Reader, func() error { + sync.PutZlibReader(zr) return nil }), nil @@ -373,9 +371,9 @@ func (p *Packfile) fillRegularObjectContent(obj plumbing.EncodedObject) (err err } func (p *Packfile) fillREFDeltaObjectContent(obj plumbing.EncodedObject, ref plumbing.Hash) error { - buf := bufPool.Get().(*bytes.Buffer) - defer bufPool.Put(buf) - buf.Reset() + buf := sync.GetBytesBuffer() + defer sync.PutBytesBuffer(buf) + _, _, err := p.s.NextObject(buf) if err != nil { return err @@ -417,9 +415,9 @@ func (p *Packfile) fillREFDeltaObjectContentWithBuffer(obj plumbing.EncodedObjec } func (p *Packfile) fillOFSDeltaObjectContent(obj plumbing.EncodedObject, offset int64) error { - buf := bufPool.Get().(*bytes.Buffer) - defer bufPool.Put(buf) - buf.Reset() + buf := sync.GetBytesBuffer() + defer sync.PutBytesBuffer(buf) + _, _, err := p.s.NextObject(buf) if err != nil { return err diff --git a/plumbing/format/packfile/parser.go b/plumbing/format/packfile/parser.go index 9ec838e45..522c146f2 100644 --- a/plumbing/format/packfile/parser.go +++ b/plumbing/format/packfile/parser.go @@ -10,6 +10,7 @@ import ( "github.com/go-git/go-git/v5/plumbing/cache" "github.com/go-git/go-git/v5/plumbing/storer" "github.com/go-git/go-git/v5/utils/ioutil" + "github.com/go-git/go-git/v5/utils/sync" ) var ( @@ -175,7 +176,8 @@ func (p *Parser) init() error { } func (p *Parser) indexObjects() error { - buf := new(bytes.Buffer) + buf := sync.GetBytesBuffer() + defer sync.PutBytesBuffer(buf) for i := uint32(0); i < p.count; i++ { buf.Reset() @@ -219,6 +221,7 @@ func (p *Parser) indexObjects() error { ota = newBaseObject(oh.Offset, oh.Length, t) } + buf.Grow(int(oh.Length)) _, crc, err := p.scanner.NextObject(buf) if err != nil { return err @@ -264,7 +267,9 @@ func (p *Parser) indexObjects() error { } func (p *Parser) resolveDeltas() error { - buf := &bytes.Buffer{} + buf := sync.GetBytesBuffer() + defer sync.PutBytesBuffer(buf) + for _, obj := range p.oi { buf.Reset() err := p.get(obj, buf) @@ -346,9 +351,8 @@ func (p *Parser) get(o *objectInfo, buf *bytes.Buffer) (err error) { } if o.DiskType.IsDelta() { - b := bufPool.Get().(*bytes.Buffer) - defer bufPool.Put(b) - b.Reset() + b := sync.GetBytesBuffer() + defer sync.PutBytesBuffer(b) err := p.get(o.Parent, b) if err != nil { return err @@ -382,9 +386,8 @@ func (p *Parser) resolveObject( if !o.DiskType.IsDelta() { return nil } - buf := bufPool.Get().(*bytes.Buffer) - defer bufPool.Put(buf) - buf.Reset() + buf := sync.GetBytesBuffer() + defer sync.PutBytesBuffer(buf) err := 
p.readData(buf, o) if err != nil { return err diff --git a/plumbing/format/packfile/patch_delta.go b/plumbing/format/packfile/patch_delta.go index 053466ddf..f00562d63 100644 --- a/plumbing/format/packfile/patch_delta.go +++ b/plumbing/format/packfile/patch_delta.go @@ -9,6 +9,7 @@ import ( "github.com/go-git/go-git/v5/plumbing" "github.com/go-git/go-git/v5/utils/ioutil" + "github.com/go-git/go-git/v5/utils/sync" ) // See https://github.com/git/git/blob/49fa3dc76179e04b0833542fa52d0f287a4955ac/delta.h @@ -34,18 +35,16 @@ func ApplyDelta(target, base plumbing.EncodedObject, delta []byte) (err error) { defer ioutil.CheckClose(w, &err) - buf := bufPool.Get().(*bytes.Buffer) - defer bufPool.Put(buf) - buf.Reset() + buf := sync.GetBytesBuffer() + defer sync.PutBytesBuffer(buf) _, err = buf.ReadFrom(r) if err != nil { return err } src := buf.Bytes() - dst := bufPool.Get().(*bytes.Buffer) - defer bufPool.Put(dst) - dst.Reset() + dst := sync.GetBytesBuffer() + defer sync.PutBytesBuffer(dst) err = patchDelta(dst, src, delta) if err != nil { return err @@ -53,10 +52,9 @@ func ApplyDelta(target, base plumbing.EncodedObject, delta []byte) (err error) { target.SetSize(int64(dst.Len())) - bufp := byteSlicePool.Get().(*[]byte) - b := *bufp - _, err = io.CopyBuffer(w, dst, b) - byteSlicePool.Put(bufp) + b := sync.GetByteSlice() + _, err = io.CopyBuffer(w, dst, *b) + sync.PutByteSlice(b) return err } diff --git a/plumbing/format/packfile/scanner.go b/plumbing/format/packfile/scanner.go index b655594b7..9ebb84a24 100644 --- a/plumbing/format/packfile/scanner.go +++ b/plumbing/format/packfile/scanner.go @@ -3,17 +3,16 @@ package packfile import ( "bufio" "bytes" - "compress/zlib" "fmt" "hash" "hash/crc32" "io" stdioutil "io/ioutil" - "sync" "github.com/go-git/go-git/v5/plumbing" "github.com/go-git/go-git/v5/utils/binary" "github.com/go-git/go-git/v5/utils/ioutil" + "github.com/go-git/go-git/v5/utils/sync" ) var ( @@ -323,14 +322,14 @@ func (s *Scanner) NextObject(w io.Writer) (written int64, crc32 uint32, err erro // ReadObject returns a reader for the object content and an error func (s *Scanner) ReadObject() (io.ReadCloser, error) { s.pendingObject = nil - zr := zlibReaderPool.Get().(io.ReadCloser) + zr, err := sync.GetZlibReader(s.r) - if err := zr.(zlib.Resetter).Reset(s.r, nil); err != nil { + if err != nil { return nil, fmt.Errorf("zlib reset error: %s", err) } - return ioutil.NewReadCloserWithCloser(zr, func() error { - zlibReaderPool.Put(zr) + return ioutil.NewReadCloserWithCloser(zr.Reader, func() error { + sync.PutZlibReader(zr) return nil }), nil } @@ -338,28 +337,20 @@ func (s *Scanner) ReadObject() (io.ReadCloser, error) { // ReadRegularObject reads and write a non-deltified object // from it zlib stream in an object entry in the packfile. 
func (s *Scanner) copyObject(w io.Writer) (n int64, err error) { - zr := zlibReaderPool.Get().(io.ReadCloser) - defer zlibReaderPool.Put(zr) + zr, err := sync.GetZlibReader(s.r) + defer sync.PutZlibReader(zr) - if err = zr.(zlib.Resetter).Reset(s.r, nil); err != nil { + if err != nil { return 0, fmt.Errorf("zlib reset error: %s", err) } - defer ioutil.CheckClose(zr, &err) - bufp := byteSlicePool.Get().(*[]byte) - buf := *bufp - n, err = io.CopyBuffer(w, zr, buf) - byteSlicePool.Put(bufp) + defer ioutil.CheckClose(zr.Reader, &err) + buf := sync.GetByteSlice() + n, err = io.CopyBuffer(w, zr.Reader, *buf) + sync.PutByteSlice(buf) return } -var byteSlicePool = sync.Pool{ - New: func() interface{} { - b := make([]byte, 32*1024) - return &b - }, -} - // SeekFromStart sets a new offset from start, returns the old position before // the change. func (s *Scanner) SeekFromStart(offset int64) (previous int64, err error) { @@ -389,10 +380,9 @@ func (s *Scanner) Checksum() (plumbing.Hash, error) { // Close reads the reader until io.EOF func (s *Scanner) Close() error { - bufp := byteSlicePool.Get().(*[]byte) - buf := *bufp - _, err := io.CopyBuffer(stdioutil.Discard, s.r, buf) - byteSlicePool.Put(bufp) + buf := sync.GetByteSlice() + _, err := io.CopyBuffer(stdioutil.Discard, s.r, *buf) + sync.PutByteSlice(buf) return err } @@ -403,13 +393,13 @@ func (s *Scanner) Flush() error { } // scannerReader has the following characteristics: -// - Provides an io.SeekReader impl for bufio.Reader, when the underlying -// reader supports it. -// - Keeps track of the current read position, for when the underlying reader -// isn't an io.SeekReader, but we still want to know the current offset. -// - Writes to the hash writer what it reads, with the aid of a smaller buffer. -// The buffer helps avoid a performance penalty for performing small writes -// to the crc32 hash writer. +// - Provides an io.SeekReader impl for bufio.Reader, when the underlying +// reader supports it. +// - Keeps track of the current read position, for when the underlying reader +// isn't an io.SeekReader, but we still want to know the current offset. +// - Writes to the hash writer what it reads, with the aid of a smaller buffer. +// The buffer helps avoid a performance penalty for performing small writes +// to the crc32 hash writer. 
type scannerReader struct { reader io.Reader crc io.Writer diff --git a/plumbing/object/commit.go b/plumbing/object/commit.go index 7a1b8e5ae..d2f718408 100644 --- a/plumbing/object/commit.go +++ b/plumbing/object/commit.go @@ -1,7 +1,6 @@ package object import ( - "bufio" "bytes" "context" "errors" @@ -14,6 +13,7 @@ import ( "github.com/go-git/go-git/v5/plumbing" "github.com/go-git/go-git/v5/plumbing/storer" "github.com/go-git/go-git/v5/utils/ioutil" + "github.com/go-git/go-git/v5/utils/sync" ) const ( @@ -180,9 +180,8 @@ func (c *Commit) Decode(o plumbing.EncodedObject) (err error) { } defer ioutil.CheckClose(reader, &err) - r := bufPool.Get().(*bufio.Reader) - defer bufPool.Put(r) - r.Reset(reader) + r := sync.GetBufioReader(reader) + defer sync.PutBufioReader(r) var message bool var pgpsig bool diff --git a/plumbing/object/common.go b/plumbing/object/common.go deleted file mode 100644 index 3591f5f0a..000000000 --- a/plumbing/object/common.go +++ /dev/null @@ -1,12 +0,0 @@ -package object - -import ( - "bufio" - "sync" -) - -var bufPool = sync.Pool{ - New: func() interface{} { - return bufio.NewReader(nil) - }, -} diff --git a/plumbing/object/tag.go b/plumbing/object/tag.go index 216010d91..84066f768 100644 --- a/plumbing/object/tag.go +++ b/plumbing/object/tag.go @@ -1,7 +1,6 @@ package object import ( - "bufio" "bytes" "fmt" "io" @@ -13,6 +12,7 @@ import ( "github.com/go-git/go-git/v5/plumbing" "github.com/go-git/go-git/v5/plumbing/storer" "github.com/go-git/go-git/v5/utils/ioutil" + "github.com/go-git/go-git/v5/utils/sync" ) // Tag represents an annotated tag object. It points to a single git object of @@ -93,9 +93,9 @@ func (t *Tag) Decode(o plumbing.EncodedObject) (err error) { } defer ioutil.CheckClose(reader, &err) - r := bufPool.Get().(*bufio.Reader) - defer bufPool.Put(r) - r.Reset(reader) + r := sync.GetBufioReader(reader) + defer sync.PutBufioReader(r) + for { var line []byte line, err = r.ReadBytes('\n') diff --git a/plumbing/object/tree.go b/plumbing/object/tree.go index 5e6378ca4..e9f7666b8 100644 --- a/plumbing/object/tree.go +++ b/plumbing/object/tree.go @@ -1,7 +1,6 @@ package object import ( - "bufio" "context" "errors" "fmt" @@ -14,6 +13,7 @@ import ( "github.com/go-git/go-git/v5/plumbing/filemode" "github.com/go-git/go-git/v5/plumbing/storer" "github.com/go-git/go-git/v5/utils/ioutil" + "github.com/go-git/go-git/v5/utils/sync" ) const ( @@ -230,9 +230,9 @@ func (t *Tree) Decode(o plumbing.EncodedObject) (err error) { } defer ioutil.CheckClose(reader, &err) - r := bufPool.Get().(*bufio.Reader) - defer bufPool.Put(r) - r.Reset(reader) + r := sync.GetBufioReader(reader) + defer sync.PutBufioReader(r) + for { str, err := r.ReadString(' ') if err != nil { diff --git a/utils/sync/bufio.go b/utils/sync/bufio.go new file mode 100644 index 000000000..5009ea804 --- /dev/null +++ b/utils/sync/bufio.go @@ -0,0 +1,29 @@ +package sync + +import ( + "bufio" + "io" + "sync" +) + +var bufioReader = sync.Pool{ + New: func() interface{} { + return bufio.NewReader(nil) + }, +} + +// GetBufioReader returns a *bufio.Reader that is managed by a sync.Pool. +// Returns a bufio.Reader that is resetted with reader and ready for use. +// +// After use, the *bufio.Reader should be put back into the sync.Pool +// by calling PutBufioReader. +func GetBufioReader(reader io.Reader) *bufio.Reader { + r := bufioReader.Get().(*bufio.Reader) + r.Reset(reader) + return r +} + +// PutBufioReader puts reader back into its sync.Pool. 
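+// The reader must not be used after it has been returned to the pool.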
+func PutBufioReader(reader *bufio.Reader) { + bufioReader.Put(reader) +} diff --git a/utils/sync/bufio_test.go b/utils/sync/bufio_test.go new file mode 100644 index 000000000..e70f3d803 --- /dev/null +++ b/utils/sync/bufio_test.go @@ -0,0 +1,26 @@ +package sync + +import ( + "io" + "strings" + "testing" +) + +func TestGetAndPutBufioReader(t *testing.T) { + wanted := "someinput" + r := GetBufioReader(strings.NewReader(wanted)) + if r == nil { + t.Error("nil was not expected") + } + + got, err := r.ReadString(0) + if err != nil && err != io.EOF { + t.Errorf("unexpected error reading string: %v", err) + } + + if wanted != got { + t.Errorf("wanted %q got %q", wanted, got) + } + + PutBufioReader(r) +} diff --git a/utils/sync/bytes.go b/utils/sync/bytes.go new file mode 100644 index 000000000..dd06fc0bc --- /dev/null +++ b/utils/sync/bytes.go @@ -0,0 +1,51 @@ +package sync + +import ( + "bytes" + "sync" +) + +var ( + byteSlice = sync.Pool{ + New: func() interface{} { + b := make([]byte, 16*1024) + return &b + }, + } + bytesBuffer = sync.Pool{ + New: func() interface{} { + return bytes.NewBuffer(nil) + }, + } +) + +// GetByteSlice returns a *[]byte that is managed by a sync.Pool. +// The initial slice length will be 16384 (16kb). +// +// After use, the *[]byte should be put back into the sync.Pool +// by calling PutByteSlice. +func GetByteSlice() *[]byte { + buf := byteSlice.Get().(*[]byte) + return buf +} + +// PutByteSlice puts buf back into its sync.Pool. +func PutByteSlice(buf *[]byte) { + byteSlice.Put(buf) +} + +// GetBytesBuffer returns a *bytes.Buffer that is managed by a sync.Pool. +// Returns a buffer that is resetted and ready for use. +// +// After use, the *bytes.Buffer should be put back into the sync.Pool +// by calling PutBytesBuffer. +func GetBytesBuffer() *bytes.Buffer { + buf := bytesBuffer.Get().(*bytes.Buffer) + buf.Reset() + return buf +} + +// PutBytesBuffer puts buf back into its sync.Pool. 
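+// The buffer must not be used after it has been returned to the pool; it is
+// reset by the next call to GetBytesBuffer.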
+func PutBytesBuffer(buf *bytes.Buffer) { + bytesBuffer.Put(buf) +} diff --git a/utils/sync/bytes_test.go b/utils/sync/bytes_test.go new file mode 100644 index 000000000..b233429ce --- /dev/null +++ b/utils/sync/bytes_test.go @@ -0,0 +1,49 @@ +package sync + +import ( + "testing" +) + +func TestGetAndPutBytesBuffer(t *testing.T) { + buf := GetBytesBuffer() + if buf == nil { + t.Error("nil was not expected") + } + + initialLen := buf.Len() + buf.Grow(initialLen * 2) + grownLen := buf.Len() + + PutBytesBuffer(buf) + + buf = GetBytesBuffer() + if buf.Len() != grownLen { + t.Error("bytes buffer was not reused") + } + + buf2 := GetBytesBuffer() + if buf2.Len() != initialLen { + t.Errorf("new bytes buffer length: wanted %d got %d", initialLen, buf2.Len()) + } +} + +func TestGetAndPutByteSlice(t *testing.T) { + slice := GetByteSlice() + if slice == nil { + t.Error("nil was not expected") + } + + wanted := 16 * 1024 + got := len(*slice) + if wanted != got { + t.Errorf("byte slice length: wanted %d got %d", wanted, got) + } + + newByteSlice := make([]byte, wanted*2) + PutByteSlice(&newByteSlice) + + newSlice := GetByteSlice() + if len(*newSlice) != len(newByteSlice) { + t.Error("byte slice was not reused") + } +} diff --git a/utils/sync/zlib.go b/utils/sync/zlib.go new file mode 100644 index 000000000..c61388595 --- /dev/null +++ b/utils/sync/zlib.go @@ -0,0 +1,74 @@ +package sync + +import ( + "bytes" + "compress/zlib" + "io" + "sync" +) + +var ( + zlibInitBytes = []byte{0x78, 0x9c, 0x01, 0x00, 0x00, 0xff, 0xff, 0x00, 0x00, 0x00, 0x01} + zlibReader = sync.Pool{ + New: func() interface{} { + r, _ := zlib.NewReader(bytes.NewReader(zlibInitBytes)) + return ZLibReader{ + Reader: r.(zlibReadCloser), + } + }, + } + zlibWriter = sync.Pool{ + New: func() interface{} { + return zlib.NewWriter(nil) + }, + } +) + +type zlibReadCloser interface { + io.ReadCloser + zlib.Resetter +} + +type ZLibReader struct { + dict *[]byte + Reader zlibReadCloser +} + +// GetZlibReader returns a ZLibReader that is managed by a sync.Pool. +// Returns a ZLibReader that is resetted using a dictionary that is +// also managed by a sync.Pool. +// +// After use, the ZLibReader should be put back into the sync.Pool +// by calling PutZlibReader. +func GetZlibReader(r io.Reader) (ZLibReader, error) { + z := zlibReader.Get().(ZLibReader) + z.dict = GetByteSlice() + + err := z.Reader.Reset(r, *z.dict) + + return z, err +} + +// PutZlibReader puts z back into its sync.Pool, first closing the reader. +// The Byte slice dictionary is also put back into its sync.Pool. +func PutZlibReader(z ZLibReader) { + z.Reader.Close() + PutByteSlice(z.dict) + zlibReader.Put(z) +} + +// GetZlibWriter returns a *zlib.Writer that is managed by a sync.Pool. +// Returns a writer that is resetted with w and ready for use. +// +// After use, the *zlib.Writer should be put back into the sync.Pool +// by calling PutZlibWriter. +func GetZlibWriter(w io.Writer) *zlib.Writer { + z := zlibWriter.Get().(*zlib.Writer) + z.Reset(w) + return z +} + +// PutZlibWriter puts w back into its sync.Pool. 
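+// w is not flushed or closed here; callers should close the writer before
+// putting it back into the pool.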
+func PutZlibWriter(w *zlib.Writer) { + zlibWriter.Put(w) +} diff --git a/utils/sync/zlib_test.go b/utils/sync/zlib_test.go new file mode 100644 index 000000000..b736fb221 --- /dev/null +++ b/utils/sync/zlib_test.go @@ -0,0 +1,74 @@ +package sync + +import ( + "bytes" + "compress/zlib" + "io" + "testing" +) + +func TestGetAndPutZlibReader(t *testing.T) { + _, err := GetZlibReader(bytes.NewReader(zlibInitBytes)) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + dict := &[]byte{} + reader := FakeZLibReader{} + PutZlibReader(ZLibReader{dict: dict, Reader: &reader}) + + if !reader.wasClosed { + t.Errorf("reader was not closed") + } + + z2, err := GetZlibReader(bytes.NewReader(zlibInitBytes)) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + if dict != z2.dict { + t.Errorf("zlib dictionary was not reused") + } + + if &reader != z2.Reader { + t.Errorf("zlib reader was not reused") + } + + if !reader.wasReset { + t.Errorf("reader was not reset") + } +} + +func TestGetAndPutZlibWriter(t *testing.T) { + w := GetZlibWriter(nil) + if w == nil { + t.Errorf("nil was not expected") + } + + newW := zlib.NewWriter(nil) + PutZlibWriter(newW) + + w2 := GetZlibWriter(nil) + if w2 != newW { + t.Errorf("zlib writer was not reused") + } +} + +type FakeZLibReader struct { + wasClosed bool + wasReset bool +} + +func (f *FakeZLibReader) Reset(r io.Reader, dict []byte) error { + f.wasReset = true + return nil +} + +func (f *FakeZLibReader) Read(p []byte) (n int, err error) { + return 0, nil +} + +func (f *FakeZLibReader) Close() error { + f.wasClosed = true + return nil +} diff --git a/worktree.go b/worktree.go index 98116ca52..02f90a9e6 100644 --- a/worktree.go +++ b/worktree.go @@ -9,7 +9,6 @@ import ( "os" "path/filepath" "strings" - "sync" "github.com/go-git/go-billy/v5" "github.com/go-git/go-billy/v5/util" @@ -22,6 +21,7 @@ import ( "github.com/go-git/go-git/v5/plumbing/storer" "github.com/go-git/go-git/v5/utils/ioutil" "github.com/go-git/go-git/v5/utils/merkletrie" + "github.com/go-git/go-git/v5/utils/sync" ) var ( @@ -532,13 +532,6 @@ func (w *Worktree) checkoutChangeRegularFile(name string, return nil } -var copyBufferPool = sync.Pool{ - New: func() interface{} { - b := make([]byte, 32*1024) - return &b - }, -} - func (w *Worktree) checkoutFile(f *object.File) (err error) { mode, err := f.Mode.ToOSFileMode() if err != nil { @@ -562,10 +555,9 @@ func (w *Worktree) checkoutFile(f *object.File) (err error) { } defer ioutil.CheckClose(to, &err) - bufp := copyBufferPool.Get().(*[]byte) - buf := *bufp - _, err = io.CopyBuffer(to, from, buf) - copyBufferPool.Put(bufp) + buf := sync.GetByteSlice() + _, err = io.CopyBuffer(to, from, *buf) + sync.PutByteSlice(buf) return } From a2c309de872dc18053acb186b1ec125d1f723a90 Mon Sep 17 00:00:00 2001 From: Paulo Gomes Date: Fri, 4 Nov 2022 11:04:29 +0000 Subject: [PATCH 4/4] tests: Replace time.sleep with eventually The previous approach was intermittently flake, leading to different results based on external results. The check for goroutines numbers now checks for less or equal, as the goal of the assertion is to confirm no goroutine is being leaked. 
Signed-off-by: Paulo Gomes --- remote_test.go | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/remote_test.go b/remote_test.go index d0c8fa8e0..751c89a1b 100644 --- a/remote_test.go +++ b/remote_test.go @@ -535,10 +535,22 @@ func (s *RemoteSuite) TestPushContext(c *C) { }) c.Assert(err, IsNil) - // let the goroutine from pushHashes finish and check that the number of - // goroutines is the same as before - time.Sleep(100 * time.Millisecond) - c.Assert(runtime.NumGoroutine(), Equals, numGoroutines) + eventually(c, func() bool { + return runtime.NumGoroutine() <= numGoroutines + }) +} + +func eventually(c *C, condition func() bool) { + select { + case <-time.After(5 * time.Second): + default: + if condition() { + break + } + time.Sleep(100 * time.Millisecond) + } + + c.Assert(condition(), Equals, true) } func (s *RemoteSuite) TestPushContextCanceled(c *C) { @@ -566,10 +578,9 @@ func (s *RemoteSuite) TestPushContextCanceled(c *C) { }) c.Assert(err, Equals, context.Canceled) - // let the goroutine from pushHashes finish and check that the number of - // goroutines is the same as before - time.Sleep(100 * time.Millisecond) - c.Assert(runtime.NumGoroutine(), Equals, numGoroutines) + eventually(c, func() bool { + return runtime.NumGoroutine() <= numGoroutines + }) } func (s *RemoteSuite) TestPushTags(c *C) {
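Background note on the pointer-based pools used throughout this series: storing a non-pointer value such as a []byte in a sync.Pool forces the slice header to be boxed into an interface{} on every Put, which itself allocates (staticcheck reports this as SA6002). Pooling a *[]byte, or any other pointer, avoids that per-call allocation, which is the usual reason pools hold pointers, as these patches do. Below is a minimal, self-contained sketch of the difference; it is illustrative only, not part of the patches, and the pool names and buffer size are made up for the example:

package main

import (
	"fmt"
	"sync"
	"testing"
)

// valuePool stores []byte values: every Put boxes the slice header into an
// interface{}, which escapes to the heap and costs one allocation per call.
var valuePool = sync.Pool{
	New: func() interface{} { return make([]byte, 32*1024) },
}

// pointerPool stores *[]byte: only a pointer is boxed, so Put does not
// allocate. This is the approach the patches above switch to.
var pointerPool = sync.Pool{
	New: func() interface{} {
		b := make([]byte, 32*1024)
		return &b
	},
}

func main() {
	valueAllocs := testing.AllocsPerRun(1000, func() {
		b := valuePool.Get().([]byte)
		valuePool.Put(b) // allocates: slice header copied into interface{}
	})
	pointerAllocs := testing.AllocsPerRun(1000, func() {
		b := pointerPool.Get().(*[]byte)
		pointerPool.Put(b) // no allocation: the pointer fits the interface word
	})
	fmt.Printf("value pool: %.0f allocs/op, pointer pool: %.0f allocs/op\n",
		valueAllocs, pointerAllocs)
}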