Skip to content

Commit

Permalink
plumbing: format/packfile, prevent large objects from being read into…
Browse files Browse the repository at this point in the history
… memory completely (go-git#330)

This PR adds code to prevent large objects from being read into memory
from packfiles or the filesystem.

Objects greater than 1 MB are no longer stored directly in the cache
or read completely into memory.

This PR differs from, and improves upon, the previously broken go-git#323 by
fixing several bugs in the reader and by transparently wrapping a ReaderAt as a Reader.

Signed-off-by: Andrew Thornton <art27@cantab.net>
  • Loading branch information
zeripath authored and harry-hov committed Oct 5, 2022
1 parent b2efd9b commit 2523dcc
Show file tree
Hide file tree
Showing 13 changed files with 601 additions and 43 deletions.
51 changes: 51 additions & 0 deletions plumbing/format/packfile/delta_test.go
@@ -1,8 +1,11 @@
package packfile

import (
"bytes"
"io/ioutil"
"math/rand"

"github.com/go-git/go-git/v5/plumbing"
. "gopkg.in/check.v1"
)

Expand Down Expand Up @@ -97,6 +100,32 @@ func (s *DeltaSuite) TestAddDelta(c *C) {
}
}

// TestAddDeltaReader verifies that ReaderFromDelta streams back exactly the
// target content for every suite test case, and that the returned reader
// closes cleanly.
func (s *DeltaSuite) TestAddDeltaReader(c *C) {
	for _, tc := range s.testCases {
		base := genBytes(tc.base)
		baseObj := &plumbing.MemoryObject{}
		baseObj.Write(base)

		expected := genBytes(tc.target)

		deltaRC := ioutil.NopCloser(bytes.NewReader(DiffDelta(base, expected)))

		c.Log("Executing test case:", tc.description)

		rc, err := ReaderFromDelta(baseObj, deltaRC)
		c.Assert(err, IsNil)

		got, err := ioutil.ReadAll(rc)
		c.Assert(err, IsNil)

		c.Assert(rc.Close(), IsNil)

		c.Assert(got, DeepEquals, expected)
	}
}

