Skip to content

Commit

Permalink
plumbing: format/packfile, prevent large objects from being read into…
Browse files Browse the repository at this point in the history
… memory completely (#303)

This PR adds code to prevent large objects from being read into memory from packfiles or the filesystem.

Objects greater than 1Mb are now no longer directly stored in the cache
or read completely into memory.

Signed-off-by: Andrew Thornton <art27@cantab.net>
  • Loading branch information
zeripath committed May 12, 2021
1 parent e6e2339 commit 720c192
Show file tree
Hide file tree
Showing 7 changed files with 422 additions and 1 deletion.
15 changes: 15 additions & 0 deletions plumbing/format/packfile/fsobject.go
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/plumbing/cache"
"github.com/go-git/go-git/v5/plumbing/format/idxfile"
"github.com/go-git/go-git/v5/utils/ioutil"
)

// FSObject is an object from the packfile on the filesystem.
Expand Down Expand Up @@ -63,6 +64,20 @@ func (o *FSObject) Reader() (io.ReadCloser, error) {
}

p := NewPackfileWithCache(o.index, nil, f, o.cache)
if o.size > LargeObjectThreshold {
// We have a big object
h, err := p.objectHeaderAtOffset(o.offset)
if err != nil {
return nil, err
}

r, err := p.getReaderDirect(h)
if err != nil {
_ = f.Close()
return nil, err
}
return ioutil.NewReadCloserWithCloser(r, f.Close), nil
}
r, err := p.getObjectContent(o.offset)
if err != nil {
_ = f.Close()
Expand Down
73 changes: 73 additions & 0 deletions plumbing/format/packfile/packfile.go
Expand Up @@ -32,6 +32,12 @@ var (
// wrapped in FSObject.
const smallObjectThreshold = 16 * 1024

// Conversely there are large objects that should not be cached and kept
// in memory as they're too large to be reasonably cached. Objects larger
// than this threshold are now always never read into memory to be stored
// in the cache
const LargeObjectThreshold = 1024 * 1024

// Packfile allows retrieving information from inside a packfile.
type Packfile struct {
idxfile.Index
Expand Down Expand Up @@ -282,6 +288,37 @@ func (p *Packfile) getObjectContent(offset int64) (io.ReadCloser, error) {
return obj.Reader()
}

func (p *Packfile) getReaderDirect(h *ObjectHeader) (io.ReadCloser, error) {
switch h.Type {
case plumbing.CommitObject, plumbing.TreeObject, plumbing.BlobObject, plumbing.TagObject:
return p.s.ReadObject()
case plumbing.REFDeltaObject:
deltaRC, err := p.s.ReadObject()
if err != nil {
return nil, err
}
r, err := p.readREFDeltaObjectContent(h, deltaRC)
if err != nil {
_ = deltaRC.Close()
return nil, err
}
return r, nil
case plumbing.OFSDeltaObject:
deltaRC, err := p.s.ReadObject()
if err != nil {
return nil, err
}
r, err := p.readOFSDeltaObjectContent(h, deltaRC)
if err != nil {
_ = deltaRC.Close()
return nil, err
}
return r, nil
default:
return nil, ErrInvalidObject.AddDetails("type %q", h.Type)
}
}

func (p *Packfile) getNextMemoryObject(h *ObjectHeader) (plumbing.EncodedObject, error) {
var obj = new(plumbing.MemoryObject)
obj.SetSize(h.Length)
Expand Down Expand Up @@ -334,6 +371,20 @@ func (p *Packfile) fillREFDeltaObjectContent(obj plumbing.EncodedObject, ref plu
return p.fillREFDeltaObjectContentWithBuffer(obj, ref, buf)
}

func (p *Packfile) readREFDeltaObjectContent(h *ObjectHeader, deltaRC io.ReadCloser) (io.ReadCloser, error) {
var err error

base, ok := p.cacheGet(h.Reference)
if !ok {
base, err = p.Get(h.Reference)
if err != nil {
return nil, err
}
}

return ReaderFromDelta(h, base, deltaRC)
}

func (p *Packfile) fillREFDeltaObjectContentWithBuffer(obj plumbing.EncodedObject, ref plumbing.Hash, buf *bytes.Buffer) error {
var err error

Expand Down Expand Up @@ -364,6 +415,28 @@ func (p *Packfile) fillOFSDeltaObjectContent(obj plumbing.EncodedObject, offset
return p.fillOFSDeltaObjectContentWithBuffer(obj, offset, buf)
}

func (p *Packfile) readOFSDeltaObjectContent(h *ObjectHeader, deltaRC io.ReadCloser) (io.ReadCloser, error) {
hash, err := p.FindHash(h.OffsetReference)
if err != nil {
return nil, err
}

base, err := p.objectAtOffset(h.OffsetReference, hash)
if err != nil {
return nil, err
}

base, ok := p.cacheGet(h.Reference)
if !ok {
base, err = p.Get(h.Reference)
if err != nil {
return nil, err
}
}

return ReaderFromDelta(h, base, deltaRC)
}

func (p *Packfile) fillOFSDeltaObjectContentWithBuffer(obj plumbing.EncodedObject, offset int64, buf *bytes.Buffer) error {
hash, err := p.FindHash(offset)
if err != nil {
Expand Down
210 changes: 210 additions & 0 deletions plumbing/format/packfile/patch_delta.go
@@ -1,9 +1,11 @@
package packfile

import (
"bufio"
"bytes"
"errors"
"io"
"math"

"github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/utils/ioutil"
Expand Down Expand Up @@ -73,6 +75,131 @@ func PatchDelta(src, delta []byte) ([]byte, error) {
return b.Bytes(), nil
}

func ReaderFromDelta(h *ObjectHeader, base plumbing.EncodedObject, deltaRC io.ReadCloser) (io.ReadCloser, error) {
deltaBuf := bufio.NewReaderSize(deltaRC, 1024)
srcSz, err := decodeLEB128ByteReader(deltaBuf)
if err != nil {
if err == io.EOF {
return nil, ErrInvalidDelta
}
return nil, err
}
if srcSz != uint(base.Size()) {
return nil, ErrInvalidDelta
}

targetSz, err := decodeLEB128ByteReader(deltaBuf)
if err != nil {
if err == io.EOF {
return nil, ErrInvalidDelta
}
return nil, err
}
remainingTargetSz := targetSz

dstRd, dstWr := io.Pipe()

go func() {
baseRd, err := base.Reader()
if err != nil {
_ = dstWr.CloseWithError(ErrInvalidDelta)
return
}
defer baseRd.Close()

baseBuf := bufio.NewReader(baseRd)
basePos := uint(0)

for {
cmd, err := deltaBuf.ReadByte()
if err == io.EOF {
_ = dstWr.CloseWithError(ErrInvalidDelta)
return
}
if err != nil {
_ = dstWr.CloseWithError(err)
return
}

if isCopyFromSrc(cmd) {
offset, err := decodeOffsetByteReader(cmd, deltaBuf)
if err != nil {
_ = dstWr.CloseWithError(err)
return
}
sz, err := decodeSizeByteReader(cmd, deltaBuf)
if err != nil {
_ = dstWr.CloseWithError(err)
return
}

if invalidSize(sz, targetSz) ||
invalidOffsetSize(offset, sz, srcSz) {
_ = dstWr.Close()
return
}

discard := offset - basePos
if discard < 0 {
_ = baseRd.Close()
baseRd, err = base.Reader()
if err != nil {
_ = dstWr.CloseWithError(ErrInvalidDelta)
return
}
baseBuf.Reset(baseRd)
discard = offset
}
for discard > math.MaxInt32 {
n, err := baseBuf.Discard(math.MaxInt32)
if err != nil {
_ = dstWr.CloseWithError(err)
return
}
basePos += uint(n)
discard -= uint(n)
}
for discard > 0 {
n, err := baseBuf.Discard(int(discard))
if err != nil {
_ = dstWr.CloseWithError(err)
return
}
basePos += uint(n)
discard -= uint(n)
}
if _, err := io.Copy(dstWr, io.LimitReader(baseBuf, int64(sz))); err != nil {
_ = dstWr.CloseWithError(err)
return
}
remainingTargetSz -= sz
basePos += sz
} else if isCopyFromDelta(cmd) {
sz := uint(cmd) // cmd is the size itself
if invalidSize(sz, targetSz) {
_ = dstWr.CloseWithError(ErrInvalidDelta)
return
}
if _, err := io.Copy(dstWr, io.LimitReader(deltaBuf, int64(sz))); err != nil {
_ = dstWr.CloseWithError(err)
return
}

remainingTargetSz -= sz
} else {
_ = dstWr.CloseWithError(ErrDeltaCmd)
return
}
if remainingTargetSz <= 0 {
_ = dstWr.Close()
return
}
}
}()

return dstRd, nil
}

func patchDelta(dst *bytes.Buffer, src, delta []byte) error {
if len(delta) < deltaSizeMin {
return ErrInvalidDelta
Expand Down Expand Up @@ -161,6 +288,25 @@ func decodeLEB128(input []byte) (uint, []byte) {
return num, input[sz:]
}

func decodeLEB128ByteReader(input io.ByteReader) (uint, error) {
var num, sz uint
for {
b, err := input.ReadByte()
if err != nil {
return 0, err
}

num |= (uint(b) & payload) << (sz * 7) // concats 7 bits chunks
sz++

if uint(b)&continuation == 0 {
break
}
}

return num, nil
}

const (
payload = 0x7f // 0111 1111
continuation = 0x80 // 1000 0000
Expand All @@ -174,6 +320,40 @@ func isCopyFromDelta(cmd byte) bool {
return (cmd&0x80) == 0 && cmd != 0
}

func decodeOffsetByteReader(cmd byte, delta io.ByteReader) (uint, error) {
var offset uint
if (cmd & 0x01) != 0 {
next, err := delta.ReadByte()
if err != nil {
return 0, err
}
offset = uint(next)
}
if (cmd & 0x02) != 0 {
next, err := delta.ReadByte()
if err != nil {
return 0, err
}
offset |= uint(next) << 8
}
if (cmd & 0x04) != 0 {
next, err := delta.ReadByte()
if err != nil {
return 0, err
}
offset |= uint(next) << 16
}
if (cmd & 0x08) != 0 {
next, err := delta.ReadByte()
if err != nil {
return 0, err
}
offset |= uint(next) << 24
}

return offset, nil
}

func decodeOffset(cmd byte, delta []byte) (uint, []byte, error) {
var offset uint
if (cmd & 0x01) != 0 {
Expand Down Expand Up @@ -208,6 +388,36 @@ func decodeOffset(cmd byte, delta []byte) (uint, []byte, error) {
return offset, delta, nil
}

func decodeSizeByteReader(cmd byte, delta io.ByteReader) (uint, error) {
var sz uint
if (cmd & 0x10) != 0 {
next, err := delta.ReadByte()
if err != nil {
return 0, err
}
sz = uint(next)
}
if (cmd & 0x20) != 0 {
next, err := delta.ReadByte()
if err != nil {
return 0, err
}
sz |= uint(next) << 8
}
if (cmd & 0x40) != 0 {
next, err := delta.ReadByte()
if err != nil {
return 0, err
}
sz |= uint(next) << 16
}
if sz == 0 {
sz = 0x10000
}

return sz, nil
}

func decodeSize(cmd byte, delta []byte) (uint, []byte, error) {
var sz uint
if (cmd & 0x10) != 0 {
Expand Down
15 changes: 15 additions & 0 deletions plumbing/format/packfile/scanner.go
Expand Up @@ -320,6 +320,21 @@ func (s *Scanner) NextObject(w io.Writer) (written int64, crc32 uint32, err erro
return
}

// ReadObject returns a reader for the object content and an error
func (s *Scanner) ReadObject() (io.ReadCloser, error) {
s.pendingObject = nil
zr := zlibReaderPool.Get().(io.ReadCloser)

if err := zr.(zlib.Resetter).Reset(s.r, nil); err != nil {
return nil, fmt.Errorf("zlib reset error: %s", err)
}

return ioutil.NewReadCloserWithCloser(zr, func() error {
zlibReaderPool.Put(zr)
return nil
}), nil
}

// ReadRegularObject reads and write a non-deltified object
// from it zlib stream in an object entry in the packfile.
func (s *Scanner) copyObject(w io.Writer) (n int64, err error) {
Expand Down

0 comments on commit 720c192

Please sign in to comment.