Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a 'skip' parameter to writev1 so that the beginning of a car can … #291

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
9 changes: 7 additions & 2 deletions v2/internal/io/offset_write_seeker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package io

import "io"
import (
"errors"
"io"
)

var ErrUnsupported = errors.New("unsupported seek operation")

var (
_ io.Writer = (*OffsetWriteSeeker)(nil)
Expand Down Expand Up @@ -30,7 +35,7 @@ func (ow *OffsetWriteSeeker) Seek(offset int64, whence int) (int64, error) {
case io.SeekCurrent:
ow.offset += offset
case io.SeekEnd:
panic("unsupported whence: SeekEnd")
return 0, ErrUnsupported
}
return ow.Position(), nil
}
Expand Down
115 changes: 115 additions & 0 deletions v2/internal/io/skip_writer_read_seeker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package io

import (
"context"
"errors"
"fmt"
"io"
)

// SkipWriterReaderSeeker wraps a factory producing a writer with a ReadSeeker implementation.
// Note that Read and Seek are not thread-safe, they must not be called
// concurrently.
type SkipWriterReaderSeeker struct {
	// parentCtx is the context supplied at construction. The context of every
	// write is derived from it, and Close returns its error if it is done
	// before the write goroutine signals completion.
	parentCtx context.Context
	// offset is the skip value handed to cons when the next write starts. The
	// write goroutine adds the amount returned by cons to it when cons returns.
	offset uint64
	// size is the total size used to resolve io.SeekEnd. A size of zero makes
	// Seek reject io.SeekEnd with ErrUnsupported.
	size uint64

	// cons is the ReWriter invoked to produce data from offset onwards.
	cons ReWriter
	// reader is the read end of the pipe fed by the current write; it is nil
	// when no write has been started since construction or the last Seek.
	reader *io.PipeReader
	// writeCancel cancels the context of the current write.
	writeCancel context.CancelFunc
	// writeComplete receives one value each time a write goroutine finishes.
	writeComplete chan struct{}
}

// ReWriter is a function writing to an io.Writer from an offset.
// SkipWriterReaderSeeker.Read calls it with the current offset as skip and
// adds the returned uint64 to that offset once the call returns; an error
// other than context.Canceled is surfaced to the reader of the pipe.
type ReWriter func(ctx context.Context, skip uint64, w io.Writer) (uint64, error)

// Compile-time checks that *SkipWriterReaderSeeker satisfies io.ReadSeeker
// and io.Closer.
var _ io.ReadSeeker = (*SkipWriterReaderSeeker)(nil)
var _ io.Closer = (*SkipWriterReaderSeeker)(nil)

// NewSkipWriterReaderSeeker creates an io.ReadSeeker around a ReWriter.
func NewSkipWriterReaderSeeker(ctx context.Context, size uint64, cons ReWriter) *SkipWriterReaderSeeker {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we have a better name than cons? it's not obvious to me what this name is supposed to be (consume?), it's puzzling when we get to c.cons() and it seems like it should be more obvious what it's doing - should it be read as "start consuming now"?

return &SkipWriterReaderSeeker{
parentCtx: ctx,
size: size,
cons: cons,
writeComplete: make(chan struct{}, 1),
}
}

// Read returns data produced by the wrapped ReWriter, starting at the current
// offset. The first call after construction or after a Seek launches the
// ReWriter in a goroutine and streams its output back through a pipe; later
// calls keep draining that same pipe.
//
// Note: not threadsafe
func (c *SkipWriterReaderSeeker) Read(p []byte) (int, error) {
	// A write is already in flight: keep reading from its pipe.
	if c.reader != nil {
		return c.reader.Read(p)
	}

	// No write in progress: start one from the current offset and hand its
	// output to the caller via a pipe.
	pipeReader, pipeWriter := io.Pipe()
	ctx, cancel := context.WithCancel(c.parentCtx)
	c.writeCancel = cancel
	c.reader = pipeReader

	go func() {
		written, werr := c.cons(ctx, c.offset, pipeWriter)
		c.offset += written
		// Cancellation is how Seek/Close stop the write, so it is reported to
		// the reader as a plain close rather than as an error.
		if werr == nil || errors.Is(werr, context.Canceled) {
			pipeWriter.Close()
		} else {
			pipeWriter.CloseWithError(werr)
		}
		// Signal completion last, after offset and the pipe are settled.
		c.writeComplete <- struct{}{}
	}()

	return c.reader.Read(p)
}

