Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plumbing: format/packfile, prevent large objects from being read into memory completely #330

Merged
merged 6 commits into from Jun 30, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
51 changes: 51 additions & 0 deletions plumbing/format/packfile/delta_test.go
@@ -1,8 +1,11 @@
package packfile

import (
"bytes"
"io/ioutil"
"math/rand"

"github.com/go-git/go-git/v5/plumbing"
. "gopkg.in/check.v1"
)

Expand Down Expand Up @@ -97,6 +100,32 @@ func (s *DeltaSuite) TestAddDelta(c *C) {
}
}

// TestAddDeltaReader verifies that ReaderFromDelta streams the delta against
// the base object and reproduces the exact target buffer for every test case.
func (s *DeltaSuite) TestAddDeltaReader(c *C) {
	for _, t := range s.testCases {
		baseBuf := genBytes(t.base)
		baseObj := &plumbing.MemoryObject{}
		// Check the write error instead of silently dropping it; a short
		// write here would make later failures misleading.
		_, err := baseObj.Write(baseBuf)
		c.Assert(err, IsNil)

		targetBuf := genBytes(t.target)

		delta := DiffDelta(baseBuf, targetBuf)
		deltaRC := ioutil.NopCloser(bytes.NewReader(delta))

		c.Log("Executing test case:", t.description)

		resultRC, err := ReaderFromDelta(baseObj, deltaRC)
		c.Assert(err, IsNil)

		result, err := ioutil.ReadAll(resultRC)
		c.Assert(err, IsNil)

		err = resultRC.Close()
		c.Assert(err, IsNil)

		c.Assert(result, DeepEquals, targetBuf)
	}
}

