diff --git a/pkg/localstore/localstore_test.go b/pkg/localstore/localstore_test.go
index 21396d4a688..5ec7d3243aa 100644
--- a/pkg/localstore/localstore_test.go
+++ b/pkg/localstore/localstore_test.go
@@ -496,11 +496,11 @@ func newIndexGCSizeTest(db *DB) func(t *testing.T) {
 
 // reserveSizeTest checks that the reserveSize scalar is equal
 // to the expected value.
-func reserveSizeTest(db *DB, want uint64) func(t *testing.T) {
+func reserveSizeTest(db *DB, want uint64, depth uint8) func(t *testing.T) {
 	return func(t *testing.T) {
 		t.Helper()
 
-		got, err := db.reserveSize.Get()
+		got, err := db.ComputeReserveSize(depth)
 		if err != nil {
 			t.Fatal(err)
 		}
diff --git a/pkg/localstore/reserve_test.go b/pkg/localstore/reserve_test.go
index cbc402b01c1..4b04c2542fe 100644
--- a/pkg/localstore/reserve_test.go
+++ b/pkg/localstore/reserve_test.go
@@ -44,11 +44,7 @@ func TestDB_ReserveGC_AllOutOfRadius(t *testing.T) {
 
 	for i := 0; i < chunkCount; i++ {
 		ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(5, 3, 2, false)
-		_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
-		if err != nil {
-			t.Fatal(err)
-		}
-		err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
+		_, err := db.Put(context.Background(), storage.ModePutRequest, ch)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -85,7 +81,7 @@ func TestDB_ReserveGC_AllOutOfRadius(t *testing.T) {
 
 	t.Run("gc size", newIndexGCSizeTest(db))
 
-	t.Run("reserve size", reserveSizeTest(db, 0))
+	t.Run("reserve size", reserveSizeTest(db, 0, 0))
 
 	// the first synced chunk should be removed
 	t.Run("get the first synced chunk", func(t *testing.T) {
@@ -163,11 +159,7 @@ func TestDB_ReserveGC_AllWithinRadius(t *testing.T) {
 
 	for i := 0; i < chunkCount; i++ {
 		ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(2, 3, 2, false)
-		_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
-		if err != nil {
-			t.Fatal(err)
-		}
-		err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
+		_, err := db.Put(context.Background(), storage.ModePutSync, ch)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -193,7 +185,7 @@ func TestDB_ReserveGC_AllWithinRadius(t *testing.T) {
 
 	t.Run("gc size", newIndexGCSizeTest(db))
 
-	t.Run("reserve size", reserveSizeTest(db, 150))
+	t.Run("reserve size", reserveSizeTest(db, 150, 2))
 
 	t.Run("all chunks should be accessible", func(t *testing.T) {
 		for _, a := range addrs {
@@ -270,7 +262,7 @@ func TestDB_ReserveGC_Unreserve(t *testing.T) {
 		Capacity: 100,
 		// once reaching 150 in the reserve, we will evict
 		// half the size of the cache from the reserve, so 50 chunks
-		ReserveCapacity: 100,
+		ReserveCapacity: 90,
 		UnreserveFunc:   unres,
 	})
 	closed = db.close
@@ -280,11 +272,7 @@ func TestDB_ReserveGC_Unreserve(t *testing.T) {
 	// the cache. gc of the cache is still not triggered
 	for i := 0; i < chunkCount; i++ {
 		ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(2, 3, 2, false)
-		_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
-		if err != nil {
-			t.Fatal(err)
-		}
-		err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
+		_, err := db.Put(context.Background(), storage.ModePutSync, ch)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -294,22 +282,22 @@ func TestDB_ReserveGC_Unreserve(t *testing.T) {
 		mtx.Unlock()
 	}
 
-	// wait for the first eviction to finish, otherwise
-	// we collect some of the next chunks that get added
-	// which results in inconsistencies
-	evictTarget := db.reserveEvictionTarget()
+	t.Run("reserve size", reserveSizeTest(db, uint64(chunkCount), 2))
+
+	err := db.SetReserveSize(uint64(chunkCount))
+	if err != nil {
+		t.Fatal(err)
+	}
+	var evicted uint64
 
 	for {
 		select {
-		case <-testHookEvictChan:
+		case c := <-testHookEvictChan:
+			evicted += c
 		case <-time.After(10 * time.Second):
 			t.Fatal("collect garbage timeout")
 		}
-		resSize, err := db.reserveSize.Get()
-		if err != nil {
-			t.Fatal(err)
-		}
-		if resSize == evictTarget {
+		if evicted == 10 {
 			break
 		}
 	}
@@ -317,11 +305,7 @@ func TestDB_ReserveGC_Unreserve(t *testing.T) {
 	// insert another 90, this will trigger gc
 	for i := 0; i < 90; i++ {
 		ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(2, 3, 2, false)
-		_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
-		if err != nil {
-			t.Fatal(err)
-		}
-		err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
+		_, err := db.Put(context.Background(), storage.ModePutSync, ch)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -331,17 +315,22 @@ func TestDB_ReserveGC_Unreserve(t *testing.T) {
 		mtx.Unlock()
 	}
 
+	t.Run("reserve size", reserveSizeTest(db, 180, 2))
+
+	err = db.SetReserveSize(uint64(180))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	evicted = 0
 	for {
 		select {
-		case <-testHookEvictChan:
+		case c := <-testHookEvictChan:
+			evicted += c
 		case <-time.After(10 * time.Second):
 			t.Fatal("collect garbage timeout")
 		}
-		resSize, err := db.reserveSize.Get()
-		if err != nil {
-			t.Fatal(err)
-		}
-		if resSize == evictTarget {
+		if evicted == 90 {
 			break
 		}
 	}
@@ -375,7 +364,7 @@ func TestDB_ReserveGC_Unreserve(t *testing.T) {
 
 	t.Run("gc size", newIndexGCSizeTest(db))
 
-	t.Run("reserve size", reserveSizeTest(db, 90))
+	t.Run("reserve size", reserveSizeTest(db, 90, 2))
 
 	t.Run("first ten unreserved chunks should not be accessible", func(t *testing.T) {
 		for _, a := range addrs[:10] {
@@ -462,7 +451,7 @@ func TestDB_ReserveGC_EvictMaxPO(t *testing.T) {
 		Capacity: 100,
 		// once reaching 100 in the reserve, we will evict
 		// half the size of the cache from the reserve, so 50 chunks
-		ReserveCapacity: 100,
+		ReserveCapacity: 90,
 		UnreserveFunc:   unres,
 	})
 
@@ -471,11 +460,7 @@ func TestDB_ReserveGC_EvictMaxPO(t *testing.T) {
 	// put the first chunkCount chunks within radius
 	for i := 0; i < chunkCount; i++ {
 		ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(2, 3, 2, false)
-		_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
-		if err != nil {
-			t.Fatal(err)
-		}
-		err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
+		_, err := db.Put(context.Background(), storage.ModePutSync, ch)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -485,22 +470,22 @@ func TestDB_ReserveGC_EvictMaxPO(t *testing.T) {
 		mtx.Unlock()
 	}
 
-	// wait for the first eviction to finish, otherwise
-	// we collect some of the next chunks that get added
-	// which results in inconsistencies
-	evictTarget := db.reserveEvictionTarget()
+	t.Run("reserve size", reserveSizeTest(db, 100, 2))
+	err := db.SetReserveSize(uint64(100))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	var evicted uint64
 
 	for {
 		select {
-		case <-testHookEvictChan:
+		case c := <-testHookEvictChan:
+			evicted += c
 		case <-time.After(10 * time.Second):
 			t.Fatal("collect garbage timeout")
 		}
-		resSize, err := db.reserveSize.Get()
-		if err != nil {
-			t.Fatal(err)
-		}
-		if resSize == evictTarget {
+		if evicted == 10 {
 			break
 		}
 	}
@@ -516,11 +501,7 @@ func TestDB_ReserveGC_EvictMaxPO(t *testing.T) {
 
 	for i := 0; i < 90; i++ {
 		ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(2, 3, 2, false)
-		_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
-		if err != nil {
-			t.Fatal(err)
-		}
-		err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
+		_, err := db.Put(context.Background(), storage.ModePutSync, ch)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -529,17 +510,23 @@ func TestDB_ReserveGC_EvictMaxPO(t *testing.T) {
 		addrs = append(addrs, ch.Address())
 		mtx.Unlock()
 	}
+
+	t.Run("reserve size", reserveSizeTest(db, 180, 2))
+
+	err = db.SetReserveSize(uint64(180))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	evicted = 0
 	for {
 		select {
-		case <-testHookEvictChan:
+		case c := <-testHookEvictChan:
+			evicted += c
 		case <-time.After(10 * time.Second):
 			t.Fatal("collect garbage timeout")
 		}
-		resSize, err := db.reserveSize.Get()
-		if err != nil {
-			t.Fatal(err)
-		}
-		if resSize == evictTarget {
+		if evicted == 90 {
 			break
 		}
 	}
@@ -570,7 +557,7 @@ func TestDB_ReserveGC_EvictMaxPO(t *testing.T) {
 
 	t.Run("gc size", newIndexGCSizeTest(db))
 
-	t.Run("reserve size", reserveSizeTest(db, 90))
+	t.Run("reserve size", reserveSizeTest(db, 90, 0))
 
 	t.Run("first ten unreserved chunks should not be accessible", func(t *testing.T) {
 		for _, a := range addrs[:10] {
@@ -612,7 +599,7 @@ func TestReserveSize(t *testing.T) {
 		if err != nil {
 			t.Fatal(err)
 		}
-		t.Run("reserve size", reserveSizeTest(db, 10))
+		t.Run("reserve size", reserveSizeTest(db, 10, 0))
 	})
 
 	t.Run("variadic put upload then set sync", func(t *testing.T) {
@@ -633,13 +620,13 @@ func TestReserveSize(t *testing.T) {
 		if err != nil {
 			t.Fatal(err)
 		}
-		t.Run("reserve size", reserveSizeTest(db, 0))
+		t.Run("reserve size", reserveSizeTest(db, 0, 0))
 
 		err = db.Set(context.Background(), storage.ModeSetSync, addrs...)
 		if err != nil {
 			t.Fatal(err)
 		}
-		t.Run("reserve size", reserveSizeTest(db, 10))
+		t.Run("reserve size", reserveSizeTest(db, 0, 0))
 	})
 
 	t.Run("sequencial put sync", func(t *testing.T) {
@@ -656,10 +643,11 @@ func TestReserveSize(t *testing.T) {
 				t.Fatal(err)
 			}
 		}
-		t.Run("reserve size", reserveSizeTest(db, 10))
+		t.Run("reserve size", reserveSizeTest(db, 10, 0))
 	})
 
-	t.Run("sequencial put upload then set sync", func(t *testing.T) {
+	t.Run("sequencial put request", func(t *testing.T) {
+		t.Cleanup(setWithinRadiusFunc(func(*DB, shed.Item) bool { return true }))
 		var (
 			db = newTestDB(t, &Options{
 				Capacity: 100,
@@ -670,19 +658,12 @@ func TestReserveSize(t *testing.T) {
 		for i := 0; i < chunkCount; i++ {
 			ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(2, 3, 2, false)
 			chs = append(chs, ch)
-			_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
-			if err != nil {
-				t.Fatal(err)
-			}
-		}
-		t.Run("reserve size", reserveSizeTest(db, 0))
-		for _, ch := range chs {
-			err := db.Set(context.Background(), storage.ModeSetSync, ch.Address())
+			_, err := db.Put(context.Background(), storage.ModePutRequest, ch)
 			if err != nil {
 				t.Fatal(err)
 			}
 		}
-		t.Run("reserve size", reserveSizeTest(db, 10))
+		t.Run("reserve size", reserveSizeTest(db, 10, 0))
 	})
 }
 
@@ -710,7 +691,7 @@ func TestComputeReserveSize(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	t.Run("reserve size", reserveSizeTest(db, chunkCountPerPO*maxPO))
+	t.Run("reserve size", reserveSizeTest(db, chunkCountPerPO*maxPO, 0))
 
 	for po := 0; po < maxPO; po++ {
 		got, err := db.ComputeReserveSize(uint8(po))
@@ -763,7 +744,7 @@ func TestDB_ReserveGC_BatchedUnreserve(t *testing.T) {
 
 	db := newTestDB(t, &Options{
 		Capacity:        100,
-		ReserveCapacity: 100,
+		ReserveCapacity: 50,
 		UnreserveFunc:   unres,
 	})
 	closed = db.close
@@ -777,16 +758,19 @@ func TestDB_ReserveGC_BatchedUnreserve(t *testing.T) {
 
 	for i := 0; i < chunkCount; i++ {
 		ch := genChunk()
-		_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
-		if err != nil {
-			t.Fatal(err)
-		}
-		err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
+		_, err := db.Put(context.Background(), storage.ModePutSync, ch)
 		if err != nil {
 			t.Fatal(err)
 		}
 	}
 
+	t.Run("reserve size", reserveSizeTest(db, 100, 0))
+
+	err := db.SetReserveSize(100)
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	select {
 	case <-testHookEvictChan:
 	case <-time.After(10 * time.Second):
@@ -798,7 +782,7 @@ func TestDB_ReserveGC_BatchedUnreserve(t *testing.T) {
 		t.Fatal("gc timeout")
 	}
 
-	t.Run("reserve size", reserveSizeTest(db, 0))
+	t.Run("reserve size", reserveSizeTest(db, 0, 0))
 
 	t.Run("pull index count", newItemsCountTest(db.pullIndex, 0))
 
diff --git a/pkg/localstore/sampler_test.go b/pkg/localstore/sampler_test.go
index d090f05ada0..14fe07a0755 100644
--- a/pkg/localstore/sampler_test.go
+++ b/pkg/localstore/sampler_test.go
@@ -43,7 +43,7 @@ func TestReserveSampler(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	t.Run("reserve size", reserveSizeTest(db, chunkCountPerPO*maxPO))
+	t.Run("reserve size", reserveSizeTest(db, chunkCountPerPO*maxPO, 0))
 
 	var sample1 storage.Sample
 
@@ -80,7 +80,7 @@ func TestReserveSampler(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	t.Run("reserve size", reserveSizeTest(db, 2*chunkCountPerPO*maxPO))
+	t.Run("reserve size", reserveSizeTest(db, 2*chunkCountPerPO*maxPO, 0))
 
 	// Now we generate another sample with the older timestamp. This should give us
 	// the exact same sample, ensuring that none of the later chunks were considered.
diff --git a/pkg/localstore/subscription_pull_test.go b/pkg/localstore/subscription_pull_test.go
index e2428f11963..c682458125e 100644
--- a/pkg/localstore/subscription_pull_test.go
+++ b/pkg/localstore/subscription_pull_test.go
@@ -192,7 +192,7 @@ func TestDB_SubscribePull_since(t *testing.T) {
 		for i := 0; i < count; i++ {
 			ch := generateTestRandomChunk()
 
-			_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
+			_, err := db.Put(context.Background(), storage.ModePutSync, ch)
 			if err != nil {
 				t.Fatal(err)
 			}
@@ -270,7 +270,7 @@ func TestDB_SubscribePull_until(t *testing.T) {
 		for i := 0; i < count; i++ {
 			ch := generateTestRandomChunk()
 
-			_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
+			_, err := db.Put(context.Background(), storage.ModePutSync, ch)
 			if err != nil {
 				t.Fatal(err)
 			}
@@ -348,7 +348,7 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) {
 		for i := 0; i < count; i++ {
 			ch := generateTestRandomChunk()
 
-			_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
+			_, err := db.Put(context.Background(), storage.ModePutSync, ch)
 			if err != nil {
 				t.Fatal(err)
 			}
@@ -439,7 +439,7 @@ func TestDB_SubscribePull_rangeOnRemovedChunks(t *testing.T) {
 	for i := 0; i < chunkCount; i++ {
 		ch := generateTestRandomChunk()
 
-		_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
+		_, err := db.Put(context.Background(), storage.ModePutSync, ch)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -529,7 +529,7 @@ func uploadRandomChunksBin(t *testing.T, db *DB, addrs map[uint8][]swarm.Address
 	for i := 0; i < count; i++ {
 		ch := generateTestRandomChunk()
 
-		_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
+		_, err := db.Put(context.Background(), storage.ModePutSync, ch)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -627,7 +627,7 @@ func TestDB_LastPullSubscriptionBinID(t *testing.T) {
 		for i := 0; i < count; i++ {
 			ch := generateTestRandomChunk()
 
-			_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
+			_, err := db.Put(context.Background(), storage.ModePutSync, ch)
 			if err != nil {
 				t.Fatal(err)
 			}