Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(localstore): remove pullIndex on caching #3447

Merged
merged 1 commit into from
Oct 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 5 additions & 5 deletions pkg/localstore/gc_test.go
Expand Up @@ -119,7 +119,7 @@ func testDBCollectGarbageWorker(t *testing.T) {
}
}

t.Run("pull index count", newItemsCountTest(db.pullIndex, int(gcTarget)))
t.Run("pull index count", newItemsCountTest(db.pullIndex, 0))

t.Run("postage index count", newItemsCountTest(db.postageIndexIndex, int(gcTarget)))

Expand Down Expand Up @@ -231,7 +231,7 @@ func TestPinGC(t *testing.T) {

t.Run("pin Index count", newItemsCountTest(db.pinIndex, pinChunksCount))

t.Run("pull index count", newItemsCountTest(db.pullIndex, int(gcTarget)+pinChunksCount))
t.Run("pull index count", newItemsCountTest(db.pullIndex, pinChunksCount))

t.Run("postage index count", newItemsCountTest(db.postageIndexIndex, int(gcTarget)+pinChunksCount))

Expand Down Expand Up @@ -441,7 +441,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
t.Errorf("total collected chunks %v, want %v", totalCollectedCount, wantTotalCollectedCount)
}

t.Run("pull index count", newItemsCountTest(db.pullIndex, int(gcTarget)))
t.Run("pull index count", newItemsCountTest(db.pullIndex, 0))

t.Run("postage index count", newItemsCountTest(db.postageIndexIndex, int(gcTarget)))

Expand Down Expand Up @@ -910,7 +910,7 @@ func TestGC_NoEvictDirty(t *testing.T) {
}
}

t.Run("pull index count", newItemsCountTest(db.pullIndex, int(gcTarget)))
t.Run("pull index count", newItemsCountTest(db.pullIndex, 0))

t.Run("postage index count", newItemsCountTest(db.postageIndexIndex, int(gcTarget)))

Expand Down Expand Up @@ -1088,7 +1088,7 @@ func TestReserveEvictionWorker(t *testing.T) {
break
}
}
t.Run("pull index count", newItemsCountTest(db.pullIndex, chunkCount))
t.Run("pull index count", newItemsCountTest(db.pullIndex, chunkCount-1))

t.Run("postage index count", newItemsCountTest(db.postageIndexIndex, chunkCount))