func (s *DeltaSuite) TestIncompleteDelta(c *C) {
for _, t := range s.testCases {
c.Log("Incomplete delta on:", t.description)
Expand Down Expand Up @@ -125,3 +154,25 @@ func (s *DeltaSuite) TestMaxCopySizeDelta(c *C) {
c.Assert(err, IsNil)
c.Assert(result, DeepEquals, targetBuf)
}

// TestMaxCopySizeDeltaReader checks the streaming delta reader on a target
// that is one byte longer than the largest span a single copy op can express.
func (s *DeltaSuite) TestMaxCopySizeDeltaReader(c *C) {
	baseBuf := randBytes(maxCopySize)
	baseObj := &plumbing.MemoryObject{}
	// Check the write error instead of silently dropping it.
	_, err := baseObj.Write(baseBuf)
	c.Assert(err, IsNil)

	// Copy before appending: `baseBuf[0:]` aliases baseBuf's backing array,
	// so appending through that alias could clobber the base whenever spare
	// capacity exists. An explicit copy removes the aliasing hazard.
	targetBuf := make([]byte, len(baseBuf), len(baseBuf)+1)
	copy(targetBuf, baseBuf)
	targetBuf = append(targetBuf, byte(1))

	delta := DiffDelta(baseBuf, targetBuf)
	deltaRC := ioutil.NopCloser(bytes.NewReader(delta))

	resultRC, err := ReaderFromDelta(baseObj, deltaRC)
	c.Assert(err, IsNil)

	result, err := ioutil.ReadAll(resultRC)
	c.Assert(err, IsNil)

	err = resultRC.Close()
	c.Assert(err, IsNil)
	c.Assert(result, DeepEquals, targetBuf)
}
15 changes: 15 additions & 0 deletions plumbing/format/packfile/fsobject.go
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/plumbing/cache"
"github.com/go-git/go-git/v5/plumbing/format/idxfile"
"github.com/go-git/go-git/v5/utils/ioutil"
)

// FSObject is an object from the packfile on the filesystem.
Expand Down Expand Up @@ -63,6 +64,20 @@ func (o *FSObject) Reader() (io.ReadCloser, error) {
}

p := NewPackfileWithCache(o.index, nil, f, o.cache)
if o.size > LargeObjectThreshold {
// We have a big object
h, err := p.objectHeaderAtOffset(o.offset)
if err != nil {
return nil, err
}

r, err := p.getReaderDirect(h)
if err != nil {
_ = f.Close()
return nil, err
}
return ioutil.NewReadCloserWithCloser(r, f.Close), nil
}
r, err := p.getObjectContent(o.offset)
if err != nil {
_ = f.Close()
Expand Down
80 changes: 80 additions & 0 deletions plumbing/format/packfile/packfile.go
Expand Up @@ -2,6 +2,8 @@ package packfile

import (
"bytes"
"compress/zlib"
"fmt"
"io"
"os"

Expand Down Expand Up @@ -32,6 +34,12 @@ var (
// wrapped in FSObject.
const smallObjectThreshold = 16 * 1024

// Conversely there are large objects that should not be cached and kept
// in memory as they're too large to be reasonably cached. Objects larger
// than this threshold are never read completely into memory; they are
// streamed instead and bypass the cache.
const LargeObjectThreshold = 1024 * 1024

// Packfile allows retrieving information from inside a packfile.
type Packfile struct {
idxfile.Index
Expand Down Expand Up @@ -282,6 +290,50 @@ func (p *Packfile) getObjectContent(offset int64) (io.ReadCloser, error) {
return obj.Reader()
}

// asyncReader returns a ReadCloser that decompresses the object data at the
// scanner's current offset on the fly, without reading the whole object into
// memory. The zlib reader is taken from zlibReaderPool and returned to it
// when the returned ReadCloser is closed.
func asyncReader(p *Packfile) (io.ReadCloser, error) {
	// Wrap the packfile in a ReaderAt-backed reader pinned to the current
	// offset, so later scanner movement cannot invalidate this read position.
	reader := ioutil.NewReaderUsingReaderAt(p.file, p.s.r.offset)

	zr := zlibReaderPool.Get().(io.ReadCloser)
	// Wrap with %w so callers can unwrap the underlying zlib error with
	// errors.Is/errors.As.
	if err := zr.(zlib.Resetter).Reset(reader, nil); err != nil {
		return nil, fmt.Errorf("zlib reset error: %w", err)
	}

	return ioutil.NewReadCloserWithCloser(zr, func() error {
		zlibReaderPool.Put(zr)
		return nil
	}), nil
}

func (p *Packfile) getReaderDirect(h *ObjectHeader) (io.ReadCloser, error) {
switch h.Type {
case plumbing.CommitObject, plumbing.TreeObject, plumbing.BlobObject, plumbing.TagObject:
return asyncReader(p)
case plumbing.REFDeltaObject:
deltaRc, err := asyncReader(p)
if err != nil {
return nil, err
}
r, err := p.readREFDeltaObjectContent(h, deltaRc)
if err != nil {
return nil, err
}
return r, nil
case plumbing.OFSDeltaObject:
deltaRc, err := asyncReader(p)
Copy link
Contributor Author

@zeripath zeripath Jun 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR differs from the previous #303 here by wrapping the reader in a reader at - so as to prevent problems from the scanner moving on etc.

Copy link
Contributor Author

@zeripath zeripath Jun 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was also a bug here in the original #303 that this fixes.

if err != nil {
return nil, err
}
r, err := p.readOFSDeltaObjectContent(h, deltaRc)
if err != nil {
return nil, err
}
return r, nil
default:
return nil, ErrInvalidObject.AddDetails("type %q", h.Type)
}
}

func (p *Packfile) getNextMemoryObject(h *ObjectHeader) (plumbing.EncodedObject, error) {
var obj = new(plumbing.MemoryObject)
obj.SetSize(h.Length)
Expand Down Expand Up @@ -334,6 +386,20 @@ func (p *Packfile) fillREFDeltaObjectContent(obj plumbing.EncodedObject, ref plu
return p.fillREFDeltaObjectContentWithBuffer(obj, ref, buf)
}

// readREFDeltaObjectContent streams the content of a REF-delta object by
// resolving its base object — from the cache when possible, otherwise from
// the packfile — and applying the delta lazily via ReaderFromDelta.
func (p *Packfile) readREFDeltaObjectContent(h *ObjectHeader, deltaRC io.Reader) (io.ReadCloser, error) {
	base, ok := p.cacheGet(h.Reference)
	if !ok {
		var err error
		if base, err = p.Get(h.Reference); err != nil {
			return nil, err
		}
	}

	return ReaderFromDelta(base, deltaRC)
}

func (p *Packfile) fillREFDeltaObjectContentWithBuffer(obj plumbing.EncodedObject, ref plumbing.Hash, buf *bytes.Buffer) error {
var err error

Expand Down Expand Up @@ -364,6 +430,20 @@ func (p *Packfile) fillOFSDeltaObjectContent(obj plumbing.EncodedObject, offset
return p.fillOFSDeltaObjectContentWithBuffer(obj, offset, buf)
}

// readOFSDeltaObjectContent streams the content of an OFS-delta object by
// locating its base at the referenced offset and applying the delta lazily
// via ReaderFromDelta.
func (p *Packfile) readOFSDeltaObjectContent(h *ObjectHeader, deltaRC io.Reader) (io.ReadCloser, error) {
	baseHash, err := p.FindHash(h.OffsetReference)
	if err != nil {
		return nil, err
	}

	baseObj, err := p.objectAtOffset(h.OffsetReference, baseHash)
	if err != nil {
		return nil, err
	}

	return ReaderFromDelta(baseObj, deltaRC)
}

func (p *Packfile) fillOFSDeltaObjectContentWithBuffer(obj plumbing.EncodedObject, offset int64, buf *bytes.Buffer) error {
hash, err := p.FindHash(offset)
if err != nil {
Expand Down