Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: extract TraversalResumerPathState interface and allow it to be shared across traversals #327

Draft
wants to merge 3 commits into
base: resumecarwrite
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
68 changes: 43 additions & 25 deletions v2/internal/loader/writing_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,19 @@ import (
"github.com/ipfs/go-cid"
"github.com/ipld/go-car/v2/index"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/datamodel"
"github.com/ipld/go-ipld-prime/linking"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-varint"
)

// copy of traversal.PathState
type PathState interface {
AddPath(path []datamodel.PathSegment, link datamodel.Link, atOffset uint64)
GetLinks(root datamodel.Path) []datamodel.Link
GetOffsetAfter(root datamodel.Path) (uint64, error)
}

// indexingWriter wraps an io.Writer with metadata of the index of the car written to it.
type indexingWriter struct {
w io.Writer
Expand Down Expand Up @@ -54,7 +62,7 @@ var _ IndexTracker = (*indexingWriter)(nil)
type writingReader struct {
r io.Reader
buf []byte
cid string
cid cid.Cid
wo *indexingWriter
}

Expand All @@ -68,42 +76,43 @@ func (w *writingReader) Read(p []byte) (int, error) {
return 0, err
}
// write the cid
if _, err := buf.Write([]byte(w.cid)); err != nil {
if _, err := buf.Write(w.cid.Bytes()); err != nil {
return 0, err
}
// write the block
n, err := io.Copy(buf, w.r)
if err != nil {
return 0, err
}
sizeBytes := varint.ToUvarint(uint64(n) + uint64(len(w.cid)))
sizeBytes := varint.ToUvarint(uint64(n) + uint64(len(w.cid.Bytes())))
writeBuf := buf.Bytes()[varint.MaxLenUvarint63-len(sizeBytes):]
w.buf = buf.Bytes()[varint.MaxLenUvarint63+len(w.cid):]
w.buf = buf.Bytes()[varint.MaxLenUvarint63+len(w.cid.Bytes()):]
_ = copy(writeBuf[:], sizeBytes)

size := len(writeBuf)
size := uint64(len(writeBuf))
if w.wo.toSkip > 0 {
if w.wo.toSkip >= uint64(len(writeBuf)) {
w.wo.toSkip -= uint64(len(writeBuf))
if w.wo.toSkip >= size {
w.wo.toSkip -= size
writeBuf = []byte{}
} else {
writeBuf = writeBuf[w.wo.toSkip:]
w.wo.toSkip = 0
}
}

if _, err := bytes.NewBuffer(writeBuf).WriteTo(w.wo.w); err != nil {
return 0, err
}
_, c, err := cid.CidFromBytes([]byte(w.cid))
if err != nil {
return 0, err
}
w.wo.rcrds[c] = index.Record{
Cid: c,
Offset: w.wo.size,
// we haven't indexed this cid in this session
if _, ok := w.wo.rcrds[w.cid]; !ok {
if _, err := bytes.NewBuffer(writeBuf).WriteTo(w.wo.w); err != nil {
return 0, err
}

w.wo.rcrds[w.cid] = index.Record{
Cid: w.cid,
Offset: w.wo.size,
}
}
w.wo.size += uint64(size)

w.wo.size += size
w.wo = nil
}

Expand All @@ -125,7 +134,15 @@ func (w *writingReader) Read(p []byte) (int, error) {
// The `initialOffset` is used to calculate the offsets recorded for the index, and will be
// included in the `.Size()` of the IndexTracker.
// An indexCodec of `index.CarIndexNoIndex` can be used to not track these offsets.
func TeeingLinkSystem(ls ipld.LinkSystem, w io.Writer, initialOffset uint64, skip uint64, indexCodec multicodec.Code) (ipld.LinkSystem, IndexTracker) {
func TeeingLinkSystem(
ls ipld.LinkSystem,
w io.Writer,
pathState PathState,
initialOffset uint64,
skip uint64,
indexCodec multicodec.Code,
) (ipld.LinkSystem, IndexTracker) {

iw := indexingWriter{
w: w,
size: initialOffset,
Expand All @@ -141,17 +158,18 @@ func TeeingLinkSystem(ls ipld.LinkSystem, w io.Writer, initialOffset uint64, ski
return nil, err
}

// if we've already read this cid in this session, don't re-write it.
if _, ok := iw.rcrds[c]; ok {
return ls.StorageReadOpener(lc, l)
}

r, err := ls.StorageReadOpener(lc, l)
if err != nil {
return nil, err
}

return &writingReader{r, nil, l.Binary(), &iw}, nil
/*
offset, err := pathState.GetOffsetAfter(lc.LinkPath)
if err != nil {
//return nil, err
}
*/
return &writingReader{r, nil, c, &iw}, nil
}
return tls, &iw
}
2 changes: 2 additions & 0 deletions v2/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/multiformats/go-multicodec"

"github.com/ipld/go-car/v2/internal/carv1"
resumetraversal "github.com/ipld/go-car/v2/traversal"
)

// DefaultMaxIndexCidSize specifies the maximum size in byptes accepted as a section CID by CARv2 index.
Expand Down Expand Up @@ -62,6 +63,7 @@ type Options struct {
TraversalPrototypeChooser traversal.LinkTargetNodePrototypeChooser
DataPayloadSize uint64
SkipOffset uint64
TraversalResumerPathState resumetraversal.PathState

MaxAllowedHeaderSize uint64
MaxAllowedSectionSize uint64
Expand Down
119 changes: 61 additions & 58 deletions v2/selective.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,29 +50,47 @@ func WithDataPayloadSize(size uint64) Option {
}
}

// WithTraversalResumerPathState provides a custom PathState that can be reused
// between selective CAR creations where traversals may need to be resumed at
// arbitrary points within the DAG.
//
// A PathState shared across multiple traversals using the same selector and DAG
// will yield the same state. This allows us to resume at arbitrary points
// within in the DAG and load the minimal additional blocks required to resume
// the traversal at that point.
func WithTraversalResumerPathState(pathState resumetraversal.PathState) Option {
return func(o *Options) {
o.TraversalResumerPathState = pathState
}
}

func newTraversalCar(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, opts Options) *traversalCar {
pathState := opts.TraversalResumerPathState
if pathState == nil {
pathState = resumetraversal.NewPathState()
}
return &traversalCar{
ctx: ctx,
opts: opts,
ls: ls,
root: root,
selector: selector,
size: opts.DataPayloadSize,
pathState: pathState,
}
}

// NewSelectiveWriter walks through the proposed dag traversal to learn its total size in order to be able to
// stream out a car to a writer in the expected traversal order in one go.
func NewSelectiveWriter(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, opts ...Option) (Writer, error) {
conf := ApplyOptions(opts...)
tc := newTraversalCar(ctx, ls, root, selector, conf)

if conf.DataPayloadSize != 0 {
return &traversalCar{
size: conf.DataPayloadSize,
ctx: ctx,
root: root,
selector: selector,
ls: ls,
opts: ApplyOptions(opts...),
}, nil
}
tc := traversalCar{
//size: headSize + cntr.Size(),
ctx: ctx,
root: root,
selector: selector,
ls: ls,
opts: ApplyOptions(opts...),
}
if err := tc.setup(ctx, ls, ApplyOptions(opts...)); err != nil {
return tc, nil
}

if err := tc.setup(ctx, ls, conf.SkipOffset, ApplyOptions(opts...)); err != nil {
return nil, err
}

Expand All @@ -85,21 +103,15 @@ func NewSelectiveWriter(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid,
return nil, err
}
tc.size = headSize + tc.resumer.Position()
return &tc, nil

return tc, nil
}

// TraverseToFile writes a car file matching a given root and selector to the
// path at `destination` using one read of each block.
func TraverseToFile(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, destination string, opts ...Option) error {
conf := ApplyOptions(opts...)
tc := traversalCar{
size: conf.DataPayloadSize,
ctx: ctx,
root: root,
selector: selector,
ls: ls,
opts: conf,
}
tc := newTraversalCar(ctx, ls, root, selector, conf)

fp, err := os.Create(destination)
if err != nil {
Expand Down Expand Up @@ -129,15 +141,7 @@ func TraverseToFile(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, sele
func TraverseV1(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, writer io.Writer, opts ...Option) (uint64, error) {
opts = append(opts, WithoutIndex())
conf := ApplyOptions(opts...)
tc := traversalCar{
size: conf.DataPayloadSize,
ctx: ctx,
root: root,
selector: selector,
ls: ls,
opts: conf,
}

tc := newTraversalCar(ctx, ls, root, selector, conf)
len, _, err := tc.WriteV1(tc.ctx, conf.SkipOffset, writer)
return len, err
}
Expand All @@ -146,14 +150,7 @@ func TraverseV1(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector
func NewCarV1StreamReader(ctx context.Context, ls *ipld.LinkSystem, root cid.Cid, selector ipld.Node, opts ...Option) (io.ReadSeeker, error) {
opts = append(opts, WithoutIndex())
conf := ApplyOptions(opts...)
tc := traversalCar{
size: conf.DataPayloadSize,
ctx: ctx,
root: root,
selector: selector,
ls: ls,
opts: conf,
}
tc := newTraversalCar(ctx, ls, root, selector, conf)
rwf := func(ctx context.Context, offset uint64, writer io.Writer) (uint64, error) {
s, _, err := tc.WriteV1(ctx, offset, writer)
return s, err
Expand All @@ -170,14 +167,16 @@ type Writer interface {
var _ Writer = (*traversalCar)(nil)

type traversalCar struct {
size uint64
ctx context.Context
root cid.Cid
selector ipld.Node
ls *ipld.LinkSystem
opts Options
progress *traversal.Progress
resumer resumetraversal.TraverseResumer
size uint64
ctx context.Context
root cid.Cid
selector ipld.Node
ls *ipld.LinkSystem
opts Options
progress *traversal.Progress
resumer resumetraversal.TraverseResumer
pathState resumetraversal.PathState
skip uint64
}

func (tc *traversalCar) WriteTo(w io.Writer) (int64, error) {
Expand Down Expand Up @@ -275,9 +274,9 @@ func (tc *traversalCar) WriteV1(ctx context.Context, skip uint64, w io.Writer) (
skip -= v1Size
}

// write the block.
wls, writer := loader.TeeingLinkSystem(*tc.ls, w, v1Size, skip, tc.opts.IndexCodec)
if err = tc.setup(ctx, &wls, tc.opts); err != nil {
// write the block
wls, writer := loader.TeeingLinkSystem(*tc.ls, w, tc.pathState, v1Size, skip, tc.opts.IndexCodec)
if err = tc.setup(ctx, &wls, skip, tc.opts); err != nil {
return v1Size, nil, err
}
err = tc.traverse(tc.root, tc.selector)
Expand All @@ -297,7 +296,7 @@ func (tc *traversalCar) WriteV1(ctx context.Context, skip uint64, w io.Writer) (
return v1Size, idx, err
}

func (tc *traversalCar) setup(ctx context.Context, ls *ipld.LinkSystem, opts Options) error {
func (tc *traversalCar) setup(ctx context.Context, ls *ipld.LinkSystem, skip uint64, opts Options) error {
chooser := func(_ ipld.Link, _ linking.LinkContext) (ipld.NodePrototype, error) {
return basicnode.Prototype.Any, nil
}
Expand All @@ -321,12 +320,13 @@ func (tc *traversalCar) setup(ctx context.Context, ls *ipld.LinkSystem, opts Opt
}

ls.TrustedStorage = true
resumer, err := resumetraversal.WithTraversingLinksystem(&progress)
resumer, err := resumetraversal.WithTraversingLinksystem(&progress, tc.pathState)
if err != nil {
return err
}
tc.progress = &progress
tc.resumer = resumer
tc.skip = skip
return nil
}

Expand All @@ -342,7 +342,10 @@ func (tc *traversalCar) traverse(root cid.Cid, s ipld.Node) error {
}
rootNode, err := tc.progress.Cfg.LinkSystem.Load(ipld.LinkContext{}, lnk, rp)
if err != nil {
return fmt.Errorf("root blk load failed: %s", err)
return fmt.Errorf("root block load failed: %s", err)
}
if tc.skip > 0 {
tc.resumer.RewindToOffset(tc.skip)
}
err = tc.progress.WalkMatching(rootNode, sel, func(_ traversal.Progress, node ipld.Node) error {
if lbn, ok := node.(datamodel.LargeBytesNode); ok {
Expand Down