plumbing: format/packfile, prevent large objects from being read into memory completely #330

Merged · 6 commits · Jun 30, 2021
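For orientation before the hunks: this PR threads a new largeObjectThreshold parameter through NewPackfile and related constructors, and objects larger than that threshold are streamed from the pack file instead of being decompressed fully into memory. Below is a minimal usage sketch; the NewPackfile signature and the idxfile decoding pattern come from the diff itself, while streamObject, the paths, and the 16 MiB threshold are illustrative assumptions.

package example

import (
	"io"
	"os"

	"github.com/go-git/go-billy/v5/osfs"
	"github.com/go-git/go-git/v5/plumbing"
	"github.com/go-git/go-git/v5/plumbing/format/idxfile"
	"github.com/go-git/go-git/v5/plumbing/format/packfile"
)

// streamObject writes one object's content to stdout. Objects larger than
// the threshold are streamed from the pack instead of being held in memory.
// dir, packName, and idxName are hypothetical inputs for this sketch.
func streamObject(dir, packName, idxName string, hash plumbing.Hash) error {
	fs := osfs.New(dir)

	idxFile, err := fs.Open(idxName)
	if err != nil {
		return err
	}
	defer idxFile.Close()

	// Decode the .idx file, following the same pattern as the tests below.
	idx := idxfile.NewMemoryIndex()
	if err := idxfile.NewDecoder(idxFile).Decode(idx); err != nil {
		return err
	}

	packFile, err := fs.Open(packName)
	if err != nil {
		return err
	}

	// 16 MiB threshold (illustrative); 0 keeps the old in-memory behavior.
	p := packfile.NewPackfile(idx, fs, packFile, 16*1024*1024)
	defer p.Close() // closes the pack file as well

	obj, err := p.Get(hash)
	if err != nil {
		return err
	}

	r, err := obj.Reader()
	if err != nil {
		return err
	}
	defer r.Close()

	_, err = io.Copy(os.Stdout, r)
	return err
}

Passing 0, as the updated tests below do, disables the threshold and preserves the previous fully-in-memory behavior.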
51 changes: 51 additions & 0 deletions plumbing/format/packfile/delta_test.go
@@ -1,8 +1,11 @@
package packfile

import (
"bytes"
"io/ioutil"
"math/rand"

"github.com/go-git/go-git/v5/plumbing"
. "gopkg.in/check.v1"
)

@@ -97,6 +100,32 @@ func (s *DeltaSuite) TestAddDelta(c *C) {
}
}

func (s *DeltaSuite) TestAddDeltaReader(c *C) {
for _, t := range s.testCases {
baseBuf := genBytes(t.base)
baseObj := &plumbing.MemoryObject{}
baseObj.Write(baseBuf)

targetBuf := genBytes(t.target)

delta := DiffDelta(baseBuf, targetBuf)
deltaRC := ioutil.NopCloser(bytes.NewReader(delta))

c.Log("Executing test case:", t.description)

resultRC, err := ReaderFromDelta(baseObj, deltaRC)
c.Assert(err, IsNil)

result, err := ioutil.ReadAll(resultRC)
c.Assert(err, IsNil)

err = resultRC.Close()
c.Assert(err, IsNil)

c.Assert(result, DeepEquals, targetBuf)
}
}

func (s *DeltaSuite) TestIncompleteDelta(c *C) {
for _, t := range s.testCases {
c.Log("Incomplete delta on:", t.description)
@@ -125,3 +154,25 @@ func (s *DeltaSuite) TestMaxCopySizeDelta(c *C) {
c.Assert(err, IsNil)
c.Assert(result, DeepEquals, targetBuf)
}

func (s *DeltaSuite) TestMaxCopySizeDeltaReader(c *C) {
baseBuf := randBytes(maxCopySize)
baseObj := &plumbing.MemoryObject{}
baseObj.Write(baseBuf)

targetBuf := baseBuf[0:]
targetBuf = append(targetBuf, byte(1))

delta := DiffDelta(baseBuf, targetBuf)
deltaRC := ioutil.NopCloser(bytes.NewReader(delta))

resultRC, err := ReaderFromDelta(baseObj, deltaRC)
c.Assert(err, IsNil)

result, err := ioutil.ReadAll(resultRC)
c.Assert(err, IsNil)

err = resultRC.Close()
c.Assert(err, IsNil)
c.Assert(result, DeepEquals, targetBuf)
}
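Both new tests exercise ReaderFromDelta(base, deltaRC), the streaming delta entry point this PR introduces: it applies the delta to the base object and returns an io.ReadCloser, so the reconstructed target never has to exist as a single in-memory buffer. The ioutil.ReadAll calls above are only there so the tests can compare the result against targetBuf.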
2 changes: 1 addition & 1 deletion plumbing/format/packfile/encoder_advanced_test.go
@@ -105,7 +105,7 @@ func (s *EncoderAdvancedSuite) testEncodeDecode(
_, err = f.Seek(0, io.SeekStart)
c.Assert(err, IsNil)

p := NewPackfile(index, fs, f)
p := NewPackfile(index, fs, f, 0)

decodeHash, err := p.ID()
c.Assert(err, IsNil)
2 changes: 1 addition & 1 deletion plumbing/format/packfile/encoder_test.go
@@ -318,7 +318,7 @@ func packfileFromReader(c *C, buf *bytes.Buffer) (*Packfile, func()) {
index, err := w.Index()
c.Assert(err, IsNil)

return NewPackfile(index, fs, file), func() {
return NewPackfile(index, fs, file, 0), func() {
c.Assert(file.Close(), IsNil)
}
}
54 changes: 36 additions & 18 deletions plumbing/format/packfile/fsobject.go
@@ -7,19 +7,21 @@ import (
"github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/plumbing/cache"
"github.com/go-git/go-git/v5/plumbing/format/idxfile"
"github.com/go-git/go-git/v5/utils/ioutil"
)

// FSObject is an object from the packfile on the filesystem.
type FSObject struct {
hash plumbing.Hash
h *ObjectHeader
offset int64
size int64
typ plumbing.ObjectType
index idxfile.Index
fs billy.Filesystem
path string
cache cache.Object
hash plumbing.Hash
h *ObjectHeader
offset int64
size int64
typ plumbing.ObjectType
index idxfile.Index
fs billy.Filesystem
path string
cache cache.Object
largeObjectThreshold int64
}

// NewFSObject creates a new filesystem object.
@@ -32,16 +34,18 @@ func NewFSObject(
fs billy.Filesystem,
path string,
cache cache.Object,
largeObjectThreshold int64,
) *FSObject {
return &FSObject{
hash: hash,
offset: offset,
size: contentSize,
typ: finalType,
index: index,
fs: fs,
path: path,
cache: cache,
hash: hash,
offset: offset,
size: contentSize,
typ: finalType,
index: index,
fs: fs,
path: path,
cache: cache,
largeObjectThreshold: largeObjectThreshold,
}
}

@@ -62,7 +66,21 @@ func (o *FSObject) Reader() (io.ReadCloser, error) {
return nil, err
}

p := NewPackfileWithCache(o.index, nil, f, o.cache)
p := NewPackfileWithCache(o.index, nil, f, o.cache, o.largeObjectThreshold)
if o.largeObjectThreshold > 0 && o.size > o.largeObjectThreshold {
// We have a big object
h, err := p.objectHeaderAtOffset(o.offset)
if err != nil {
return nil, err
}

r, err := p.getReaderDirect(h)
if err != nil {
_ = f.Close()
return nil, err
}
return ioutil.NewReadCloserWithCloser(r, f.Close), nil
}
r, err := p.getObjectContent(o.offset)
if err != nil {
_ = f.Close()
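With the hunk above, FSObject.Reader takes two paths: at or below the threshold it keeps the existing getObjectContent route, while above it, Reader fetches the object header at the stored offset and returns a direct streaming reader whose Close also releases the pack file handle. A plausible shape for that chaining helper, assuming utils/ioutil.NewReadCloserWithCloser simply couples a reader with an extra close function (a sketch, not the library's exact source):

package ioutil

import "io"

// Sketch of a reader/closer coupling helper, assuming the semantics implied
// by its use in FSObject.Reader above: closing the returned stream closes
// the inner reader (if closable) and then runs the extra close function,
// which there is the pack file's Close.
type readCloserWithCloser struct {
	io.Reader
	closeFn func() error
}

func (r *readCloserWithCloser) Close() error {
	if c, ok := r.Reader.(io.Closer); ok {
		if err := c.Close(); err != nil {
			_ = r.closeFn() // still release the secondary resource
			return err
		}
	}
	return r.closeFn()
}

func NewReadCloserWithCloser(r io.Reader, closeFn func() error) io.ReadCloser {
	return &readCloserWithCloser{Reader: r, closeFn: closeFn}
}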
92 changes: 85 additions & 7 deletions plumbing/format/packfile/packfile.go
@@ -2,6 +2,8 @@ package packfile

import (
"bytes"
"compress/zlib"
"fmt"
"io"
"os"

@@ -35,11 +37,12 @@ const smallObjectThreshold = 16 * 1024
// Packfile allows retrieving information from inside a packfile.
type Packfile struct {
idxfile.Index
fs billy.Filesystem
file billy.File
s *Scanner
deltaBaseCache cache.Object
offsetToType map[int64]plumbing.ObjectType
fs billy.Filesystem
file billy.File
s *Scanner
deltaBaseCache cache.Object
offsetToType map[int64]plumbing.ObjectType
largeObjectThreshold int64
}

// NewPackfileWithCache creates a new Packfile with the given object cache.
@@ -50,6 +53,7 @@ func NewPackfileWithCache(
fs billy.Filesystem,
file billy.File,
cache cache.Object,
largeObjectThreshold int64,
) *Packfile {
s := NewScanner(file)
return &Packfile{
@@ -59,15 +63,16 @@
s,
cache,
make(map[int64]plumbing.ObjectType),
largeObjectThreshold,
}
}

// NewPackfile returns a packfile representation for the given packfile file
// and packfile idx.
// If the filesystem is provided, the packfile will return FSObjects, otherwise
// it will return MemoryObjects.
func NewPackfile(index idxfile.Index, fs billy.Filesystem, file billy.File) *Packfile {
return NewPackfileWithCache(index, fs, file, cache.NewObjectLRUDefault())
func NewPackfile(index idxfile.Index, fs billy.Filesystem, file billy.File, largeObjectThreshold int64) *Packfile {
return NewPackfileWithCache(index, fs, file, cache.NewObjectLRUDefault(), largeObjectThreshold)
}

// Get retrieves the encoded object in the packfile with the given hash.
@@ -263,6 +268,7 @@ func (p *Packfile) getNextObject(h *ObjectHeader, hash plumbing.Hash) (plumbing.
p.fs,
p.file.Name(),
p.deltaBaseCache,
p.largeObjectThreshold,
), nil
}

@@ -282,6 +288,50 @@ func (p *Packfile) getObjectContent(offset int64) (io.ReadCloser, error) {
return obj.Reader()
}

func asyncReader(p *Packfile) (io.ReadCloser, error) {
reader := ioutil.NewReaderUsingReaderAt(p.file, p.s.r.offset)
zr := zlibReaderPool.Get().(io.ReadCloser)

if err := zr.(zlib.Resetter).Reset(reader, nil); err != nil {
return nil, fmt.Errorf("zlib reset error: %s", err)
}

return ioutil.NewReadCloserWithCloser(zr, func() error {
zlibReaderPool.Put(zr)
return nil
}), nil

}

func (p *Packfile) getReaderDirect(h *ObjectHeader) (io.ReadCloser, error) {
switch h.Type {
case plumbing.CommitObject, plumbing.TreeObject, plumbing.BlobObject, plumbing.TagObject:
return asyncReader(p)
case plumbing.REFDeltaObject:
deltaRc, err := asyncReader(p)
if err != nil {
return nil, err
}
r, err := p.readREFDeltaObjectContent(h, deltaRc)
if err != nil {
return nil, err
}
return r, nil
case plumbing.OFSDeltaObject:
deltaRc, err := asyncReader(p)
@zeripath (Contributor, Author) commented on Jun 2, 2021:

This PR differs from the previous #303 here by wrapping the reader in a ReaderAt-backed reader, so that the scanner moving on afterwards cannot invalidate it.

@zeripath (Contributor, Author) commented on Jun 2, 2021:

There was also a bug here in the original #303 that this fixes.

if err != nil {
return nil, err
}
r, err := p.readOFSDeltaObjectContent(h, deltaRc)
if err != nil {
return nil, err
}
return r, nil
default:
return nil, ErrInvalidObject.AddDetails("type %q", h.Type)
}
}

func (p *Packfile) getNextMemoryObject(h *ObjectHeader) (plumbing.EncodedObject, error) {
var obj = new(plumbing.MemoryObject)
obj.SetSize(h.Length)
@@ -334,6 +384,20 @@ func (p *Packfile) fillREFDeltaObjectContent(obj plumbing.EncodedObject, ref plu
return p.fillREFDeltaObjectContentWithBuffer(obj, ref, buf)
}

func (p *Packfile) readREFDeltaObjectContent(h *ObjectHeader, deltaRC io.Reader) (io.ReadCloser, error) {
var err error

base, ok := p.cacheGet(h.Reference)
if !ok {
base, err = p.Get(h.Reference)
if err != nil {
return nil, err
}
}

return ReaderFromDelta(base, deltaRC)
}

func (p *Packfile) fillREFDeltaObjectContentWithBuffer(obj plumbing.EncodedObject, ref plumbing.Hash, buf *bytes.Buffer) error {
var err error

@@ -364,6 +428,20 @@ func (p *Packfile) fillOFSDeltaObjectContent(obj plumbing.EncodedObject, offset
return p.fillOFSDeltaObjectContentWithBuffer(obj, offset, buf)
}

func (p *Packfile) readOFSDeltaObjectContent(h *ObjectHeader, deltaRC io.Reader) (io.ReadCloser, error) {
hash, err := p.FindHash(h.OffsetReference)
if err != nil {
return nil, err
}

base, err := p.objectAtOffset(h.OffsetReference, hash)
if err != nil {
return nil, err
}

return ReaderFromDelta(base, deltaRC)
}

func (p *Packfile) fillOFSDeltaObjectContentWithBuffer(obj plumbing.EncodedObject, offset int64, buf *bytes.Buffer) error {
hash, err := p.FindHash(offset)
if err != nil {
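The review comments in this file point at the key difference from #303: the delta paths read the pack through ioutil.NewReaderUsingReaderAt(p.file, p.s.r.offset) rather than from the file's current seek position. A minimal sketch of such a wrapper (an assumption about its shape, not the library's exact source) shows why the scanner moving on cannot corrupt the stream: every Read goes through ReadAt at an explicitly tracked offset, and ReadAt is defined to be independent of the seek position.

package ioutil

import "io"

// Sketch of a reader pinned to its own offset over an io.ReaderAt. Because
// ReadAt ignores the underlying file's seek position, concurrent seeking by
// the packfile scanner does not affect reads made through this wrapper.
type readerUsingReaderAt struct {
	r      io.ReaderAt
	offset int64
}

func (r *readerUsingReaderAt) Read(p []byte) (int, error) {
	n, err := r.r.ReadAt(p, r.offset)
	r.offset += int64(n)
	return n, err
}

func NewReaderUsingReaderAt(r io.ReaderAt, offset int64) io.Reader {
	return &readerUsingReaderAt{r: r, offset: offset}
}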
12 changes: 6 additions & 6 deletions plumbing/format/packfile/packfile_test.go
@@ -111,7 +111,7 @@ func (s *PackfileSuite) SetUpTest(c *C) {
s.idx = idxfile.NewMemoryIndex()
c.Assert(idxfile.NewDecoder(s.f.Idx()).Decode(s.idx), IsNil)

s.p = packfile.NewPackfile(s.idx, fixtures.Filesystem, s.f.Packfile())
s.p = packfile.NewPackfile(s.idx, fixtures.Filesystem, s.f.Packfile(), 0)
}

func (s *PackfileSuite) TearDownTest(c *C) {
@@ -122,7 +122,7 @@ func (s *PackfileSuite) TestDecode(c *C) {
fixtures.Basic().ByTag("packfile").Test(c, func(f *fixtures.Fixture) {
index := getIndexFromIdxFile(f.Idx())

p := packfile.NewPackfile(index, fixtures.Filesystem, f.Packfile())
p := packfile.NewPackfile(index, fixtures.Filesystem, f.Packfile(), 0)
defer p.Close()

for _, h := range expectedHashes {
Expand All @@ -138,7 +138,7 @@ func (s *PackfileSuite) TestDecodeByTypeRefDelta(c *C) {

index := getIndexFromIdxFile(f.Idx())

packfile := packfile.NewPackfile(index, fixtures.Filesystem, f.Packfile())
packfile := packfile.NewPackfile(index, fixtures.Filesystem, f.Packfile(), 0)
defer packfile.Close()

iter, err := packfile.GetByType(plumbing.CommitObject)
@@ -171,7 +171,7 @@ func (s *PackfileSuite) TestDecodeByType(c *C) {
for _, t := range ts {
index := getIndexFromIdxFile(f.Idx())

packfile := packfile.NewPackfile(index, fixtures.Filesystem, f.Packfile())
packfile := packfile.NewPackfile(index, fixtures.Filesystem, f.Packfile(), 0)
defer packfile.Close()

iter, err := packfile.GetByType(t)
Expand All @@ -189,7 +189,7 @@ func (s *PackfileSuite) TestDecodeByTypeConstructor(c *C) {
f := fixtures.Basic().ByTag("packfile").One()
index := getIndexFromIdxFile(f.Idx())

packfile := packfile.NewPackfile(index, fixtures.Filesystem, f.Packfile())
packfile := packfile.NewPackfile(index, fixtures.Filesystem, f.Packfile(), 0)
defer packfile.Close()

_, err := packfile.GetByType(plumbing.OFSDeltaObject)
@@ -266,7 +266,7 @@ func (s *PackfileSuite) TestSize(c *C) {

index := getIndexFromIdxFile(f.Idx())

packfile := packfile.NewPackfile(index, fixtures.Filesystem, f.Packfile())
packfile := packfile.NewPackfile(index, fixtures.Filesystem, f.Packfile(), 0)
defer packfile.Close()

// Get the size of binary.jpg, which is not delta-encoded.