fix: use withinRadiusFn for the tests #3629

Merged: 2 commits, Dec 7, 2022
27 changes: 17 additions & 10 deletions pkg/localstore/mode_put.go
@@ -139,6 +139,13 @@ func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk)
             return false, gcChangeNew + gcChange, err
         }
 
+        // if access index is present, fill it as it is required for GC operations
+        accessIdx, err := db.retrievalAccessIndex.Get(storedItem)
+        if err != nil && !errors.Is(err, leveldb.ErrNotFound) {
+            return false, 0, err
+        }
+        storedItem.AccessTimestamp = accessIdx.AccessTimestamp
+
         gcChange, err := putOp(storedItem, true)
         if err != nil {
             return false, 0, err
@@ -163,6 +170,9 @@ func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk)

     switch mode {
     case storage.ModePutRequest, storage.ModePutRequestPin, storage.ModePutRequestCache:
+        db.lock.Lock(lockKeyGC)
+        defer db.lock.Unlock(lockKeyGC)
+
         for i, ch := range chs {
             pin := mode == storage.ModePutRequestPin     // force pin in this mode
             cache := mode == storage.ModePutRequestCache // force cache
@@ -176,10 +186,10 @@ func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk)
             gcSizeChange += c
         }
 
-        db.lock.Lock(lockKeyGC)
-        defer db.lock.Unlock(lockKeyGC)
-
     case storage.ModePutUpload, storage.ModePutUploadPin:
+        db.lock.Lock(lockKeyUpload)
+        defer db.lock.Unlock(lockKeyUpload)
+
         for i, ch := range chs {
             pin := mode == storage.ModePutUploadPin
             exists, c, err := putChunk(ch, i, func(item shed.Item, exists bool) (int64, error) {
@@ -197,10 +207,10 @@ func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk)
             gcSizeChange += c
         }
 
-        db.lock.Lock(lockKeyUpload)
-        defer db.lock.Unlock(lockKeyUpload)
-
     case storage.ModePutSync:
+        db.lock.Lock(lockKeyGC)
+        defer db.lock.Unlock(lockKeyGC)
+
         for i, ch := range chs {
             exists, c, err := putChunk(ch, i, func(item shed.Item, exists bool) (int64, error) {
                 return db.putSync(batch, binIDs, item, exists)
@@ -217,9 +227,6 @@ func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk)
             gcSizeChange += c
         }
 
-        db.lock.Lock(lockKeyGC)
-        defer db.lock.Unlock(lockKeyGC)
-
     default:
         return nil, ErrInvalidMode
     }
@@ -461,7 +468,7 @@ func (db *DB) putSync(

     // if we try to add a new item at a lesser radius than the last known eviction
     // radius of the batch, we should not add the chunk to reserve, but to cache
-    if !withinRadius(db, item) {
+    if !withinRadiusFn(db, item) {
         return db.addToCache(batch, item)
     }

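Taken together, the mode_put.go hunks make two changes. First, put() now backfills AccessTimestamp from retrievalAccessIndex for chunks that already exist, treating leveldb.ErrNotFound as "no access record yet", since the field is required for GC operations per the new comment. Second, each db.lock.Lock(...)/defer db.lock.Unlock(...) pair moves from after the per-mode loop to before it, so the entire batch is processed under the lock instead of the lock only being taken once the indexes have already been touched. Only the Lock(key)/Unlock(key) call shape is visible in this diff; below is a minimal sketch of a keyed mutex of that shape, where the multex name and internals are assumptions, not bee's implementation.

package main

import "sync"

// multex is a hypothetical stand-in for db.lock: a mutex keyed by a
// name, so lockKeyGC and lockKeyUpload guard independent critical
// sections.
type multex struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

func newMultex() *multex {
	return &multex{locks: make(map[string]*sync.Mutex)}
}

// Lock acquires the mutex registered under key, creating it on first use.
func (m *multex) Lock(key string) {
	m.mu.Lock()
	l, ok := m.locks[key]
	if !ok {
		l = &sync.Mutex{}
		m.locks[key] = l
	}
	m.mu.Unlock()
	l.Lock()
}

// Unlock releases the mutex registered under key; it assumes Lock(key)
// was called first.
func (m *multex) Unlock(key string) {
	m.mu.Lock()
	l := m.locks[key]
	m.mu.Unlock()
	l.Unlock()
}

func main() {
	lock := newMultex()
	lock.Lock("gc")
	// ... process the whole batch under the "gc" key, as put() now does ...
	lock.Unlock("gc")
}

Since defer fires at function return, the old placement still guaranteed release; what it left outside the critical section was the loop itself.
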
2 changes: 1 addition & 1 deletion pkg/localstore/mode_put_test.go
@@ -187,7 +187,7 @@ func TestModePutRequestCache(t *testing.T) {

 // TestModePutSync validates ModePutSync index values on the provided DB.
 func TestModePutSync(t *testing.T) {
-	t.Cleanup(setWithinRadiusFunc(func(_ *DB, _ shed.Item) bool { return false }))
+	t.Cleanup(setWithinRadiusFunc(func(_ *DB, _ shed.Item) bool { return true }))
 	for _, tc := range multiChunkTestCases {
 		t.Run(tc.name, func(t *testing.T) {
 			db := newTestDB(t, nil)
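
The stub flip from false to true matters because of the putSync hunk above: with withinRadiusFn returning false, every chunk is diverted to the cache via addToCache, so the test would exercise the cache path rather than the reserve path whose ModePutSync index values it means to validate. setWithinRadiusFunc itself is not shown in this diff; what follows is a self-contained sketch of the assumed swap-and-restore pattern, where the DB and Item types are stand-ins for localstore's *DB and shed.Item.

package main

import "fmt"

// Stand-ins for localstore's *DB and shed.Item, only to make the
// pattern self-contained.
type DB struct{}
type Item struct{}

// withinRadius stands in for the real implementation.
func withinRadius(*DB, Item) bool { return true }

// withinRadiusFn is assumed to be a package-level variable initialized
// to the real implementation, so production code is unaffected.
var withinRadiusFn = withinRadius

// setWithinRadiusFunc swaps the hook and returns a restore function,
// which is why the tests can pass its result straight to t.Cleanup.
func setWithinRadiusFunc(f func(*DB, Item) bool) (restore func()) {
	current := withinRadiusFn
	withinRadiusFn = f
	return func() { withinRadiusFn = current }
}

func main() {
	restore := setWithinRadiusFunc(func(*DB, Item) bool { return false })
	fmt.Println(withinRadiusFn(&DB{}, Item{})) // false: stubbed
	restore()
	fmt.Println(withinRadiusFn(&DB{}, Item{})) // true: real implementation again
}
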
14 changes: 6 additions & 8 deletions pkg/localstore/mode_set.go
@@ -68,18 +68,20 @@ func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Address)
     switch mode {
 
     case storage.ModeSetSync:
+        db.lock.Lock(lockKeyGC)
+        defer db.lock.Unlock(lockKeyGC)
+
         for _, addr := range addrs {
             c, err := db.setSync(batch, addr)
             if err != nil {
                 return err
             }
             gcSizeChange += c
         }
-
+    case storage.ModeSetRemove:
         db.lock.Lock(lockKeyGC)
         defer db.lock.Unlock(lockKeyGC)
 
-    case storage.ModeSetRemove:
         for _, addr := range addrs {
             item := addressToItem(addr)
             storedItem, err := db.retrievalDataIndex.Get(item)
@@ -97,11 +99,10 @@ func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Address)
             committedLocations = append(committedLocations, l)
             gcSizeChange += c
         }
-
+    case storage.ModeSetPin:
         db.lock.Lock(lockKeyGC)
         defer db.lock.Unlock(lockKeyGC)
 
-    case storage.ModeSetPin:
         for _, addr := range addrs {
             item := addressToItem(addr)
             c, err := db.setPin(batch, item)
@@ -110,20 +111,17 @@ func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Address)
             }
             gcSizeChange += c
         }
+    case storage.ModeSetUnpin:
         db.lock.Lock(lockKeyGC)
         defer db.lock.Unlock(lockKeyGC)
 
-    case storage.ModeSetUnpin:
         for _, addr := range addrs {
             c, err := db.setUnpin(batch, addr)
             if err != nil {
                 return err
             }
             gcSizeChange += c
         }
-        db.lock.Lock(lockKeyGC)
-        defer db.lock.Unlock(lockKeyGC)
-
     default:
         return ErrInvalidMode
     }
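
Hoisting the Lock above each loop in set() is safe with respect to the deferred Unlock because defer registers against the enclosing function, not the case block: exactly one case runs per call, so exactly one Lock/Unlock pair is registered either way, and the unlock still fires only when set() returns. A self-contained illustration of that defer-in-switch behavior:

package main

import (
	"fmt"
	"sync"
)

var mu sync.Mutex

func doSwitch(mode int) {
	switch mode {
	case 0:
		mu.Lock()
		defer mu.Unlock() // runs when doSwitch returns, not at the end of the case
		fmt.Println("case 0 runs under the lock")
	case 1:
		mu.Lock()
		defer mu.Unlock()
		fmt.Println("case 1 runs under the lock")
	}
	// the lock is still held here; it is released as doSwitch returns
}

func main() {
	doSwitch(0)
	doSwitch(1) // would deadlock if case 0's deferred unlock had not run
}
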
37 changes: 36 additions & 1 deletion pkg/localstore/pin_test.go
@@ -11,6 +11,7 @@ import (

"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage"
"github.com/syndtr/goleveldb/leveldb"
)

func TestPinCounter(t *testing.T) {
@@ -195,7 +196,7 @@ func TestPinIndexesSync(t *testing.T) {

 func TestPinIndexesPutSync(t *testing.T) {
 	ctx := context.Background()
-	t.Cleanup(setWithinRadiusFunc(func(_ *DB, _ shed.Item) bool { return false }))
+	t.Cleanup(setWithinRadiusFunc(func(_ *DB, _ shed.Item) bool { return true }))
 
 	db := newTestDB(t, &Options{
 		Capacity: 150,
@@ -240,6 +241,40 @@
 	runCountsTest(t, "setUnPin 2", db, 1, 1, 0, 0, 0, 1)
 }
 
+func TestPinIndexesPutSyncOutOfDepth(t *testing.T) {
+	ctx := context.Background()
+	t.Cleanup(setWithinRadiusFunc(func(_ *DB, _ shed.Item) bool { return false }))
+
+	db := newTestDB(t, &Options{
+		Capacity: 150,
+	})
+
+	ch := generateTestRandomChunk()
+	// call unreserve on the batch with radius 0 so that
+	// localstore is aware of the batch and the chunk can
+	// be inserted into the database
+	unreserveChunkBatch(t, db, 0, ch)
+
+	addr := ch.Address()
+	_, err := db.Put(ctx, storage.ModePutSync, ch)
+	if err != nil {
+		t.Fatal(err)
+	}
+	runCountsTest(t, "putSync", db, 1, 1, 0, 0, 0, 1)
+
+	// duplicates should have no effect
+	_, err = db.Put(ctx, storage.ModePutSync, ch)
+	if err != nil {
+		t.Fatal(err)
+	}
+	runCountsTest(t, "putSync", db, 1, 1, 0, 0, 0, 1)
+
+	err = db.Set(ctx, storage.ModeSetUnpin, addr)
+	if !errors.Is(err, leveldb.ErrNotFound) {
+		t.Fatalf("expected not found error, got %v", err)
+	}
+}
+
 func TestPinIndexesPutRequest(t *testing.T) {
 	ctx := context.Background()
 	t.Cleanup(setWithinRadiusFunc(func(_ *DB, _ shed.Item) bool { return true }))
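
The new TestPinIndexesPutSyncOutOfDepth is the counterpart of TestPinIndexesPutSync: with the hook pinned to false, putSync takes the !withinRadiusFn branch shown in mode_put.go and routes the chunk to the cache instead of the reserve, so unpinning an address that was never pinned surfaces leveldb.ErrNotFound. For orientation, here is a hedged sketch of what a radius check of this kind typically computes in Swarm-style storage; the real withinRadius body is not part of this diff and withinRadiusSketch is purely illustrative.

package main

import "fmt"

// withinRadiusSketch is an illustrative assumption, not bee's code:
// a chunk counts as "within radius" when its proximity order (shared
// address-prefix bits with the node overlay) is at least the batch's
// current eviction radius.
func withinRadiusSketch(proximityOrder, evictionRadius uint8) bool {
	return proximityOrder >= evictionRadius
}

func main() {
	fmt.Println(withinRadiusSketch(3, 0)) // true: radius 0 admits everything
	fmt.Println(withinRadiusSketch(1, 4)) // false: too far out, goes to cache
}
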
3 changes: 3 additions & 0 deletions pkg/localstore/sampler_test.go
@@ -14,6 +14,7 @@ import (

"github.com/ethersphere/bee/pkg/postage"
postagetesting "github.com/ethersphere/bee/pkg/postage/testing"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/google/go-cmp/cmp"
@@ -112,6 +113,8 @@ func TestReserveSamplerStop(t *testing.T) {
 	startWait, waitChan := make(chan struct{}), make(chan struct{})
 	doneWaiting := false
 
+	t.Cleanup(setWithinRadiusFunc(func(*DB, shed.Item) bool { return true }))
+
 	testHookEvictionChan := make(chan uint64)
 	t.Cleanup(setTestHookEviction(func(count uint64) {
 		if count == 0 {
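
The sampler test gets the same hook, pinned to true, presumably so that the chunks the reserve sampler iterates actually land in the reserve rather than the cache. Stacking this t.Cleanup above the setTestHookEviction cleanup is safe because cleanups run in reverse registration order; a minimal illustration:

package main

import "testing"

// Minimal illustration of t.Cleanup ordering: cleanups run
// last-in-first-out, so stacked hook swaps restore cleanly.
func TestCleanupOrder(t *testing.T) {
	t.Cleanup(func() { t.Log("runs second") }) // registered first
	t.Cleanup(func() { t.Log("runs first") })  // registered last
}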