// Seek sets the offset from which the next Read will start a fresh write and
// returns that offset. Any write already in flight is cancelled and waited
// for before the offset is changed.
//
// io.SeekEnd is only supported when a non-zero size was supplied at
// construction; otherwise ErrUnsupported is returned. A negative resulting
// offset or an unknown whence is an error. If stopping the in-flight write
// fails (the parent context is done), that error is returned and the offset
// is left unchanged.
//
// Note: not threadsafe
func (c *SkipWriterReaderSeeker) Seek(offset int64, whence int) (int64, error) {
	// Reject requests that can be ruled out without knowing the current
	// offset, so that such a call leaves an in-flight write untouched.
	switch whence {
	case io.SeekStart:
		if offset < 0 {
			return 0, fmt.Errorf("invalid offset %d from start: must be positive", offset)
		}
	case io.SeekCurrent:
		// Validated below, once the current offset has settled.
	case io.SeekEnd:
		if c.size == 0 {
			return 0, ErrUnsupported
		}
		if int64(c.size)+offset < 0 {
			return 0, fmt.Errorf("invalid offset %d from end: larger than total size %d", offset, c.size)
		}
	default:
		return 0, fmt.Errorf("invalid whence %d", whence)
	}

	// Cancel any ongoing write and wait for it to complete BEFORE touching
	// c.offset: the write goroutine adds the amount it wrote to c.offset when
	// it finishes, so updating the offset first would both race with that
	// goroutine and leave the written amount added on top of the new target.
	// TODO: if we're fast-forwarding with 'SeekCurrent', we may be able to read from the current reader instead.
	if c.reader != nil {
		err := c.Close()
		c.reader = nil
		if err != nil {
			return 0, err
		}
	}

	// Update the offset
	switch whence {
	case io.SeekStart:
		c.offset = uint64(offset)
	case io.SeekCurrent:
		if int64(c.offset)+offset < 0 {
			return 0, fmt.Errorf("invalid offset %d from current %d: resulting offset is negative", offset, c.offset)
		}
		c.offset = uint64(int64(c.offset) + offset)
	case io.SeekEnd:
		c.offset = uint64(int64(c.size) + offset)
	}

	return int64(c.offset), nil
}

// Close cancels the write currently feeding Read, closes the read end of its
// pipe and waits for the write goroutine to signal completion. It returns the
// parent context's error if that context is done before the signal arrives,
// and nil otherwise.
//
// Close is a no-op when no write is active, i.e. before the first Read or
// directly after a Seek. Without this guard the nil cancel func / nil pipe
// reader would be dereferenced and panic.
func (c *SkipWriterReaderSeeker) Close() error {
	if c.reader == nil {
		return nil
	}

	c.writeCancel()
	// Seek and Read should not be called concurrently so this is safe.
	// (*io.PipeReader).Close always returns nil, so its result is ignored.
	c.reader.Close()

	select {
	case <-c.parentCtx.Done():
		return c.parentCtx.Err()
	case <-c.writeComplete:
	}
	return nil
}
18 changes: 9 additions & 9 deletions v2/internal/loader/counting_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
"github.com/multiformats/go-varint"
)

// counter tracks how much data has been read.
type counter struct {
totalRead uint64
// Counter tracks how much data has been read.
type Counter struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we use a struct for this? is it something to do with needing a pointer to it? why isn't a *uint64 good enough for this purpose?

TotalRead uint64
}

