Skip to content

Commit

Permalink
fix: minor doc fixes, updates and improvements from review
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Aug 12, 2022
1 parent 926d766 commit 0eda172
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 4 deletions.
2 changes: 1 addition & 1 deletion v2/internal/io/skip_writer_read_seeker.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (c *SkipWriterReaderSeeker) Read(p []byte) (int, error) {
// Check if there's already a write in progress
if c.reader == nil {
// No write in progress, start a new write from the current offset
// in a go routine
// in a go routine and feed it back to the caller via a pipe
writeCtx, writeCancel := context.WithCancel(c.parentCtx)
c.writeCancel = writeCancel
pr, pw := io.Pipe()
Expand Down
17 changes: 16 additions & 1 deletion v2/internal/loader/writing_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,19 @@ type IndexTracker interface {

var _ IndexTracker = (*indexingWriter)(nil)

// writingReader is used on a per-block basis for the TeeingLinkSystem's StorageReadOpener, we use it
// to intercept block reads and construct CAR section output for that block, passing that section data to
// indexingWriter, while also passing the plain binary block data back to the LinkSystem caller (which
// we expect to be a traversal operation).
// Additionally, if we are performing a "skip" of initial bytes for this CAR, we track the byte count as we
// construct the CAR section data and decide when and how much to write out to the indexingWriter.
// Skip operations don't impact the downstream LinkSystem user (traversal), but they do impact what's
// written out via the indexingWriter.
type writingReader struct {
r io.Reader
buf []byte
cid string
wo *indexingWriter
iw *indexingWriter
}

func (w *writingReader) Read(p []byte) (int, error) {
Expand All @@ -76,15 +84,20 @@ func (w *writingReader) Read(p []byte) (int, error) {
if err != nil {
return 0, err
}
// write the varint size prefix and trim the unneeded prefix padding we allocated
sizeBytes := varint.ToUvarint(uint64(n) + uint64(len(w.cid)))
writeBuf := buf.Bytes()[varint.MaxLenUvarint63-len(sizeBytes):]
w.buf = buf.Bytes()[varint.MaxLenUvarint63+len(w.cid):]
_ = copy(writeBuf[:], sizeBytes)

size := len(writeBuf)
// indexingWriter manages state for a skip operation, but we have to mutate it here -
// if there are still bytes to skip, then we either need to skip over this whole block, or pass
// part of it on, and then update the toSkip state
if w.wo.toSkip > 0 {
if w.wo.toSkip >= uint64(len(writeBuf)) {
w.wo.toSkip -= uint64(len(writeBuf))
// will cause the WriteTo() below to be a noop, we need to skip this entire block
writeBuf = []byte{}
} else {
writeBuf = writeBuf[w.wo.toSkip:]
Expand All @@ -108,6 +121,8 @@ func (w *writingReader) Read(p []byte) (int, error) {
}

if w.buf != nil {
// we've already read the block from the parent reader for writing the CAR block section (above),
// so we need to pass those bytes on in whatever chunk size the caller wants
n, err := bytes.NewBuffer(w.buf).Read(p)
if err != nil {
return n, err
Expand Down
14 changes: 12 additions & 2 deletions v2/selective.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ func MaxTraversalLinks(MaxTraversalLinks uint64) Option {
}

// WithDataPayloadSize sets the expected v1 size of the car being written if it is known in advance.
// This is required if NewCarV1StreamReader() is used and a Seek() operation needs seek back from
// SeekEnd (i.e. if we don't know where the end is, we can't figure out how far that is from the start).
// It can also be used to validate the expected size of a CAR's data payload if it's known in advance. In
// that case, a selective CAR creation operation will return an ErrSizeMismatch if the actual size doesn't
// match the expected set with this option.
func WithDataPayloadSize(size uint64) Option {
return func(sco *Options) {
sco.DataPayloadSize = size
Expand Down Expand Up @@ -143,7 +148,7 @@ func TraverseV1(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector
}

// NewCarV1StreamReader creates an io.ReadSeeker that can be used to copy out the carv1 contents of a car.
func NewCarV1StreamReader(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, opts ...Option) (io.ReadSeeker, error) {
func NewSelectiveV1Reader(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, opts ...Option) (io.ReadSeeker, error) {
opts = append(opts, WithoutIndex())
conf := ApplyOptions(opts...)
tc := traversalCar{
Expand All @@ -155,6 +160,11 @@ func NewCarV1StreamReader(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid
opts: conf,
}
rwf := func(ctx context.Context, offset uint64, writer io.Writer) (uint64, error) {
// it's only at this point we have the `offset` to start writing at since the user of the
// ReadSeeker will (may) have called Seek() and we've worked out where in the CAR
// that is as an offset. Now we can start writing the CARv1 data, which will be passed
// on to the ReadSeeker.
// Note that we're inside a goroutine here
s, _, err := tc.WriteV1(ctx, offset, writer)
return s, err
}
Expand Down Expand Up @@ -275,7 +285,7 @@ func (tc *traversalCar) WriteV1(ctx context.Context, skip uint64, w io.Writer) (
skip -= v1Size
}

// write the block.
// write the blocks
wls, writer := loader.TeeingLinkSystem(*tc.ls, w, v1Size, skip, tc.opts.IndexCodec)
if err = tc.setup(ctx, &wls, tc.opts); err != nil {
return v1Size, nil, err
Expand Down

0 comments on commit 0eda172

Please sign in to comment.