Skip to content

Commit

Permalink
ReadWrite: faster Has() by using the in-memory index instead of readi…
Browse files Browse the repository at this point in the history
…ng on disk

Before:
// BenchmarkHas-8   	  190216	      6368 ns/op	     744 B/op	      16 allocs/op

After:
//  BenchmarkHas-8   	 1419169	       845.6 ns/op	     320 B/op	       6 allocs/op

```
// BenchmarkHas measures ReadWrite.Has against a freshly generated CARv2 file.
// It first collects every key reported by AllKeysChan, then queries them
// round-robin inside the timed loop.
func BenchmarkHas(b *testing.B) {
	ctx := context.TODO()

	path := filepath.Join(b.TempDir(), "bench-large-v2.car")
	generateRandomCarV2File(b, path, 200<<20) // 200 MiB, assuming the size argument is in bytes
	defer os.Remove(path)

	subject, err := blockstore.OpenReadWrite(path, nil)
	// Fail fast here; otherwise this error is overwritten by the next call
	// and a broken blockstore would be benchmarked.
	require.NoError(b, err)

	c, err := subject.AllKeysChan(ctx)
	require.NoError(b, err)

	var allCids []cid.Cid

	for c2 := range c {
		allCids = append(allCids, c2)
	}
	// The modulo in the loop below would panic on an empty key set.
	require.NotEmpty(b, allCids)

	b.ReportAllocs()
	b.ResetTimer()

	var idx int
	for i := 0; i < b.N; i++ {
		// Results are deliberately discarded so that assertion overhead does
		// not end up in the measured time; re-enable the checks below when
		// debugging.
		_, _ = subject.Has(ctx, allCids[idx])
		// require.NoError(b, err)
		// require.True(b, has)
		idx = (idx + 1) % len(allCids)
	}
}
```
  • Loading branch information
MichaelMure authored and rvagg committed Mar 16, 2023
1 parent 649ff2a commit d51b4a1
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 9 deletions.
24 changes: 23 additions & 1 deletion v2/blockstore/readwrite.go
Expand Up @@ -344,7 +344,29 @@ func (b *ReadWrite) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
}

// Has reports whether the blockstore holds a block for key. The answer comes
// from the index held by the ReadWrite blockstore (b.idx) rather than from the
// read-only blockstore's lookup.
//
// When the StoreIdentityCIDs option is off, identity CIDs are always reported
// as present. Has returns the context's error if ctx is already done, and
// errClosed if the underlying read-only blockstore has been closed.
func (b *ReadWrite) Has(ctx context.Context, key cid.Cid) (bool, error) {
	if !b.opts.StoreIdentityCIDs {
		// If we don't store identity CIDs then we can return them straight away as if they are here,
		// otherwise we need to check for their existence.
		// Note, we do this without locking, since there is no shared information to lock for in order to perform the check.
		_, ok, err := store.IsIdentity(key)
		if err != nil {
			return false, err
		}
		if ok {
			return true, nil
		}
	}

	// Capture the error once so the value checked is the value returned.
	if err := ctx.Err(); err != nil {
		return false, err
	}

	b.ronly.mu.Lock()
	defer b.ronly.mu.Unlock()

	if b.ronly.closed {
		return false, errClosed
	}

	return b.idx.HasMultihash(key.Hash())
}

func (b *ReadWrite) Get(ctx context.Context, key cid.Cid) (blocks.Block, error) {
Expand Down
6 changes: 3 additions & 3 deletions v2/index/index.go
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/multiformats/go-varint"
)

// CarIndexNone is a sentinal value used as a multicodec code for the index indicating no index.
// CarIndexNone is a sentinel value used as a multicodec code for the index indicating no index.
const CarIndexNone = 0x300000

type (
Expand Down Expand Up @@ -46,7 +46,7 @@ type (
// Unmarshal decodes the index from its serial form.
// Note, this function will copy the entire index into memory.
//
// Do not unmarshal index from untrusted CARv2 files. Instead the index should be
// Do not unmarshal index from untrusted CARv2 files. Instead, the index should be
// regenerated from the CARv2 data payload.
Unmarshal(r io.Reader) error

Expand Down Expand Up @@ -84,7 +84,7 @@ type (
// and the ForEach function returns the error to the user.
//
// An index may contain multiple offsets corresponding to the same multihash, e.g. via duplicate blocks.
// In such cases, the given function may be called multiple times with the same multhihash but different offset.
// In such cases, the given function may be called multiple times with the same multihash but different offset.
//
// The order of calls to the given function is deterministic, but entirely index-specific.
ForEach(func(multihash.Multihash, uint64) error) error
Expand Down
31 changes: 28 additions & 3 deletions v2/internal/store/insertionindex.go
Expand Up @@ -212,10 +212,10 @@ func (ii *InsertionIndex) Flatten(codec multicodec.Code) (index.Index, error) {
// but it's separate as it allows us to compare Record.Cid directly,
// whereas GetAll just provides Record.Offset.

func (ii *InsertionIndex) HasExactCID(c cid.Cid) bool {
func (ii *InsertionIndex) HasExactCID(c cid.Cid) (bool, error) {
d, err := multihash.Decode(c.Hash())
if err != nil {
panic(err)
return false, err
}
entry := recordDigest{digest: d.Digest}

Expand All @@ -235,5 +235,30 @@ func (ii *InsertionIndex) HasExactCID(c cid.Cid) bool {
return true
}
ii.items.AscendGreaterOrEqual(entry, iter)
return found
return found, nil
}

// HasMultihash reports whether the index contains a record whose CID carries
// exactly the multihash mh. It returns an error if mh cannot be decoded.
func (ii *InsertionIndex) HasMultihash(mh multihash.Multihash) (bool, error) {
	decoded, err := multihash.Decode(mh)
	if err != nil {
		return false, err
	}
	want := decoded.Digest
	pivot := recordDigest{digest: want}

	var matched bool
	ii.items.AscendGreaterOrEqual(pivot, func(item llrb.Item) bool {
		rec := item.(recordDigest)
		if !bytes.Equal(rec.digest, want) {
			// Walked past every entry sharing this digest; stop iterating.
			return false
		}
		// The digest matches; the record only counts if the full multihash
		// bytes of its CID match as well.
		matched = bytes.Equal(rec.Record.Cid.Hash(), mh)
		// Keep ascending only while no exact match has been seen.
		return !matched
	})
	return matched, nil
}
8 changes: 6 additions & 2 deletions v2/internal/store/put.go
Expand Up @@ -38,8 +38,12 @@ func ShouldPut(
}

if !blockstoreAllowDuplicatePuts {
if blockstoreUseWholeCIDs && idx.HasExactCID(c) {
return false, nil // deduplicated by CID
if blockstoreUseWholeCIDs {
has, err := idx.HasExactCID(c)
if err != nil {
return false, err
}
return !has, nil // deduplicated by CID
}
if !blockstoreUseWholeCIDs {
_, err := idx.Get(c)
Expand Down

0 comments on commit d51b4a1

Please sign in to comment.