Skip to content

Commit

Permalink
feat: fix BlockReader#SkipNext & add SourceOffset property
Browse files Browse the repository at this point in the history
* SkipNext's metadata calculation wasn't working properly in the case of a
  CARv2 as it didn't take into account the V1 header in the original offset
  calculation.
* Add a SourceOffset and some docs to be absolutely clear what offsets we're
  returning in the metadata.

Ref: filecoin-project/boost#1651
  • Loading branch information
rvagg committed Sep 1, 2023
1 parent 72edbd2 commit 09d23db
Show file tree
Hide file tree
Showing 2 changed files with 206 additions and 36 deletions.
93 changes: 57 additions & 36 deletions v2/block_reader.go
@@ -1,6 +1,7 @@
package car

import (
"errors"
"fmt"
"io"

Expand All @@ -23,6 +24,7 @@ type BlockReader struct {
// Used internally only, by BlockReader.Next during iteration over blocks.
r io.Reader
offset uint64
v1offset uint64
readerSize int64
opts Options
}
Expand Down Expand Up @@ -80,7 +82,8 @@ func NewBlockReader(r io.Reader, opts ...Option) (*BlockReader, error) {
if _, err := rs.Seek(int64(v2h.DataOffset)-PragmaSize-HeaderSize, io.SeekCurrent); err != nil {
return nil, err
}
br.offset = uint64(v2h.DataOffset)
br.v1offset = uint64(v2h.DataOffset)
br.offset = br.v1offset
br.readerSize = int64(v2h.DataOffset + v2h.DataSize)

// Set br.r to a LimitReader reading from r limited to dataSize.
Expand All @@ -96,6 +99,8 @@ func NewBlockReader(r io.Reader, opts ...Option) (*BlockReader, error) {
return nil, fmt.Errorf("invalid data payload header version; expected 1, got %v", header.Version)
}
br.Roots = header.Roots
hs, _ := carv1.HeaderSize(header)
br.offset += hs
default:
// Otherwise, error out with invalid version since only versions 1 or 2 are expected.
return nil, fmt.Errorf("invalid car version: %d", br.Version)
Expand Down Expand Up @@ -136,10 +141,22 @@ func (br *BlockReader) Next() (blocks.Block, error) {
return blocks.NewBlockWithCid(data, c)
}

// BlockMetadata contains metadata about a block's section in a CAR file/stream.
//
// There are two offsets for the block data which will be the same if the
// original CAR is a CARv1, but will differ if the original CAR is a CARv2. In
// the case of a CARv2, SourceOffset will be the offset from the beginning of
the file/stream, and Offset will be the offset from the beginning of the CARv1
// payload container within the CARv2.
//
// Offset is useful for index generation which requires an offset from the CARv1
// payload; while SourceOffset is useful for direct block reads out of the
// source file/stream regardless of version.
type BlockMetadata struct {
cid.Cid
Offset uint64
Size uint64
Offset uint64 // Offset of the block data in the container CARv1
SourceOffset uint64 // SourceOffset is the offset of block data in the source file/stream
Size uint64
}

// SkipNext jumps over the next block, returning metadata about what it is (the CID, offset, and size).
Expand All @@ -148,24 +165,33 @@ type BlockMetadata struct {
// If the underlying reader used by the BlockReader is actually a ReadSeeker, this method will attempt to
// seek over the underlying data rather than reading it into memory.
func (br *BlockReader) SkipNext() (*BlockMetadata, error) {
sctSize, err := util.LdReadSize(br.r, br.opts.ZeroLengthSectionAsEOF, br.opts.MaxAllowedSectionSize)
sectionSize, err := util.LdReadSize(br.r, br.opts.ZeroLengthSectionAsEOF, br.opts.MaxAllowedSectionSize)
if err != nil {
return nil, err
}

if sctSize == 0 {
_, _, err := cid.CidFromBytes([]byte{})
if sectionSize == 0 {
_, _, err := cid.CidFromBytes([]byte{}) // generate zero-byte CID error
if err == nil {
panic("expected zero-byte CID error")
}
return nil, err
}

cidSize, c, err := cid.CidFromReader(io.LimitReader(br.r, int64(sctSize)))
lenSize := uint64(varint.UvarintSize(sectionSize))

cidSize, c, err := cid.CidFromReader(io.LimitReader(br.r, int64(sectionSize)))
if err != nil {
return nil, err
}

blkSize := sctSize - uint64(cidSize)
blockSize := sectionSize - uint64(cidSize)
blockOffset := br.offset + lenSize + uint64(cidSize)

// move our reader forward; either by seeking or slurping

if brs, ok := br.r.(io.ReadSeeker); ok {
// carv1 and we don't know the size, so work it out and cache it
// carv1 and we don't know the size, so work it out and cache it so we
// can use it to determine over-reads
if br.readerSize == -1 {
cur, err := brs.Seek(0, io.SeekCurrent)
if err != nil {
Expand All @@ -180,42 +206,37 @@ func (br *BlockReader) SkipNext() (*BlockMetadata, error) {
return nil, err
}
}
// seek.
finalOffset, err := brs.Seek(int64(blkSize), io.SeekCurrent)

// seek forward past the block data
finalOffset, err := brs.Seek(int64(blockSize), io.SeekCurrent)
if err != nil {
return nil, err
}
if finalOffset != int64(br.offset)+int64(sctSize)+int64(varint.UvarintSize(sctSize)) {
return nil, fmt.Errorf("unexpected length")
if finalOffset != int64(br.offset)+int64(lenSize)+int64(sectionSize) {
return nil, errors.New("unexpected length")
}
if finalOffset > br.readerSize {
return nil, io.ErrUnexpectedEOF
}
br.offset = uint64(finalOffset)
return &BlockMetadata{
c,
uint64(finalOffset) - sctSize - uint64(varint.UvarintSize(sctSize)),
blkSize,
}, nil
}

// read to end.
readCnt, err := io.CopyN(io.Discard, br.r, int64(blkSize))
if err != nil {
if err == io.EOF {
return nil, io.ErrUnexpectedEOF
} else { // just a reader, we need to slurp the block bytes
readCnt, err := io.CopyN(io.Discard, br.r, int64(blockSize))
if err != nil {
if err == io.EOF {
return nil, io.ErrUnexpectedEOF
}
return nil, err
}
if readCnt != int64(blockSize) {
return nil, errors.New("unexpected length")
}
return nil, err
}
if readCnt != int64(blkSize) {
return nil, fmt.Errorf("unexpected length")
}
origOffset := br.offset
br.offset += uint64(varint.UvarintSize(sctSize)) + sctSize

br.offset = blockOffset + blockSize

return &BlockMetadata{
c,
origOffset,
blkSize,
Cid: c,
Offset: blockOffset - br.v1offset,
SourceOffset: blockOffset,
Size: blockSize,
}, nil
}
149 changes: 149 additions & 0 deletions v2/block_reader_test.go
Expand Up @@ -2,12 +2,14 @@ package car_test

import (
"bytes"
"crypto/rand"
"encoding/hex"
"fmt"
"io"
"os"
"testing"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
carv2 "github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/internal/carv1"
Expand Down Expand Up @@ -229,6 +231,153 @@ func TestMaxHeaderLength(t *testing.T) {
require.EqualError(t, err, "invalid header data, length of read beyond allowable maximum")
}

// TestBlockReader exercises BlockReader.Next and BlockReader.SkipNext over the
// same CARv1 payload packaged three ways (bare CARv1, CARv2, CARv2 with a
// padded data offset), each via both a plain io.Reader and an io.ReadSeeker.
// It verifies that SkipNext reports Offset relative to the CARv1 payload and
// SourceOffset relative to the start of the whole file/stream.
func TestBlockReader(t *testing.T) {
	req := require.New(t)

	// prepare a CARv1 with 100 blocks
	roots := []cid.Cid{cid.MustParse("bafyrgqhai26anf3i7pips7q22coa4sz2fr4gk4q4sqdtymvvjyginfzaqewveaeqdh524nsktaq43j65v22xxrybrtertmcfxufdam3da3hbk")}
	blks := make([]struct {
		block      blocks.Block
		dataOffset uint64
	}, 100)
	v1buf := new(bytes.Buffer)
	carv1.WriteHeader(&carv1.CarHeader{Roots: roots, Version: 1}, v1buf)
	vb := make([]byte, 2)
	for i := 0; i < 100; i++ {
		blk := randBlock(100 + i) // we should cross the varint two-byte boundary in here somewhere
		// each section is: varint(len(CID)+len(data)) | CID bytes | block data
		vn := varint.PutUvarint(vb, uint64(len(blk.Cid().Bytes())+len(blk.RawData())))
		n, err := v1buf.Write(vb[:vn])
		req.NoError(err)
		req.Equal(n, vn)
		n, err = v1buf.Write(blk.Cid().Bytes())
		req.NoError(err)
		req.Equal(len(blk.Cid().Bytes()), n)
		// record the offset of the raw block data, i.e. after the varint
		// length prefix and the CID have been written but before the data
		blks[i] = struct {
			block      blocks.Block
			dataOffset uint64
		}{block: blk, dataOffset: uint64(v1buf.Len())}
		n, err = v1buf.Write(blk.RawData())
		req.NoError(err)
		req.Equal(len(blk.RawData()), n)
	}

	// wrap the CARv1 payload in a CARv2: pragma | v2 header | CARv1 payload
	v2buf := new(bytes.Buffer)
	n, err := v2buf.Write(carv2.Pragma)
	req.NoError(err)
	req.Equal(len(carv2.Pragma), n)
	v2Header := carv2.NewHeader(uint64(v1buf.Len()))
	ni, err := v2Header.WriteTo(v2buf)
	req.NoError(err)
	req.Equal(carv2.HeaderSize, int(ni))
	n, err = v2buf.Write(v1buf.Bytes())
	req.NoError(err)
	req.Equal(v1buf.Len(), n)

	// same again, but with 100 bytes of zero padding between the v2 header
	// and the CARv1 payload, advertised via an increased DataOffset
	v2padbuf := new(bytes.Buffer)
	n, err = v2padbuf.Write(carv2.Pragma)
	req.NoError(err)
	req.Equal(len(carv2.Pragma), n)
	v2Header = carv2.NewHeader(uint64(v1buf.Len()))
	// pad with 100 bytes
	v2Header.DataOffset += 100
	ni, err = v2Header.WriteTo(v2padbuf)
	req.NoError(err)
	req.Equal(carv2.HeaderSize, int(ni))
	v2padbuf.Write(make([]byte, 100))
	n, err = v2padbuf.Write(v1buf.Bytes())
	req.NoError(err)
	req.Equal(v1buf.Len(), n)

	for _, testCase := range []struct {
		name string
		// reader returns a fresh reader each call so the two passes below
		// (Next then SkipNext) each start from the beginning
		reader func() io.Reader
		// v1offset is where the CARv1 payload begins within the source;
		// zero for a bare CARv1
		v1offset uint64
	}{
		{
			name:   "v1",
			reader: func() io.Reader { return &readerOnly{bytes.NewReader(v1buf.Bytes())} },
		},
		{
			name:     "v2",
			reader:   func() io.Reader { return &readerOnly{bytes.NewReader(v2buf.Bytes())} },
			v1offset: uint64(carv2.PragmaSize + carv2.HeaderSize),
		},
		{
			name:     "v2 padded",
			reader:   func() io.Reader { return &readerOnly{bytes.NewReader(v2padbuf.Bytes())} },
			v1offset: uint64(carv2.PragmaSize+carv2.HeaderSize) + 100,
		},
		{
			// ReadSeeker variants exercise SkipNext's seek path rather than
			// the byte-slurping path used for plain readers
			name:   "v1 w/ReadSeeker",
			reader: func() io.Reader { return bytes.NewReader(v1buf.Bytes()) },
		},
		{
			name:     "v2 w/ReadSeeker",
			reader:   func() io.Reader { return bytes.NewReader(v2buf.Bytes()) },
			v1offset: uint64(carv2.PragmaSize + carv2.HeaderSize),
		},
		{
			name:     "v2 padded w/ReadSeeker",
			reader:   func() io.Reader { return bytes.NewReader(v2padbuf.Bytes()) },
			v1offset: uint64(carv2.PragmaSize+carv2.HeaderSize) + 100,
		},
	} {
		t.Run(testCase.name, func(t *testing.T) {
			req := require.New(t)

			// first pass: full reads via Next
			car, err := carv2.NewBlockReader(testCase.reader())
			req.NoError(err)
			req.ElementsMatch(roots, car.Roots)

			for i := 0; i < 100; i++ {
				blk, err := car.Next()
				req.NoError(err)
				req.Equal(blks[i].block.Cid(), blk.Cid())
				req.Equal(blks[i].block.RawData(), blk.RawData())
			}
			_, err = car.Next()
			req.ErrorIs(err, io.EOF)

			// second pass: metadata-only via SkipNext
			car, err = carv2.NewBlockReader(testCase.reader())
			req.NoError(err)
			req.ElementsMatch(roots, car.Roots)

			for i := 0; i < 100; i++ {
				blk, err := car.SkipNext()
				req.NoError(err)
				req.Equal(blks[i].block.Cid(), blk.Cid)
				req.Equal(uint64(len(blks[i].block.RawData())), blk.Size)
				// Offset is relative to the CARv1 payload ...
				req.Equal(blks[i].dataOffset, blk.Offset, "block #%d", i)
				// ... while SourceOffset is relative to the whole source
				req.Equal(blks[i].dataOffset+testCase.v1offset, blk.SourceOffset)
			}
			_, err = car.Next()
			req.ErrorIs(err, io.EOF)
		})
	}
}

type readerOnly struct {
r io.Reader
}

func (r readerOnly) Read(b []byte) (int, error) {
return r.r.Read(b)
}

// randBlock generates a block of l cryptographically random bytes, addressed
// by a CIDv1 with the raw codec and a SHA2-512 multihash of the data.
//
// It panics on any failure, which is acceptable for a test helper.
func randBlock(l int) blocks.Block {
	data := make([]byte, l)
	// the original ignored this error; crypto/rand.Read should never fail,
	// but if it does we want to know rather than hash all-zero data
	if _, err := rand.Read(data); err != nil {
		panic(err)
	}
	h, err := mh.Sum(data, mh.SHA2_512, -1)
	if err != nil {
		panic(err)
	}
	blk, err := blocks.NewBlockWithCid(data, cid.NewCidV1(cid.Raw, h))
	if err != nil {
		panic(err)
	}
	return blk
}

func requireReaderFromPath(t *testing.T, path string) io.Reader {
f, err := os.Open(path)
require.NoError(t, err)
Expand Down

0 comments on commit 09d23db

Please sign in to comment.