func (c *counter) Size() uint64 {
return c.totalRead
func (c *Counter) Size() uint64 {
return c.TotalRead
}

// ReadCounter provides an externally consumable interface to the
Expand All @@ -26,12 +26,12 @@ type ReadCounter interface {

type countingReader struct {
r io.Reader
c *counter
c *Counter
}

func (c *countingReader) Read(p []byte) (int, error) {
n, err := c.r.Read(p)
c.c.totalRead += uint64(n)
c.c.TotalRead += uint64(n)
return n, err
}

Expand All @@ -41,7 +41,7 @@ func (c *countingReader) Read(p []byte) (int, error) {
// appear in a CAR file is added to the counter (included the size of the
// CID and the varint length for the block data).
func CountingLinkSystem(ls ipld.LinkSystem) (ipld.LinkSystem, ReadCounter) {
c := counter{}
c := Counter{}
clc := ls
clc.StorageReadOpener = func(lc linking.LinkContext, l ipld.Link) (io.Reader, error) {
r, err := ls.StorageReadOpener(lc, l)
Expand All @@ -54,7 +54,7 @@ func CountingLinkSystem(ls ipld.LinkSystem) (ipld.LinkSystem, ReadCounter) {
return nil, err
}
size := varint.ToUvarint(uint64(n) + uint64(len(l.Binary())))
c.totalRead += uint64(len(size)) + uint64(len(l.Binary()))
c.TotalRead += uint64(len(size)) + uint64(len(l.Binary()))
return &countingReader{buf, &c}, nil
}
return clc, &c
Expand Down
114 changes: 81 additions & 33 deletions v2/internal/loader/writing_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,25 @@ import (
"github.com/multiformats/go-varint"
)

type writerOutput struct {
w io.Writer
size uint64
code multicodec.Code
rcrds map[cid.Cid]index.Record
// indexingWriter wraps an io.Writer with metadata of the index of the car written to it.
type indexingWriter struct {
w io.Writer
size uint64
toSkip uint64
code multicodec.Code
rcrds map[cid.Cid]index.Record
}

func (w *writerOutput) Size() uint64 {
func (w *indexingWriter) Size() uint64 {
return w.size
}

func (w *writerOutput) Index() (index.Index, error) {
func (w *indexingWriter) Index() (index.Index, error) {
idx, err := index.New(w.code)
if err != nil {
return nil, err
}
// todo: maybe keep both a map and a list proactively for efficiency here.
rcrds := make([]index.Record, 0, len(w.rcrds))
for _, r := range w.rcrds {
rcrds = append(rcrds, r)
Expand All @@ -46,38 +49,86 @@ type IndexTracker interface {
Index() (index.Index, error)
}

var _ IndexTracker = (*indexingWriter)(nil)

// writingReader is used on a per-block basis for the TeeingLinkSystem's StorageReadOpener, we use it
// to intercept block reads and construct CAR section output for that block, passing that section data to
// indexingWriter, while also passing the plain binary block data back to the LinkSystem caller (which
// we expect to be a traversal operation).
// Additionally, if we are performing a "skip" of initial bytes for this CAR, we track the byte count as we
// construct the CAR section data and decide when and how much to write out to the indexingWriter.
// Skip operations don't impact the downstream LinkSystem user (traversal), but they do impact what's
// written out via the indexingWriter.
type writingReader struct {
rvagg marked this conversation as resolved.
Show resolved Hide resolved
r io.Reader
len int64
buf []byte
cid string
wo *writerOutput
iw *indexingWriter
}

func (w *writingReader) Read(p []byte) (int, error) {
if w.wo != nil {
if w.iw != nil {
// build the buffer of size:cid:block if we don't have it yet.
buf := bytes.NewBuffer(nil)
// allocate space for size
_, err := buf.Write(make([]byte, varint.MaxLenUvarint63))
if err != nil {
return 0, err
}
// write the cid
size := varint.ToUvarint(uint64(w.len) + uint64(len(w.cid)))
if _, err := w.wo.w.Write(size); err != nil {
if _, err := buf.Write([]byte(w.cid)); err != nil {
return 0, err
}
if _, err := w.wo.w.Write([]byte(w.cid)); err != nil {
// write the block
n, err := io.Copy(buf, w.r)
if err != nil {
return 0, err
}
cpy := bytes.NewBuffer(w.r.(*bytes.Buffer).Bytes())
if _, err := cpy.WriteTo(w.wo.w); err != nil {
// write the varint size prefix and trim the unneeded prefix padding we allocated
sizeBytes := varint.ToUvarint(uint64(n) + uint64(len(w.cid)))
rvagg marked this conversation as resolved.
Show resolved Hide resolved
writeBuf := buf.Bytes()[varint.MaxLenUvarint63-len(sizeBytes):]
w.buf = buf.Bytes()[varint.MaxLenUvarint63+len(w.cid):]
_ = copy(writeBuf[:], sizeBytes)

size := len(writeBuf)
// indexingWriter manages state for a skip operation, but we have to mutate it here -
// if there are still bytes to skip, then we either need to skip over this whole block, or pass
// part of it on, and then update the toSkip state
if w.iw.toSkip > 0 {
if w.iw.toSkip >= uint64(len(writeBuf)) {
w.iw.toSkip -= uint64(len(writeBuf))
// will cause the WriteTo() below to be a noop, we need to skip this entire block
writeBuf = []byte{}
rvagg marked this conversation as resolved.
Show resolved Hide resolved
} else {
writeBuf = writeBuf[w.iw.toSkip:]
w.iw.toSkip = 0
}
}

if _, err := bytes.NewBuffer(writeBuf).WriteTo(w.iw.w); err != nil {
return 0, err
}
_, c, err := cid.CidFromBytes([]byte(w.cid))
if err != nil {
return 0, err
}
w.wo.rcrds[c] = index.Record{
w.iw.rcrds[c] = index.Record{
Cid: c,
Offset: w.wo.size,
Offset: w.iw.size,
}
w.wo.size += uint64(w.len) + uint64(len(size)+len(w.cid))
w.iw.size += uint64(size)
w.iw = nil
}

w.wo = nil
if w.buf != nil {
rvagg marked this conversation as resolved.
Show resolved Hide resolved
// we've already read the block from the parent reader for writing the CAR block section (above),
// so we need to pass those bytes on in whatever chunk size the caller wants
n, err := bytes.NewBuffer(w.buf).Read(p)
if err != nil {
return n, err
}
w.buf = w.buf[n:]
return n, err
}

return w.r.Read(p)
Expand All @@ -89,12 +140,13 @@ func (w *writingReader) Read(p []byte) (int, error) {
// The `initialOffset` is used to calculate the offsets recorded for the index, and will be
// included in the `.Size()` of the IndexTracker.
// An indexCodec of `index.CarIndexNoIndex` can be used to not track these offsets.
func TeeingLinkSystem(ls ipld.LinkSystem, w io.Writer, initialOffset uint64, indexCodec multicodec.Code) (ipld.LinkSystem, IndexTracker) {
wo := writerOutput{
w: w,
size: initialOffset,
code: indexCodec,
rcrds: make(map[cid.Cid]index.Record),
func TeeingLinkSystem(ls ipld.LinkSystem, w io.Writer, initialOffset uint64, skip uint64, indexCodec multicodec.Code) (ipld.LinkSystem, IndexTracker) {
iw := indexingWriter{
w: w,
size: initialOffset,
toSkip: skip,
code: indexCodec,
rcrds: make(map[cid.Cid]index.Record),
}

tls := ls
Expand All @@ -105,20 +157,16 @@ func TeeingLinkSystem(ls ipld.LinkSystem, w io.Writer, initialOffset uint64, ind
}

// if we've already read this cid in this session, don't re-write it.
if _, ok := wo.rcrds[c]; ok {
if _, ok := iw.rcrds[c]; ok {
return ls.StorageReadOpener(lc, l)
}

r, err := ls.StorageReadOpener(lc, l)
if err != nil {
return nil, err
}
buf := bytes.NewBuffer(nil)
n, err := buf.ReadFrom(r)
if err != nil {
return nil, err
}
return &writingReader{buf, n, l.Binary(), &wo}, nil

return &writingReader{r, nil, l.Binary(), &iw}, nil
}
return tls, &wo
return tls, &iw
}
10 changes: 10 additions & 0 deletions v2/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ type Options struct {
MaxTraversalLinks uint64
WriteAsCarV1 bool
TraversalPrototypeChooser traversal.LinkTargetNodePrototypeChooser
DataPayloadSize uint64
SkipOffset uint64

MaxAllowedHeaderSize uint64
MaxAllowedSectionSize uint64
Expand Down Expand Up @@ -97,6 +99,14 @@ func ZeroLengthSectionAsEOF(enable bool) Option {
}
}

// WithSkipOffset sets the start offset we should seek to in the traversal
// when writing out a CAR, by storing skip in Options.SkipOffset.
// This option only applies to the selective and traversal writer.
func WithSkipOffset(skip uint64) Option {
	return func(o *Options) {
		o.SkipOffset = skip
	}
}

// UseDataPadding sets the padding to be added between CARv2 header and its data payload on Finalize.
func UseDataPadding(p uint64) Option {
return func(o *Options) {
Expand Down