func (s *DeltaSuite) TestIncompleteDelta(c *C) {
for _, t := range s.testCases {
c.Log("Incomplete delta on:", t.description)
Expand Down Expand Up @@ -125,3 +154,25 @@ func (s *DeltaSuite) TestMaxCopySizeDelta(c *C) {
c.Assert(err, IsNil)
c.Assert(result, DeepEquals, targetBuf)
}

// TestMaxCopySizeDeltaReader verifies that ReaderFromDelta correctly applies
// a delta whose copy instruction spans the maximum copy size (maxCopySize).
func (s *DeltaSuite) TestMaxCopySizeDeltaReader(c *C) {
	baseBuf := randBytes(maxCopySize)
	baseObj := &plumbing.MemoryObject{}
	baseObj.Write(baseBuf)

	// Copy explicitly rather than re-slicing: baseBuf[0:] shares the backing
	// array, so an append with spare capacity could mutate baseBuf in place.
	targetBuf := make([]byte, 0, maxCopySize+1)
	targetBuf = append(targetBuf, baseBuf...)
	targetBuf = append(targetBuf, byte(1))

	delta := DiffDelta(baseBuf, targetBuf)
	deltaRC := ioutil.NopCloser(bytes.NewReader(delta))

	resultRC, err := ReaderFromDelta(baseObj, deltaRC)
	c.Assert(err, IsNil)

	result, err := ioutil.ReadAll(resultRC)
	c.Assert(err, IsNil)

	err = resultRC.Close()
	c.Assert(err, IsNil)
	c.Assert(result, DeepEquals, targetBuf)
}
2 changes: 1 addition & 1 deletion plumbing/format/packfile/encoder_advanced_test.go
Expand Up @@ -105,7 +105,7 @@ func (s *EncoderAdvancedSuite) testEncodeDecode(
_, err = f.Seek(0, io.SeekStart)
c.Assert(err, IsNil)

p := NewPackfile(index, fs, f)
p := NewPackfile(index, fs, f, 0)

decodeHash, err := p.ID()
c.Assert(err, IsNil)
Expand Down
2 changes: 1 addition & 1 deletion plumbing/format/packfile/encoder_test.go
Expand Up @@ -318,7 +318,7 @@ func packfileFromReader(c *C, buf *bytes.Buffer) (*Packfile, func()) {
index, err := w.Index()
c.Assert(err, IsNil)

return NewPackfile(index, fs, file), func() {
return NewPackfile(index, fs, file, 0), func() {
c.Assert(file.Close(), IsNil)
}
}
54 changes: 36 additions & 18 deletions plumbing/format/packfile/fsobject.go
Expand Up @@ -6,20 +6,22 @@ import (
"github.com/gitopia/go-git/v5/plumbing"
"github.com/gitopia/go-git/v5/plumbing/cache"
"github.com/gitopia/go-git/v5/plumbing/format/idxfile"
"github.com/gitopia/go-git/v5/utils/ioutil"
billy "github.com/go-git/go-billy/v5"
)

// FSObject is an object from the packfile on the filesystem.
//
// NOTE(review): this span of the diff scrape contained both the pre- and
// post-change field lists fused together; only the current (post-change)
// definition is kept here.
type FSObject struct {
	hash                 plumbing.Hash // hash of the object
	h                    *ObjectHeader // packfile header of the object — presumably filled lazily; TODO confirm
	offset               int64         // offset of the object inside the packfile
	size                 int64         // decompressed content size in bytes
	typ                  plumbing.ObjectType
	index                idxfile.Index
	fs                   billy.Filesystem // filesystem the packfile lives on
	path                 string           // path to the packfile
	cache                cache.Object     // shared object cache
	largeObjectThreshold int64            // objects larger than this many bytes are streamed, not buffered (0 disables)
}

// NewFSObject creates a new filesystem object.
Expand All @@ -32,16 +34,18 @@ func NewFSObject(
fs billy.Filesystem,
path string,
cache cache.Object,
largeObjectThreshold int64,
) *FSObject {
return &FSObject{
hash: hash,
offset: offset,
size: contentSize,
typ: finalType,
index: index,
fs: fs,
path: path,
cache: cache,
hash: hash,
offset: offset,
size: contentSize,
typ: finalType,
index: index,
fs: fs,
path: path,
cache: cache,
largeObjectThreshold: largeObjectThreshold,
}
}

Expand All @@ -62,7 +66,21 @@ func (o *FSObject) Reader() (io.ReadCloser, error) {
return nil, err
}

p := NewPackfileWithCache(o.index, nil, f, o.cache)
p := NewPackfileWithCache(o.index, nil, f, o.cache, o.largeObjectThreshold)
if o.largeObjectThreshold > 0 && o.size > o.largeObjectThreshold {
// We have a big object
h, err := p.objectHeaderAtOffset(o.offset)
if err != nil {
return nil, err
}

r, err := p.getReaderDirect(h)
if err != nil {
_ = f.Close()
return nil, err
}
return ioutil.NewReadCloserWithCloser(r, f.Close), nil
}
r, err := p.getObjectContent(o.offset)
if err != nil {
_ = f.Close()
Expand Down
92 changes: 85 additions & 7 deletions plumbing/format/packfile/packfile.go
Expand Up @@ -2,6 +2,8 @@ package packfile

import (
"bytes"
"compress/zlib"
"fmt"
"io"
"os"

Expand Down Expand Up @@ -35,11 +37,12 @@ const smallObjectThreshold = 16 * 1024
// Packfile allows retrieving information from inside a packfile.
//
// NOTE(review): this span of the diff scrape contained both the pre- and
// post-change field lists fused together; only the current (post-change)
// definition is kept here.
type Packfile struct {
	idxfile.Index
	fs                   billy.Filesystem
	file                 billy.File
	s                    *Scanner
	deltaBaseCache       cache.Object                  // cache of delta bases, shared with FSObjects
	offsetToType         map[int64]plumbing.ObjectType // memoized object type per packfile offset
	largeObjectThreshold int64                         // objects larger than this many bytes are streamed, not buffered (0 disables)
}

// NewPackfileWithCache creates a new Packfile with the given object cache.
Expand All @@ -50,6 +53,7 @@ func NewPackfileWithCache(
fs billy.Filesystem,
file billy.File,
cache cache.Object,
largeObjectThreshold int64,
) *Packfile {
s := NewScanner(file)
return &Packfile{
Expand All @@ -59,15 +63,16 @@ func NewPackfileWithCache(
s,
cache,
make(map[int64]plumbing.ObjectType),
largeObjectThreshold,
}
}

// NewPackfile returns a packfile representation for the given packfile file
// and packfile idx.
// If the filesystem is provided, the packfile will return FSObjects, otherwise
// it will return MemoryObjects.
func NewPackfile(index idxfile.Index, fs billy.Filesystem, file billy.File) *Packfile {
return NewPackfileWithCache(index, fs, file, cache.NewObjectLRUDefault())
func NewPackfile(index idxfile.Index, fs billy.Filesystem, file billy.File, largeObjectThreshold int64) *Packfile {
return NewPackfileWithCache(index, fs, file, cache.NewObjectLRUDefault(), largeObjectThreshold)
}

// Get retrieves the encoded object in the packfile with the given hash.
Expand Down Expand Up @@ -263,6 +268,7 @@ func (p *Packfile) getNextObject(h *ObjectHeader, hash plumbing.Hash) (plumbing.
p.fs,
p.file.Name(),
p.deltaBaseCache,
p.largeObjectThreshold,
), nil
}

Expand All @@ -282,6 +288,50 @@ func (p *Packfile) getObjectContent(offset int64) (io.ReadCloser, error) {
return obj.Reader()
}

// asyncReader returns a ReadCloser that decompresses the zlib stream at the
// scanner's current offset directly from the underlying packfile, without
// buffering the whole object in memory. The zlib reader is borrowed from
// zlibReaderPool and returned to it on Close.
func asyncReader(p *Packfile) (io.ReadCloser, error) {
	reader := ioutil.NewReaderUsingReaderAt(p.file, p.s.r.offset)
	zr := zlibReaderPool.Get().(io.ReadCloser)

	// Wrap with %w so callers can unwrap the underlying zlib error.
	if err := zr.(zlib.Resetter).Reset(reader, nil); err != nil {
		return nil, fmt.Errorf("zlib reset error: %w", err)
	}

	return ioutil.NewReadCloserWithCloser(zr, func() error {
		zlibReaderPool.Put(zr)
		return nil
	}), nil
}

// getReaderDirect returns a ReadCloser that streams the content of the object
// described by h without materializing it in memory. Delta objects are
// resolved against their base and applied on the fly.
func (p *Packfile) getReaderDirect(h *ObjectHeader) (io.ReadCloser, error) {
	switch h.Type {
	case plumbing.CommitObject, plumbing.TreeObject, plumbing.BlobObject, plumbing.TagObject:
		// Non-delta objects stream straight from the packfile.
		return asyncReader(p)
	case plumbing.REFDeltaObject, plumbing.OFSDeltaObject:
		deltaRC, err := asyncReader(p)
		if err != nil {
			return nil, err
		}
		if h.Type == plumbing.REFDeltaObject {
			return p.readREFDeltaObjectContent(h, deltaRC)
		}
		return p.readOFSDeltaObjectContent(h, deltaRC)
	default:
		return nil, ErrInvalidObject.AddDetails("type %q", h.Type)
	}
}

func (p *Packfile) getNextMemoryObject(h *ObjectHeader) (plumbing.EncodedObject, error) {
var obj = new(plumbing.MemoryObject)
obj.SetSize(h.Length)
Expand Down Expand Up @@ -334,6 +384,20 @@ func (p *Packfile) fillREFDeltaObjectContent(obj plumbing.EncodedObject, ref plu
return p.fillREFDeltaObjectContentWithBuffer(obj, ref, buf)
}

// readREFDeltaObjectContent returns a streaming reader for a REF-delta
// object, resolving its base by hash — from the delta-base cache when
// possible, otherwise by reading it from the packfile.
func (p *Packfile) readREFDeltaObjectContent(h *ObjectHeader, deltaRC io.Reader) (io.ReadCloser, error) {
	base, ok := p.cacheGet(h.Reference)
	if !ok {
		var err error
		if base, err = p.Get(h.Reference); err != nil {
			return nil, err
		}
	}

	return ReaderFromDelta(base, deltaRC)
}

func (p *Packfile) fillREFDeltaObjectContentWithBuffer(obj plumbing.EncodedObject, ref plumbing.Hash, buf *bytes.Buffer) error {
var err error

Expand Down Expand Up @@ -364,6 +428,20 @@ func (p *Packfile) fillOFSDeltaObjectContent(obj plumbing.EncodedObject, offset
return p.fillOFSDeltaObjectContentWithBuffer(obj, offset, buf)
}

// readOFSDeltaObjectContent returns a streaming reader for an OFS-delta
// object: the base is located by its offset within the packfile and the
// delta is applied on the fly.
func (p *Packfile) readOFSDeltaObjectContent(h *ObjectHeader, deltaRC io.Reader) (io.ReadCloser, error) {
	baseHash, err := p.FindHash(h.OffsetReference)
	if err != nil {
		return nil, err
	}

	baseObj, err := p.objectAtOffset(h.OffsetReference, baseHash)
	if err != nil {
		return nil, err
	}

	return ReaderFromDelta(baseObj, deltaRC)
}

func (p *Packfile) fillOFSDeltaObjectContentWithBuffer(obj plumbing.EncodedObject, offset int64, buf *bytes.Buffer) error {
hash, err := p.FindHash(offset)
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions plumbing/format/packfile/packfile_test.go
Expand Up @@ -111,7 +111,7 @@ func (s *PackfileSuite) SetUpTest(c *C) {
s.idx = idxfile.NewMemoryIndex()
c.Assert(idxfile.NewDecoder(s.f.Idx()).Decode(s.idx), IsNil)

s.p = packfile.NewPackfile(s.idx, fixtures.Filesystem, s.f.Packfile())
s.p = packfile.NewPackfile(s.idx, fixtures.Filesystem, s.f.Packfile(), 0)
}

func (s *PackfileSuite) TearDownTest(c *C) {
Expand All @@ -122,7 +122,7 @@ func (s *PackfileSuite) TestDecode(c *C) {
fixtures.Basic().ByTag("packfile").Test(c, func(f *fixtures.Fixture) {
index := getIndexFromIdxFile(f.Idx())

p := packfile.NewPackfile(index, fixtures.Filesystem, f.Packfile())
p := packfile.NewPackfile(index, fixtures.Filesystem, f.Packfile(), 0)
defer p.Close()

for _, h := range expectedHashes {
Expand All @@ -138,7 +138,7 @@ func (s *PackfileSuite) TestDecodeByTypeRefDelta(c *C) {

index := getIndexFromIdxFile(f.Idx())

packfile := packfile.NewPackfile(index, fixtures.Filesystem, f.Packfile())
packfile := packfile.NewPackfile(index, fixtures.Filesystem, f.Packfile(), 0)
defer packfile.Close()

iter, err := packfile.GetByType(plumbing.CommitObject)
Expand Down Expand Up @@ -171,7 +171,7 @@ func (s *PackfileSuite) TestDecodeByType(c *C) {
for _, t := range ts {
index := getIndexFromIdxFile(f.Idx())

packfile := packfile.NewPackfile(index, fixtures.Filesystem, f.Packfile())
packfile := packfile.NewPackfile(index, fixtures.Filesystem, f.Packfile(), 0)
defer packfile.Close()

iter, err := packfile.GetByType(t)
Expand All @@ -189,7 +189,7 @@ func (s *PackfileSuite) TestDecodeByTypeConstructor(c *C) {
f := fixtures.Basic().ByTag("packfile").One()
index := getIndexFromIdxFile(f.Idx())

packfile := packfile.NewPackfile(index, fixtures.Filesystem, f.Packfile())
packfile := packfile.NewPackfile(index, fixtures.Filesystem, f.Packfile(), 0)
defer packfile.Close()

_, err := packfile.GetByType(plumbing.OFSDeltaObject)
Expand Down Expand Up @@ -266,7 +266,7 @@ func (s *PackfileSuite) TestSize(c *C) {

index := getIndexFromIdxFile(f.Idx())

packfile := packfile.NewPackfile(index, fixtures.Filesystem, f.Packfile())
packfile := packfile.NewPackfile(index, fixtures.Filesystem, f.Packfile(), 0)
defer packfile.Close()

// Get the size of binary.jpg, which is not delta-encoded.
Expand Down

0 comments on commit 2523dcc

Please sign in to comment.