Expand Down
4 changes: 4 additions & 0 deletions pkg/localstore/mode_put.go
Expand Up @@ -547,6 +547,10 @@ func (db *DB) preserveOrCache(batch *leveldb.Batch, item shed.Item, forcePin, fo
if err != nil {
return 0, 0, err
}
err = db.pullIndex.DeleteInBatch(batch, item)
if err != nil {
return 0, 0, err
}
gcSizeChange++

return gcSizeChange, 0, nil
Expand Down
6 changes: 4 additions & 2 deletions pkg/localstore/mode_put_test.go
Expand Up @@ -215,7 +215,7 @@ func TestModePutSync(t *testing.T) {
binIDs[po]++

newRetrieveIndexesTestWithAccess(db, ch, wantTimestamp, wantTimestamp)(t)
newPullIndexTest(db, ch, binIDs[po], nil)(t)
newPullIndexTest(db, ch, binIDs[po], leveldb.ErrNotFound)(t)
newPinIndexTest(db, ch, leveldb.ErrNotFound)(t)
newIndexGCSizeTest(db)(t)
}
Expand Down Expand Up @@ -643,6 +643,8 @@ func TestModePut_ImmutableStamp(t *testing.T) {
stamp := postagetesting.MustNewStamp()
ts := time.Now().Unix()

t.Cleanup(setWithinRadiusFunc(func(_ *DB, _ shed.Item) bool { return false }))

for _, modeTc1 := range putModes {
for _, modeTc2 := range putModes {
for _, tc := range []struct {
Expand Down Expand Up @@ -685,7 +687,7 @@ func TestModePut_ImmutableStamp(t *testing.T) {
newItemsCountTest(db.postageChunksIndex, 1)(t)
newItemsCountTest(db.postageRadiusIndex, 1)(t)
newItemsCountTest(db.postageIndexIndex, 1)(t)
if modeTc1 != storage.ModePutRequestCache {
if modeTc1 != storage.ModePutRequestCache && modeTc1 != storage.ModePutSync {
newItemsCountTest(db.pullIndex, 1)(t)
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/localstore/mode_set.go
Expand Up @@ -401,6 +401,10 @@ func (db *DB) setUnpin(batch *leveldb.Batch, addr swarm.Address) (gcSizeChange i
if err != nil {
return 0, err
}
err = db.pullIndex.DeleteInBatch(batch, item)
if err != nil {
return 0, err
}

gcSizeChange++
return gcSizeChange, nil
Expand Down
12 changes: 6 additions & 6 deletions pkg/localstore/pin_test.go
Expand Up @@ -101,31 +101,31 @@ func TestPinIndexes(t *testing.T) {
if err != nil {
t.Fatal(err)
}
runCountsTest(t, "setSync", db, 1, 1, 0, 1, 0, 1)
runCountsTest(t, "setSync", db, 1, 1, 0, 0, 0, 1)

err = db.Set(ctx, storage.ModeSetPin, addr)
if err != nil {
t.Fatal(err)
}
runCountsTest(t, "setPin", db, 1, 1, 0, 1, 1, 0)
runCountsTest(t, "setPin", db, 1, 1, 0, 0, 1, 0)

err = db.Set(ctx, storage.ModeSetPin, addr)
if err != nil {
t.Fatal(err)
}
runCountsTest(t, "setPin 2", db, 1, 1, 0, 1, 1, 0)
runCountsTest(t, "setPin 2", db, 1, 1, 0, 0, 1, 0)

err = db.Set(ctx, storage.ModeSetUnpin, addr)
if err != nil {
t.Fatal(err)
}
runCountsTest(t, "setUnPin", db, 1, 1, 0, 1, 1, 0)
runCountsTest(t, "setUnPin", db, 1, 1, 0, 0, 1, 0)

err = db.Set(ctx, storage.ModeSetUnpin, addr)
if err != nil {
t.Fatal(err)
}
runCountsTest(t, "setUnPin 2", db, 1, 1, 0, 1, 0, 1)
runCountsTest(t, "setUnPin 2", db, 1, 1, 0, 0, 0, 1)

}

Expand Down Expand Up @@ -190,7 +190,7 @@ func TestPinIndexesSync(t *testing.T) {
if err != nil {
t.Fatal(err)
}
runCountsTest(t, "setUnPin", db, 1, 1, 0, 1, 0, 1)
runCountsTest(t, "setUnPin", db, 1, 1, 0, 0, 0, 1)
}

func runCountsTest(t *testing.T, name string, db *DB, r, a, push, pull, pin, gc int) {
Expand Down
10 changes: 5 additions & 5 deletions pkg/localstore/reserve_test.go
Expand Up @@ -73,7 +73,7 @@ func TestDB_ReserveGC_AllOutOfRadius(t *testing.T) {
}
}

t.Run("pull index count", newItemsCountTest(db.pullIndex, int(gcTarget)))
t.Run("pull index count", newItemsCountTest(db.pullIndex, 0))

t.Run("postage chunks index count", newItemsCountTest(db.postageChunksIndex, int(gcTarget)))

Expand Down Expand Up @@ -362,7 +362,8 @@ func TestDB_ReserveGC_Unreserve(t *testing.T) {
break
}
}
t.Run("pull index count", newItemsCountTest(db.pullIndex, chunkCount+90-10))
// pullIndex count should be equal to the reserve
t.Run("pull index count", newItemsCountTest(db.pullIndex, 90))

t.Run("postage chunks index count", newItemsCountTest(db.postageChunksIndex, chunkCount+90-10))

Expand Down Expand Up @@ -559,7 +560,7 @@ func TestDB_ReserveGC_EvictMaxPO(t *testing.T) {
break
}
}
t.Run("pull index count", newItemsCountTest(db.pullIndex, chunkCount+90-10))
t.Run("pull index count", newItemsCountTest(db.pullIndex, 90))

t.Run("postage chunks index count", newItemsCountTest(db.postageChunksIndex, chunkCount+90-10))

Expand Down Expand Up @@ -799,8 +800,7 @@ func TestDB_ReserveGC_BatchedUnreserve(t *testing.T) {

t.Run("reserve size", reserveSizeTest(db, 0))

// chunks are still part of cache so these indexes would not be removed
t.Run("pull index count", newItemsCountTest(db.pullIndex, 90))
t.Run("pull index count", newItemsCountTest(db.pullIndex, 0))

t.Run("postage chunks index count", newItemsCountTest(db.postageChunksIndex, 90))

Expand Down