fix: localstore tests
Alok Nerurkar committed Nov 23, 2022
1 parent ad0c8be commit 5abc357
Showing 4 changed files with 84 additions and 100 deletions.
4 changes: 2 additions & 2 deletions pkg/localstore/localstore_test.go
@@ -496,11 +496,11 @@ func newIndexGCSizeTest(db *DB) func(t *testing.T) {

// reserveSizeTest checks that the reserveSize scalar is equal
// to the expected value.
func reserveSizeTest(db *DB, want uint64) func(t *testing.T) {
func reserveSizeTest(db *DB, want uint64, depth uint8) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()

got, err := db.reserveSize.Get()
got, err := db.ComputeReserveSize(depth)
if err != nil {
t.Fatal(err)
}
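For reference, the updated helper in full reads roughly as follows; the hunk above shows only the changed lines, and the trailing got/want comparison is assumed rather than taken from this diff:

// reserveSizeTest checks that the reserve size computed at the given
// depth is equal to the expected value.
func reserveSizeTest(db *DB, want uint64, depth uint8) func(t *testing.T) {
	return func(t *testing.T) {
		t.Helper()

		got, err := db.ComputeReserveSize(depth)
		if err != nil {
			t.Fatal(err)
		}
		if got != want {
			t.Errorf("got reserve size %v, want %v", got, want)
		}
	}
}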
164 changes: 74 additions & 90 deletions pkg/localstore/reserve_test.go
@@ -44,11 +44,7 @@ func TestDB_ReserveGC_AllOutOfRadius(t *testing.T) {

for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(5, 3, 2, false)
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
_, err := db.Put(context.Background(), storage.ModePutRequest, ch)
if err != nil {
t.Fatal(err)
}
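The same staging change repeats throughout this file: the old two-step ModePutUpload put followed by a ModeSetSync set is collapsed into a single put in request or sync mode. A minimal sketch of the pattern, written as a standalone helper purely for illustration (the tests inline it, and the helper name is hypothetical):

// storeReservedChunk stores a chunk so that it is accounted for in the
// reserve in a single call. Before this commit the tests performed a
// ModePutUpload put and then a ModeSetSync set on the chunk address.
func storeReservedChunk(t *testing.T, db *DB, mode storage.ModePut, ch swarm.Chunk) {
	t.Helper()

	// mode is storage.ModePutRequest or storage.ModePutSync, depending on
	// which code path the particular test exercises.
	if _, err := db.Put(context.Background(), mode, ch); err != nil {
		t.Fatal(err)
	}
}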
@@ -85,7 +81,7 @@ func TestDB_ReserveGC_AllOutOfRadius(t *testing.T) {

t.Run("gc size", newIndexGCSizeTest(db))

t.Run("reserve size", reserveSizeTest(db, 0))
t.Run("reserve size", reserveSizeTest(db, 0, 0))

// the first synced chunk should be removed
t.Run("get the first synced chunk", func(t *testing.T) {
@@ -163,11 +159,7 @@ func TestDB_ReserveGC_AllWithinRadius(t *testing.T) {

for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(2, 3, 2, false)
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
_, err := db.Put(context.Background(), storage.ModePutSync, ch)
if err != nil {
t.Fatal(err)
}
@@ -193,7 +185,7 @@ func TestDB_ReserveGC_AllWithinRadius(t *testing.T) {

t.Run("gc size", newIndexGCSizeTest(db))

t.Run("reserve size", reserveSizeTest(db, 150))
t.Run("reserve size", reserveSizeTest(db, 150, 2))

t.Run("all chunks should be accessible", func(t *testing.T) {
for _, a := range addrs {
@@ -270,7 +262,7 @@ func TestDB_ReserveGC_Unreserve(t *testing.T) {
Capacity: 100,
// once reaching 150 in the reserve, we will evict
// half the size of the cache from the reserve, so 50 chunks
ReserveCapacity: 100,
ReserveCapacity: 90,
UnreserveFunc: unres,
})
closed = db.close
@@ -280,11 +272,7 @@ func TestDB_ReserveGC_Unreserve(t *testing.T) {
// the cache. gc of the cache is still not triggered
for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(2, 3, 2, false)
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
_, err := db.Put(context.Background(), storage.ModePutSync, ch)
if err != nil {
t.Fatal(err)
}
@@ -294,34 +282,30 @@ func TestDB_ReserveGC_Unreserve(t *testing.T) {
mtx.Unlock()
}

// wait for the first eviction to finish, otherwise
// we collect some of the next chunks that get added
// which results in inconsistencies
evictTarget := db.reserveEvictionTarget()
t.Run("reserve size", reserveSizeTest(db, uint64(chunkCount), 2))

err := db.SetReserveSize(uint64(chunkCount))
if err != nil {
t.Fatal(err)
}

var evicted uint64
for {
select {
case <-testHookEvictChan:
case c := <-testHookEvictChan:
evicted += c
case <-time.After(10 * time.Second):
t.Fatal("collect garbage timeout")
}
resSize, err := db.reserveSize.Get()
if err != nil {
t.Fatal(err)
}
if resSize == evictTarget {
if evicted == 10 {
break
}
}

// insert another 90, this will trigger gc
for i := 0; i < 90; i++ {
ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(2, 3, 2, false)
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
_, err := db.Put(context.Background(), storage.ModePutSync, ch)
if err != nil {
t.Fatal(err)
}
@@ -331,17 +315,22 @@ func TestDB_ReserveGC_Unreserve(t *testing.T) {
mtx.Unlock()
}

t.Run("reserve size", reserveSizeTest(db, 180, 2))

err = db.SetReserveSize(uint64(180))
if err != nil {
t.Fatal(err)
}

evicted = 0
for {
select {
case <-testHookEvictChan:
case c := <-testHookEvictChan:
evicted += c
case <-time.After(10 * time.Second):
t.Fatal("collect garbage timeout")
}
resSize, err := db.reserveSize.Get()
if err != nil {
t.Fatal(err)
}
if resSize == evictTarget {
if evicted == 90 {
break
}
}
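Instead of polling db.reserveSize against a precomputed eviction target, the GC tests now set the reserve size explicitly via SetReserveSize and sum the per-batch counts reported on the eviction test hook channel until the expected number of chunks has been evicted. A condensed sketch of the pattern (the helper name and channel parameter are illustrative; the tests inline this loop and compare for equality):

// evictAndWait shrinks the reserve to size and blocks until at least
// want chunks have been reported evicted on the test hook channel.
func evictAndWait(t *testing.T, db *DB, size, want uint64, evictChan <-chan uint64) {
	t.Helper()

	if err := db.SetReserveSize(size); err != nil {
		t.Fatal(err)
	}

	var evicted uint64
	for evicted < want {
		select {
		case c := <-evictChan:
			evicted += c
		case <-time.After(10 * time.Second):
			t.Fatal("collect garbage timeout")
		}
	}
}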
@@ -375,7 +364,7 @@ func TestDB_ReserveGC_Unreserve(t *testing.T) {

t.Run("gc size", newIndexGCSizeTest(db))

t.Run("reserve size", reserveSizeTest(db, 90))
t.Run("reserve size", reserveSizeTest(db, 90, 2))

t.Run("first ten unreserved chunks should not be accessible", func(t *testing.T) {
for _, a := range addrs[:10] {
@@ -462,7 +451,7 @@ func TestDB_ReserveGC_EvictMaxPO(t *testing.T) {
Capacity: 100,
// once reaching 100 in the reserve, we will evict
// half the size of the cache from the reserve, so 50 chunks
ReserveCapacity: 100,
ReserveCapacity: 90,
UnreserveFunc: unres,
})

@@ -471,11 +460,7 @@ func TestDB_ReserveGC_EvictMaxPO(t *testing.T) {
// put the first chunkCount chunks within radius
for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(2, 3, 2, false)
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
_, err := db.Put(context.Background(), storage.ModePutSync, ch)
if err != nil {
t.Fatal(err)
}
@@ -485,22 +470,22 @@ func TestDB_ReserveGC_EvictMaxPO(t *testing.T) {
mtx.Unlock()
}

// wait for the first eviction to finish, otherwise
// we collect some of the next chunks that get added
// which results in inconsistencies
evictTarget := db.reserveEvictionTarget()
t.Run("reserve size", reserveSizeTest(db, 100, 2))

err := db.SetReserveSize(uint64(100))
if err != nil {
t.Fatal(err)
}

var evicted uint64
for {
select {
case <-testHookEvictChan:
case c := <-testHookEvictChan:
evicted += c
case <-time.After(10 * time.Second):
t.Fatal("collect garbage timeout")
}
resSize, err := db.reserveSize.Get()
if err != nil {
t.Fatal(err)
}
if resSize == evictTarget {
if evicted == 10 {
break
}
}
@@ -516,11 +501,7 @@ func TestDB_ReserveGC_EvictMaxPO(t *testing.T) {

for i := 0; i < 90; i++ {
ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(2, 3, 2, false)
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
_, err := db.Put(context.Background(), storage.ModePutSync, ch)
if err != nil {
t.Fatal(err)
}
@@ -529,17 +510,23 @@ func TestDB_ReserveGC_EvictMaxPO(t *testing.T) {
addrs = append(addrs, ch.Address())
mtx.Unlock()
}

t.Run("reserve size", reserveSizeTest(db, 180, 2))

err = db.SetReserveSize(uint64(180))
if err != nil {
t.Fatal(err)
}

evicted = 0
for {
select {
case <-testHookEvictChan:
case c := <-testHookEvictChan:
evicted += c
case <-time.After(10 * time.Second):
t.Fatal("collect garbage timeout")
}
resSize, err := db.reserveSize.Get()
if err != nil {
t.Fatal(err)
}
if resSize == evictTarget {
if evicted == 90 {
break
}
}
@@ -570,7 +557,7 @@ func TestDB_ReserveGC_EvictMaxPO(t *testing.T) {

t.Run("gc size", newIndexGCSizeTest(db))

t.Run("reserve size", reserveSizeTest(db, 90))
t.Run("reserve size", reserveSizeTest(db, 90, 0))

t.Run("first ten unreserved chunks should not be accessible", func(t *testing.T) {
for _, a := range addrs[:10] {
@@ -612,7 +599,7 @@ func TestReserveSize(t *testing.T) {
if err != nil {
t.Fatal(err)
}
t.Run("reserve size", reserveSizeTest(db, 10))
t.Run("reserve size", reserveSizeTest(db, 10, 0))
})

t.Run("variadic put upload then set sync", func(t *testing.T) {
@@ -633,13 +620,13 @@ func TestReserveSize(t *testing.T) {
if err != nil {
t.Fatal(err)
}
t.Run("reserve size", reserveSizeTest(db, 0))
t.Run("reserve size", reserveSizeTest(db, 0, 0))

err = db.Set(context.Background(), storage.ModeSetSync, addrs...)
if err != nil {
t.Fatal(err)
}
t.Run("reserve size", reserveSizeTest(db, 10))
t.Run("reserve size", reserveSizeTest(db, 0, 0))
})

t.Run("sequencial put sync", func(t *testing.T) {
@@ -656,10 +643,11 @@ func TestReserveSize(t *testing.T) {
t.Fatal(err)
}
}
t.Run("reserve size", reserveSizeTest(db, 10))
t.Run("reserve size", reserveSizeTest(db, 10, 0))
})

t.Run("sequencial put upload then set sync", func(t *testing.T) {
t.Run("sequencial put request", func(t *testing.T) {
t.Cleanup(setWithinRadiusFunc(func(*DB, shed.Item) bool { return true }))
var (
db = newTestDB(t, &Options{
Capacity: 100,
@@ -670,19 +658,12 @@ func TestReserveSize(t *testing.T) {
for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(2, 3, 2, false)
chs = append(chs, ch)
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
}
t.Run("reserve size", reserveSizeTest(db, 0))
for _, ch := range chs {
err := db.Set(context.Background(), storage.ModeSetSync, ch.Address())
_, err := db.Put(context.Background(), storage.ModePutRequest, ch)
if err != nil {
t.Fatal(err)
}
}
t.Run("reserve size", reserveSizeTest(db, 10))
t.Run("reserve size", reserveSizeTest(db, 10, 0))
})
}
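The new "sequencial put request" sub-test relies on the within-radius check being overridden for its duration: setWithinRadiusFunc swaps the package-level hook and, judging by the t.Cleanup call, returns a function that restores the previous value. A hedged sketch of what such a test-hook setter typically looks like; the hook variable name and its default are assumptions, not taken from this diff:

// withinRadiusFn is assumed to be the package-level hook consulted when
// deciding whether an item falls within the node's storage radius.
var withinRadiusFn func(*DB, shed.Item) bool

// setWithinRadiusFunc swaps the hook and returns a restore function that a
// test can register with t.Cleanup.
func setWithinRadiusFunc(f func(*DB, shed.Item) bool) func() {
	prev := withinRadiusFn
	withinRadiusFn = f
	return func() { withinRadiusFn = prev }
}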

@@ -710,7 +691,7 @@ func TestComputeReserveSize(t *testing.T) {
t.Fatal(err)
}

t.Run("reserve size", reserveSizeTest(db, chunkCountPerPO*maxPO))
t.Run("reserve size", reserveSizeTest(db, chunkCountPerPO*maxPO, 0))

for po := 0; po < maxPO; po++ {
got, err := db.ComputeReserveSize(uint8(po))
@@ -763,7 +744,7 @@ func TestDB_ReserveGC_BatchedUnreserve(t *testing.T) {

db := newTestDB(t, &Options{
Capacity: 100,
ReserveCapacity: 100,
ReserveCapacity: 50,
UnreserveFunc: unres,
})
closed = db.close
@@ -777,16 +758,19 @@ func TestDB_ReserveGC_BatchedUnreserve(t *testing.T) {

for i := 0; i < chunkCount; i++ {
ch := genChunk()
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), storage.ModeSetSync, ch.Address())
_, err := db.Put(context.Background(), storage.ModePutSync, ch)
if err != nil {
t.Fatal(err)
}
}

t.Run("reserve size", reserveSizeTest(db, 0, 100))

err := db.SetReserveSize(100)
if err != nil {
t.Fatal(err)
}

select {
case <-testHookEvictChan:
case <-time.After(10 * time.Second):
Expand All @@ -798,7 +782,7 @@ func TestDB_ReserveGC_BatchedUnreserve(t *testing.T) {
t.Fatal("gc timeout")
}

t.Run("reserve size", reserveSizeTest(db, 0))
t.Run("reserve size", reserveSizeTest(db, 0, 0))

t.Run("pull index count", newItemsCountTest(db.pullIndex, 0))

Expand Down

0 comments on commit 5abc357

Please sign in to comment.