From 3ed7f980080caf2c0f34125a049911d7df936ef6 Mon Sep 17 00:00:00 2001 From: Alok Nerurkar Date: Fri, 2 Dec 2022 04:06:07 +0530 Subject: [PATCH 01/13] fix: guard sampling using lock, add more metrics --- pkg/localstore/gc.go | 2 +- pkg/localstore/localstore.go | 9 +++------ pkg/localstore/metrics.go | 29 +++++++++++++++++++++++++++++ pkg/localstore/reserve.go | 23 ++++++++++++++++++++++- pkg/localstore/sampler.go | 17 +++++++++++++++-- pkg/node/node.go | 8 ++------ 6 files changed, 72 insertions(+), 16 deletions(-) diff --git a/pkg/localstore/gc.go b/pkg/localstore/gc.go index e3249464a58..bcd3a1e58b7 100644 --- a/pkg/localstore/gc.go +++ b/pkg/localstore/gc.go @@ -361,7 +361,7 @@ func (db *DB) evictReserve() (totalEvicted uint64, done bool, err error) { totalCallbacks := 0 err = db.unreserveFunc(func(batchID []byte, radius uint8) (bool, error) { totalCallbacks++ - e, err := db.UnreserveBatch(batchID, radius) + e, err := db.unreserveBatch(batchID, radius) if err != nil { return true, err } diff --git a/pkg/localstore/localstore.go b/pkg/localstore/localstore.go index 0887e0d7fa8..b471ca24457 100644 --- a/pkg/localstore/localstore.go +++ b/pkg/localstore/localstore.go @@ -184,12 +184,9 @@ type DB struct { // underlaying leveldb to prevent possible panics from // iterators subscriptionsWG sync.WaitGroup - - metrics metrics - - logger log.Logger - - validStamp postage.ValidStampFn + metrics metrics + logger log.Logger + validStamp postage.ValidStampFn } // Options struct holds optional parameters for configuring DB. diff --git a/pkg/localstore/metrics.go b/pkg/localstore/metrics.go index 12056c79062..879f2c58c48 100644 --- a/pkg/localstore/metrics.go +++ b/pkg/localstore/metrics.go @@ -66,6 +66,11 @@ type metrics struct { EvictReserveErrorCounter prometheus.Counter EvictReserveCollectedCounter prometheus.Counter TotalTimeEvictReserve prometheus.Counter + + BatchEvictCounter prometheus.Counter + BatchEvictErrorCounter prometheus.Counter + BatchEvictCollectedCounter prometheus.Counter + TotalTimeBatchEvict prometheus.Counter } func newMetrics() metrics { @@ -386,6 +391,30 @@ func newMetrics() metrics { Name: "evict_reserve_total_time", Help: "total time spent evicting from reserve", }), + BatchEvictCounter: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "batch_evict_count", + Help: "number of times the evict batch was invoked", + }), + BatchEvictErrorCounter: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "batch_evict_err_count", + Help: "number of times evict batch got an error", + }), + BatchEvictCollectedCounter: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "batch_evict_collected_count", + Help: "number of chunks that have been evicted for the batch expirations", + }), + TotalTimeBatchEvict: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "batch_evict_total_time", + Help: "total time spent evicting batches", + }), } } diff --git a/pkg/localstore/reserve.go b/pkg/localstore/reserve.go index 2eaee6537df..d4deddecc55 100644 --- a/pkg/localstore/reserve.go +++ b/pkg/localstore/reserve.go @@ -8,16 +8,37 @@ import ( "encoding/hex" "errors" "fmt" + "time" "github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/swarm" "github.com/syndtr/goleveldb/leveldb" ) +func (db *DB) EvictBatch(id []byte) error { + db.metrics.BatchEvictCounter.Inc() + defer func(start 
time.Time) { + totalTimeMetric(db.metrics.TotalTimeBatchEvict, start) + }(time.Now()) + + db.batchMu.Lock() + defer db.batchMu.Unlock() + + evicted, err := db.unreserveBatch(id, swarm.MaxBins) + if err != nil { + db.metrics.BatchEvictErrorCounter.Inc() + return fmt.Errorf("failed evict batch: %w", err) + } + + db.metrics.BatchEvictCollectedCounter.Add(float64(evicted)) + db.logger.Debug("evict batch", "batch_id", swarm.NewAddress(id), "evicted_count", evicted) + return nil +} + // UnreserveBatch atomically unpins chunks of a batch in proximity order upto and including po. // Unpinning will result in all chunks with pincounter 0 to be put in the gc index // so if a chunk was only pinned by the reserve, unreserving it will make it gc-able. -func (db *DB) UnreserveBatch(id []byte, radius uint8) (evicted uint64, err error) { +func (db *DB) unreserveBatch(id []byte, radius uint8) (evicted uint64, err error) { var ( item = shed.Item{ BatchID: id, diff --git a/pkg/localstore/sampler.go b/pkg/localstore/sampler.go index d10164c36a8..5b5bb40342d 100644 --- a/pkg/localstore/sampler.go +++ b/pkg/localstore/sampler.go @@ -36,6 +36,7 @@ type sampleStat struct { GetDuration atomic.Int64 HmacrDuration atomic.Int64 ValidStampDuration atomic.Int64 + TimeToLock atomic.Int64 } type sampleEntry struct { @@ -48,7 +49,8 @@ func (s sampleStat) String() string { seconds := int64(time.Second) return fmt.Sprintf( - "Chunks: %d NotFound: %d New Ignored: %d Iteration Duration: %d secs GetDuration: %d secs HmacrDuration: %d ValidStampDuration: %d", + "Chunks: %d NotFound: %d New Ignored: %d Iteration Duration: %d secs GetDuration: %d secs"+ + " HmacrDuration: %d secs ValidStampDuration: %d secs TimeToLock: %d secs", s.TotalIterated.Load(), s.NotFound.Load(), s.NewIgnored.Load(), @@ -56,6 +58,7 @@ func (s sampleStat) String() string { s.GetDuration.Load()/seconds, s.HmacrDuration.Load()/seconds, s.ValidStampDuration.Load()/seconds, + s.TimeToLock.Load()/seconds, ) } @@ -84,6 +87,11 @@ func (db *DB) ReserveSample( t := time.Now() + // protect the DB from any updates till we finish creating the sample + db.batchMu.Lock() + defer db.batchMu.Unlock() + stat.TimeToLock.Add(time.Since(t).Nanoseconds()) + // Phase 1: Iterate chunk addresses g.Go(func() error { defer close(addrChan) @@ -198,7 +206,12 @@ func (db *DB) ReserveSample( chunk := swarm.NewChunk(swarm.NewAddress(item.chunkItem.Address), item.chunkItem.Data) - stamp := postage.NewStamp(item.chunkItem.BatchID, item.chunkItem.Index, item.chunkItem.Timestamp, item.chunkItem.Sig) + stamp := postage.NewStamp( + item.chunkItem.BatchID, + item.chunkItem.Index, + item.chunkItem.Timestamp, + item.chunkItem.Sig, + ) stampData, err := stamp.MarshalBinary() if err != nil { diff --git a/pkg/node/node.go b/pkg/node/node.go index 2f40de890e3..2d690648a3b 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -274,13 +274,9 @@ func NewBee(interrupt chan struct{}, sysInterrupt chan os.Signal, addr string, p logger.Info("using network id", "network_id", networkID) var batchStore postage.Storer = new(postage.NoOpBatchStore) - var unreserveFn func([]byte, uint8) (uint64, error) + var evictFn func([]byte) error if chainEnabled { - var evictFn = func(b []byte) error { - _, err := unreserveFn(b, swarm.MaxBins) - return err - } batchStore, err = batchstore.New(stateStore, evictFn, logger) if err != nil { return nil, fmt.Errorf("batchstore: %w", err) @@ -665,7 +661,7 @@ func NewBee(interrupt chan struct{}, sysInterrupt chan os.Signal, addr string, p return nil, fmt.Errorf("localstore: %w", 
err) } b.localstoreCloser = storer - unreserveFn = storer.UnreserveBatch + evictFn = storer.EvictBatch post, err := postage.NewService(stateStore, batchStore, chainID) if err != nil { From e0ddfb31896de20c43a86ba2a4a3900b2cbb9291 Mon Sep 17 00:00:00 2001 From: Alok Nerurkar Date: Fri, 2 Dec 2022 04:14:33 +0530 Subject: [PATCH 02/13] fix: tests --- pkg/localstore/gc_test.go | 2 +- pkg/localstore/mode_set_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/localstore/gc_test.go b/pkg/localstore/gc_test.go index 7ee476eed65..b7b9b2e6d5d 100644 --- a/pkg/localstore/gc_test.go +++ b/pkg/localstore/gc_test.go @@ -972,7 +972,7 @@ func setTestHookGCIteratorDone(h func()) (reset func()) { func unreserveChunkBatch(t *testing.T, db *DB, radius uint8, chs ...swarm.Chunk) { t.Helper() for _, ch := range chs { - _, err := db.UnreserveBatch(ch.Stamp().BatchID(), radius) + _, err := db.unreserveBatch(ch.Stamp().BatchID(), radius) if err != nil { t.Fatal(err) } diff --git a/pkg/localstore/mode_set_test.go b/pkg/localstore/mode_set_test.go index d01c543ee07..c0e25fae0d4 100644 --- a/pkg/localstore/mode_set_test.go +++ b/pkg/localstore/mode_set_test.go @@ -71,7 +71,7 @@ func TestModeSetRemove_WithSync(t *testing.T) { var chs []swarm.Chunk for i := 0; i < tc.count; i++ { ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(2, 3, 2, false) - _, err := db.UnreserveBatch(ch.Stamp().BatchID(), 2) + _, err := db.unreserveBatch(ch.Stamp().BatchID(), 2) if err != nil { t.Fatal(err) } From 4c1ef65ca8ff556cc37e278dc017ef2d17c34b0c Mon Sep 17 00:00:00 2001 From: alok Date: Fri, 2 Dec 2022 13:33:11 +0530 Subject: [PATCH 03/13] chore: add test for new func --- pkg/localstore/reserve_test.go | 78 ++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/pkg/localstore/reserve_test.go b/pkg/localstore/reserve_test.go index a5c6bc955e2..05fbc34a654 100644 --- a/pkg/localstore/reserve_test.go +++ b/pkg/localstore/reserve_test.go @@ -7,6 +7,7 @@ package localstore import ( "context" "errors" + "fmt" "sync" "testing" "time" @@ -769,3 +770,80 @@ func TestDB_ReserveGC_BatchedUnreserve(t *testing.T) { t.Run("gc size", newIndexGCSizeTest(db)) } + +func TestDB_ReserveGC_EvictBatch(t *testing.T) { + chunkCount := 100 + + var closed chan struct{} + testHookCollectGarbageChan := make(chan uint64) + t.Cleanup(setTestHookCollectGarbage(func(collectedCount uint64) { + if collectedCount == 0 { + return + } + select { + case testHookCollectGarbageChan <- collectedCount: + case <-closed: + } + })) + + stamp := postagetesting.MustNewStamp() + + db := newTestDB(t, &Options{ + Capacity: 100, + ReserveCapacity: 100, + }) + closed = db.close + + // generate chunks with the same batch and depth to trigger larger eviction + genChunk := func() swarm.Chunk { + newStamp := postagetesting.MustNewBatchStamp(stamp.BatchID()) + ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2) + return ch.WithBatch(2, 3, 2, false).WithStamp(newStamp) + } + + for i := 0; i < chunkCount; i++ { + ch := genChunk() + _, err := db.Put(context.Background(), storage.ModePutSync, ch) + if err != nil { + t.Fatal(err) + } + } + + t.Run("reserve size", reserveSizeTest(db, 100, 0)) + + err := db.EvictBatch(stamp.BatchID()) + if err != nil { + t.Fatal(err) + } + + t.Run("reserve size", reserveSizeTest(db, 0, 0)) + + gcTarget := db.gcTarget() + + for { + select { + case <-testHookCollectGarbageChan: + case <-time.After(10 * time.Second): + t.Fatal("gc timeout") + } + + gcSize, err := 
db.gcSize.Get() + if err != nil { + t.Fatal(err) + } + if gcSize == gcTarget { + break + } + fmt.Println(gcSize) + } + + t.Run("postage chunks index count", newItemsCountTest(db.postageChunksIndex, 90)) + + // batch is evicted + t.Run("postage radius index count", newItemsCountTest(db.postageRadiusIndex, 0)) + + // all chunks would land into the gcIndex + t.Run("gc index count", newItemsCountTest(db.gcIndex, 90)) + + t.Run("gc size", newIndexGCSizeTest(db)) +} From a1e9970d8c9ae8a69332f922bc0227cd80422e45 Mon Sep 17 00:00:00 2001 From: alok Date: Fri, 2 Dec 2022 13:46:08 +0530 Subject: [PATCH 04/13] fix: remove debug print --- pkg/localstore/reserve_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/localstore/reserve_test.go b/pkg/localstore/reserve_test.go index 05fbc34a654..9276b2c7557 100644 --- a/pkg/localstore/reserve_test.go +++ b/pkg/localstore/reserve_test.go @@ -7,7 +7,6 @@ package localstore import ( "context" "errors" - "fmt" "sync" "testing" "time" @@ -834,7 +833,6 @@ func TestDB_ReserveGC_EvictBatch(t *testing.T) { if gcSize == gcTarget { break } - fmt.Println(gcSize) } t.Run("postage chunks index count", newItemsCountTest(db.postageChunksIndex, 90)) From 1942d3566b1be130a9e6b209537e1cc5f5ad99ca Mon Sep 17 00:00:00 2001 From: alok Date: Fri, 2 Dec 2022 15:28:08 +0530 Subject: [PATCH 05/13] fix: lint --- pkg/node/node.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/node/node.go b/pkg/node/node.go index 2d690648a3b..abf1499a37e 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -277,7 +277,13 @@ func NewBee(interrupt chan struct{}, sysInterrupt chan os.Signal, addr string, p var evictFn func([]byte) error if chainEnabled { - batchStore, err = batchstore.New(stateStore, evictFn, logger) + batchStore, err = batchstore.New( + stateStore, + func(id []byte) error { + return evictFn(id) + }, + logger, + ) if err != nil { return nil, fmt.Errorf("batchstore: %w", err) } From 9a12a659daf3716088cf6f57cb8c16569256ed2d Mon Sep 17 00:00:00 2001 From: alok Date: Mon, 5 Dec 2022 14:12:07 +0530 Subject: [PATCH 06/13] feat: relax global locking --- go.mod | 1 + go.sum | 2 ++ pkg/localstore/gc.go | 16 ++++++++-------- pkg/localstore/localstore.go | 2 +- pkg/localstore/mode_get.go | 5 +++-- pkg/localstore/mode_put.go | 14 +++++++++++--- pkg/localstore/mode_set.go | 16 ++++++++++++++-- pkg/localstore/reserve.go | 4 ++-- pkg/localstore/sampler.go | 11 ++++++----- pkg/localstore/store_lock.go | 34 ++++++++++++++++++++++++++++++++++ 10 files changed, 82 insertions(+), 23 deletions(-) create mode 100644 pkg/localstore/store_lock.go diff --git a/go.mod b/go.mod index 1d1808c3938..2b441ca1845 100644 --- a/go.mod +++ b/go.mod @@ -177,4 +177,5 @@ require ( gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect + resenje.org/multex v0.1.0 // indirect ) diff --git a/go.sum b/go.sum index 1b07e1a65c2..71640a1206b 100644 --- a/go.sum +++ b/go.sum @@ -1722,6 +1722,8 @@ resenje.org/email v0.1.3/go.mod h1:OhAVLRG3vqd9NSgayN3pAgzxTmc2B6mAefgShZvEgf0= resenje.org/jsonhttp v0.2.0/go.mod h1:EDyeguyTWj2fU3D3SCE0qNTgthzyEkHYLM1uu0uikHU= resenje.org/logging v0.1.5/go.mod h1:1IdoCm3+UwYfsplxDGV2pHCkUrLlQzlWwp4r28XfPx4= resenje.org/marshal v0.1.1/go.mod h1:P7Cla6Ju5CFvW4Y8JbRgWX1Hcy4L1w4qcCsyadO7G94= +resenje.org/multex v0.1.0 h1:am9Ndt8dIAeGVaztD8ClsSX+e0EP3mj6UdsvjukKZig= +resenje.org/multex v0.1.0/go.mod 
h1:3rHOoMrzqLNzgGWPcl/1GfzN52g7iaPXhbvTQ8TjGaM= resenje.org/recovery v0.1.1/go.mod h1:3S6aCVKMJEWsSAb61oZTteaiqkIfQPTr1RdiWnRbhME= resenje.org/singleflight v0.2.0 h1:nJ17VAZunMiFrfrltQ4Qs4r9MIP1pZC8u+0iSUTNnvQ= resenje.org/singleflight v0.2.0/go.mod h1:plheHgw2rd77IH3J6aN0Lu2JvMvHXoLknDwb6vN0dsE= diff --git a/pkg/localstore/gc.go b/pkg/localstore/gc.go index bcd3a1e58b7..c9d74cf7690 100644 --- a/pkg/localstore/gc.go +++ b/pkg/localstore/gc.go @@ -94,15 +94,15 @@ func (db *DB) collectGarbage() (evicted uint64, done bool, err error) { target := db.gcTarget() // tell the localstore to start logging dirty addresses - db.batchMu.Lock() + db.lock.Lock(GCLock) db.gcRunning = true - db.batchMu.Unlock() + db.lock.Unlock(GCLock) defer func() { - db.batchMu.Lock() + db.lock.Lock(GCLock) db.gcRunning = false db.dirtyAddresses = nil - db.batchMu.Unlock() + db.lock.Unlock(GCLock) }() gcSize, err := db.gcSize.Get() @@ -142,9 +142,9 @@ func (db *DB) collectGarbage() (evicted uint64, done bool, err error) { } // protect database from changing idexes and gcSize - db.batchMu.Lock() + db.lock.Lock(GCLock) defer totalTimeMetric(db.metrics.TotalTimeGCLock, time.Now()) - defer db.batchMu.Unlock() + defer db.lock.Unlock(GCLock) // refresh gcSize value, since it might have // changed in the meanwhile @@ -340,8 +340,8 @@ func (db *DB) evictReserve() (totalEvicted uint64, done bool, err error) { totalTimeMetric(db.metrics.TotalTimeEvictReserve, start) }(time.Now()) - db.batchMu.Lock() - defer db.batchMu.Unlock() + db.lock.Lock(ReserveLock) + defer db.lock.Unlock(ReserveLock) target = db.reserveCapacity diff --git a/pkg/localstore/localstore.go b/pkg/localstore/localstore.go index b471ca24457..8cd2d4508f3 100644 --- a/pkg/localstore/localstore.go +++ b/pkg/localstore/localstore.go @@ -154,7 +154,7 @@ type DB struct { // baseKey is the overlay address baseKey []byte - batchMu sync.Mutex + lock storeLock // gcRunning is true while GC is running. it is // used to avoid touching dirty gc index entries diff --git a/pkg/localstore/mode_get.go b/pkg/localstore/mode_get.go index 69b23481631..a3b067dcff1 100644 --- a/pkg/localstore/mode_get.go +++ b/pkg/localstore/mode_get.go @@ -129,8 +129,9 @@ func (db *DB) updateGCItems(items ...shed.Item) { // only Address and Data fields with non zero values, // which is ensured by the get function. func (db *DB) updateGC(item shed.Item) (err error) { - db.batchMu.Lock() - defer db.batchMu.Unlock() + db.lock.Lock(GCLock) + defer db.lock.Unlock(GCLock) + if db.gcRunning { db.dirtyAddresses = append(db.dirtyAddresses, swarm.NewAddress(item.Address)) } diff --git a/pkg/localstore/mode_put.go b/pkg/localstore/mode_put.go index 14334c6d474..ba256b2a2a7 100644 --- a/pkg/localstore/mode_put.go +++ b/pkg/localstore/mode_put.go @@ -68,13 +68,13 @@ func (r *releaseLocations) add(loc sharky.Location) { // in multiple put method calls. 
func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, retErr error) { // protect parallel updates - db.batchMu.Lock() - defer db.batchMu.Unlock() + db.lock.Lock(GCLock) if db.gcRunning { for _, ch := range chs { db.dirtyAddresses = append(db.dirtyAddresses, ch.Address()) } } + db.lock.Unlock(GCLock) batch := new(leveldb.Batch) @@ -163,6 +163,9 @@ func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) switch mode { case storage.ModePutRequest, storage.ModePutRequestPin, storage.ModePutRequestCache: + db.lock.Lock(Write) + defer db.lock.Unlock(Write) + for i, ch := range chs { pin := mode == storage.ModePutRequestPin // force pin in this mode cache := mode == storage.ModePutRequestCache // force cache @@ -177,6 +180,9 @@ func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) } case storage.ModePutUpload, storage.ModePutUploadPin: + db.lock.Lock(UploadLock) + defer db.lock.Unlock(UploadLock) + for i, ch := range chs { pin := mode == storage.ModePutUploadPin exists, c, err := putChunk(ch, i, func(item shed.Item, exists bool) (int64, error) { @@ -189,13 +195,15 @@ func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) if !exists { // chunk is new so, trigger subscription feeds // after the batch is successfully written - triggerPullFeed[db.po(ch.Address())] = struct{}{} triggerPushFeed = true } gcSizeChange += c } case storage.ModePutSync: + db.lock.Lock(Write) + defer db.lock.Unlock(Write) + for i, ch := range chs { exists, c, err := putChunk(ch, i, func(item shed.Item, exists bool) (int64, error) { return db.putSync(batch, binIDs, item, exists) diff --git a/pkg/localstore/mode_set.go b/pkg/localstore/mode_set.go index 0a69ac13dfd..8504933f489 100644 --- a/pkg/localstore/mode_set.go +++ b/pkg/localstore/mode_set.go @@ -49,11 +49,11 @@ func (db *DB) Set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Addr // chunks represented by provided addresses. func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Address) (err error) { // protect parallel updates - db.batchMu.Lock() - defer db.batchMu.Unlock() + db.lock.Lock(GCLock) if db.gcRunning { db.dirtyAddresses = append(db.dirtyAddresses, addrs...) 
} + db.lock.Unlock(GCLock) batch := new(leveldb.Batch) var committedLocations []sharky.Location @@ -68,6 +68,9 @@ func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Addr switch mode { case storage.ModeSetSync: + db.lock.Lock(GCLock) + defer db.lock.Unlock(GCLock) + for _, addr := range addrs { c, err := db.setSync(batch, addr) if err != nil { @@ -77,6 +80,9 @@ func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Addr } case storage.ModeSetRemove: + db.lock.Lock(Write) + defer db.lock.Unlock(Write) + for _, addr := range addrs { item := addressToItem(addr) storedItem, err := db.retrievalDataIndex.Get(item) @@ -96,6 +102,9 @@ func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Addr } case storage.ModeSetPin: + db.lock.Lock(GCLock) + defer db.lock.Unlock(GCLock) + for _, addr := range addrs { item := addressToItem(addr) c, err := db.setPin(batch, item) @@ -105,6 +114,9 @@ func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Addr gcSizeChange += c } case storage.ModeSetUnpin: + db.lock.Lock(GCLock) + defer db.lock.Unlock(GCLock) + for _, addr := range addrs { c, err := db.setUnpin(batch, addr) if err != nil { diff --git a/pkg/localstore/reserve.go b/pkg/localstore/reserve.go index d4deddecc55..9d15fab2cfb 100644 --- a/pkg/localstore/reserve.go +++ b/pkg/localstore/reserve.go @@ -21,8 +21,8 @@ func (db *DB) EvictBatch(id []byte) error { totalTimeMetric(db.metrics.TotalTimeBatchEvict, start) }(time.Now()) - db.batchMu.Lock() - defer db.batchMu.Unlock() + db.lock.Lock(ReserveLock) + defer db.lock.Unlock(ReserveLock) evicted, err := db.unreserveBatch(id, swarm.MaxBins) if err != nil { diff --git a/pkg/localstore/sampler.go b/pkg/localstore/sampler.go index 5b5bb40342d..b5b7c34769d 100644 --- a/pkg/localstore/sampler.go +++ b/pkg/localstore/sampler.go @@ -87,15 +87,16 @@ func (db *DB) ReserveSample( t := time.Now() - // protect the DB from any updates till we finish creating the sample - db.batchMu.Lock() - defer db.batchMu.Unlock() - stat.TimeToLock.Add(time.Since(t).Nanoseconds()) - // Phase 1: Iterate chunk addresses g.Go(func() error { defer close(addrChan) iterationStart := time.Now() + + // protect the DB from any updates till we finish creating the sample + db.lock.Lock(ReserveLock) + defer db.lock.Unlock(ReserveLock) + stat.TimeToLock.Add(time.Since(t).Nanoseconds()) + err := db.pullIndex.Iterate(func(item shed.Item) (bool, error) { select { case addrChan <- swarm.NewAddress(item.Address): diff --git a/pkg/localstore/store_lock.go b/pkg/localstore/store_lock.go new file mode 100644 index 00000000000..f3243ae437a --- /dev/null +++ b/pkg/localstore/store_lock.go @@ -0,0 +1,34 @@ +package localstore + +import "resenje.org/multex" + +const ( + UploadLock string = "upload" + ReserveLock = "reserve" + GCLock = "gc" + Write = "write" +) + +type storeLock struct { + multex.Multex +} + +func (s *storeLock) Lock(lk string) { + switch lk { + case UploadLock, ReserveLock, GCLock: + s.Multex.Lock(lk) + case Write: + s.Multex.Lock(ReserveLock) + s.Multex.Lock(GCLock) + } +} + +func (s *storeLock) Unlock(lk string) { + switch lk { + case UploadLock, ReserveLock, GCLock: + s.Multex.Unlock(lk) + case Write: + s.Multex.Unlock(ReserveLock) + s.Multex.Unlock(GCLock) + } +} From 208fefb5f2b66ca7e4b2e0c27c755cf66f0f29fc Mon Sep 17 00:00:00 2001 From: alok Date: Mon, 5 Dec 2022 14:21:37 +0530 Subject: [PATCH 07/13] fix: init lock --- pkg/localstore/localstore.go | 1 + pkg/localstore/store_lock.go | 8 +++++++- 2 files 
changed, 8 insertions(+), 1 deletion(-) diff --git a/pkg/localstore/localstore.go b/pkg/localstore/localstore.go index 8cd2d4508f3..f834ca774cf 100644 --- a/pkg/localstore/localstore.go +++ b/pkg/localstore/localstore.go @@ -269,6 +269,7 @@ func New(path string, baseKey []byte, ss storage.StateStorer, o *Options, logger metrics: newMetrics(), logger: logger.WithName(loggerName).Register(), validStamp: o.ValidStamp, + lock: newStoreLock(), } if db.cacheCapacity == 0 { db.cacheCapacity = defaultCacheCapacity diff --git a/pkg/localstore/store_lock.go b/pkg/localstore/store_lock.go index f3243ae437a..855a67863bc 100644 --- a/pkg/localstore/store_lock.go +++ b/pkg/localstore/store_lock.go @@ -10,7 +10,13 @@ const ( ) type storeLock struct { - multex.Multex + *multex.Multex +} + +func newStoreLock() storeLock { + return storeLock{ + Multex: multex.New(), + } } func (s *storeLock) Lock(lk string) { From b4d84c4a64de409d457a07113e3e114ca2d3362d Mon Sep 17 00:00:00 2001 From: alok Date: Mon, 5 Dec 2022 14:42:04 +0530 Subject: [PATCH 08/13] fix: lock contexts --- pkg/localstore/gc.go | 5 +++-- pkg/localstore/reserve.go | 7 +++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/localstore/gc.go b/pkg/localstore/gc.go index c9d74cf7690..5965a6bded8 100644 --- a/pkg/localstore/gc.go +++ b/pkg/localstore/gc.go @@ -340,8 +340,9 @@ func (db *DB) evictReserve() (totalEvicted uint64, done bool, err error) { totalTimeMetric(db.metrics.TotalTimeEvictReserve, start) }(time.Now()) - db.lock.Lock(ReserveLock) - defer db.lock.Unlock(ReserveLock) + // reserve eviction affects the reserve indexes as well as the GC indexes + db.lock.Lock(Write) + defer db.lock.Unlock(Write) target = db.reserveCapacity diff --git a/pkg/localstore/reserve.go b/pkg/localstore/reserve.go index 9d15fab2cfb..8fde34875c5 100644 --- a/pkg/localstore/reserve.go +++ b/pkg/localstore/reserve.go @@ -15,14 +15,17 @@ import ( "github.com/syndtr/goleveldb/leveldb" ) +// EvictBatch will evict all chunks associated with the batch from the reserve. This +// is used by batch store for expirations. 
func (db *DB) EvictBatch(id []byte) error { db.metrics.BatchEvictCounter.Inc() defer func(start time.Time) { totalTimeMetric(db.metrics.TotalTimeBatchEvict, start) }(time.Now()) - db.lock.Lock(ReserveLock) - defer db.lock.Unlock(ReserveLock) + // EvictBatch will affect the reserve as well as GC indexes + db.lock.Lock(Write) + defer db.lock.Unlock(Write) evicted, err := db.unreserveBatch(id, swarm.MaxBins) if err != nil { From b328c9137c93d5e4d57108dbed4feef6a60935ca Mon Sep 17 00:00:00 2001 From: Alok Nerurkar Date: Mon, 5 Dec 2022 19:26:22 +0530 Subject: [PATCH 09/13] fix: stop sampler during evictions --- pkg/localstore/gc.go | 16 +++++++++------ pkg/localstore/localstore.go | 2 ++ pkg/localstore/mode_get.go | 4 ++-- pkg/localstore/mode_put.go | 8 ++++---- pkg/localstore/mode_set.go | 16 +++++++-------- pkg/localstore/sampler.go | 33 ++++++++++++++++++++++++++++--- pkg/localstore/store_lock.go | 25 ++++++++++++----------- pkg/postage/batchstore/reserve.go | 11 +++++++---- 8 files changed, 76 insertions(+), 39 deletions(-) diff --git a/pkg/localstore/gc.go b/pkg/localstore/gc.go index 5965a6bded8..e588c36f425 100644 --- a/pkg/localstore/gc.go +++ b/pkg/localstore/gc.go @@ -94,15 +94,15 @@ func (db *DB) collectGarbage() (evicted uint64, done bool, err error) { target := db.gcTarget() // tell the localstore to start logging dirty addresses - db.lock.Lock(GCLock) + db.lock.Lock(GC) db.gcRunning = true - db.lock.Unlock(GCLock) + db.lock.Unlock(GC) defer func() { - db.lock.Lock(GCLock) + db.lock.Lock(GC) db.gcRunning = false db.dirtyAddresses = nil - db.lock.Unlock(GCLock) + db.lock.Unlock(GC) }() gcSize, err := db.gcSize.Get() @@ -142,9 +142,9 @@ func (db *DB) collectGarbage() (evicted uint64, done bool, err error) { } // protect database from changing idexes and gcSize - db.lock.Lock(GCLock) + db.lock.Lock(GC) defer totalTimeMetric(db.metrics.TotalTimeGCLock, time.Now()) - defer db.lock.Unlock(GCLock) + defer db.lock.Unlock(GC) // refresh gcSize value, since it might have // changed in the meanwhile @@ -361,6 +361,10 @@ func (db *DB) evictReserve() (totalEvicted uint64, done bool, err error) { // of triggering subsequent runs in case we're not done totalCallbacks := 0 err = db.unreserveFunc(func(batchID []byte, radius uint8) (bool, error) { + // if we start evicting chunks from the reserve during sampling, we should + // fail the sampling process and not participate in the current round. + db.stopSamplingIfStarted() + totalCallbacks++ e, err := db.unreserveBatch(batchID, radius) if err != nil { diff --git a/pkg/localstore/localstore.go b/pkg/localstore/localstore.go index f834ca774cf..c43caf3aa21 100644 --- a/pkg/localstore/localstore.go +++ b/pkg/localstore/localstore.go @@ -187,6 +187,8 @@ type DB struct { metrics metrics logger log.Logger validStamp postage.ValidStampFn + // sampler stop is used to synchronize the reserve eviction and sampling process + samplerStop chan struct{} } // Options struct holds optional parameters for configuring DB. diff --git a/pkg/localstore/mode_get.go b/pkg/localstore/mode_get.go index a3b067dcff1..fb042c90c4d 100644 --- a/pkg/localstore/mode_get.go +++ b/pkg/localstore/mode_get.go @@ -129,8 +129,8 @@ func (db *DB) updateGCItems(items ...shed.Item) { // only Address and Data fields with non zero values, // which is ensured by the get function. 
func (db *DB) updateGC(item shed.Item) (err error) { - db.lock.Lock(GCLock) - defer db.lock.Unlock(GCLock) + db.lock.Lock(GC) + defer db.lock.Unlock(GC) if db.gcRunning { db.dirtyAddresses = append(db.dirtyAddresses, swarm.NewAddress(item.Address)) diff --git a/pkg/localstore/mode_put.go b/pkg/localstore/mode_put.go index ba256b2a2a7..0eb3d965808 100644 --- a/pkg/localstore/mode_put.go +++ b/pkg/localstore/mode_put.go @@ -68,13 +68,13 @@ func (r *releaseLocations) add(loc sharky.Location) { // in multiple put method calls. func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, retErr error) { // protect parallel updates - db.lock.Lock(GCLock) + db.lock.Lock(GC) if db.gcRunning { for _, ch := range chs { db.dirtyAddresses = append(db.dirtyAddresses, ch.Address()) } } - db.lock.Unlock(GCLock) + db.lock.Unlock(GC) batch := new(leveldb.Batch) @@ -180,8 +180,8 @@ func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) } case storage.ModePutUpload, storage.ModePutUploadPin: - db.lock.Lock(UploadLock) - defer db.lock.Unlock(UploadLock) + db.lock.Lock(Upload) + defer db.lock.Unlock(Upload) for i, ch := range chs { pin := mode == storage.ModePutUploadPin diff --git a/pkg/localstore/mode_set.go b/pkg/localstore/mode_set.go index 8504933f489..05101f82112 100644 --- a/pkg/localstore/mode_set.go +++ b/pkg/localstore/mode_set.go @@ -49,11 +49,11 @@ func (db *DB) Set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Addr // chunks represented by provided addresses. func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Address) (err error) { // protect parallel updates - db.lock.Lock(GCLock) + db.lock.Lock(GC) if db.gcRunning { db.dirtyAddresses = append(db.dirtyAddresses, addrs...) 
} - db.lock.Unlock(GCLock) + db.lock.Unlock(GC) batch := new(leveldb.Batch) var committedLocations []sharky.Location @@ -68,8 +68,8 @@ func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Addr switch mode { case storage.ModeSetSync: - db.lock.Lock(GCLock) - defer db.lock.Unlock(GCLock) + db.lock.Lock(GC) + defer db.lock.Unlock(GC) for _, addr := range addrs { c, err := db.setSync(batch, addr) @@ -102,8 +102,8 @@ func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Addr } case storage.ModeSetPin: - db.lock.Lock(GCLock) - defer db.lock.Unlock(GCLock) + db.lock.Lock(GC) + defer db.lock.Unlock(GC) for _, addr := range addrs { item := addressToItem(addr) @@ -114,8 +114,8 @@ func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Addr gcSizeChange += c } case storage.ModeSetUnpin: - db.lock.Lock(GCLock) - defer db.lock.Unlock(GCLock) + db.lock.Lock(GC) + defer db.lock.Unlock(GC) for _, addr := range addrs { c, err := db.setUnpin(batch, addr) diff --git a/pkg/localstore/sampler.go b/pkg/localstore/sampler.go index b5b7c34769d..d84aa482ae1 100644 --- a/pkg/localstore/sampler.go +++ b/pkg/localstore/sampler.go @@ -27,6 +27,7 @@ import ( const sampleSize = 8 var errDbClosed = errors.New("database closed") +var errSamplerStopped = errors.New("sampler stopped due to ongoing evictions") type sampleStat struct { TotalIterated atomic.Int64 @@ -86,15 +87,20 @@ func (db *DB) ReserveSample( logger := db.logger.WithName("sampler").V(1).Register() t := time.Now() + // signal start of sampling to see if we get any evictions during the sampler + // run + db.startSampling() // Phase 1: Iterate chunk addresses g.Go(func() error { defer close(addrChan) iterationStart := time.Now() - // protect the DB from any updates till we finish creating the sample - db.lock.Lock(ReserveLock) - defer db.lock.Unlock(ReserveLock) + // this is a slightly relaxed lock. GC operations are allowed during this + // as we dont care about the gcIndex. This lock only prevents evictions and + // put/set operations related to the reserve. 
+ db.lock.Lock(Reserve) + defer db.lock.Unlock(Reserve) stat.TimeToLock.Add(time.Since(t).Nanoseconds()) err := db.pullIndex.Iterate(func(item shed.Item) (bool, error) { @@ -159,6 +165,11 @@ func (db *DB) ReserveSample( return ctx.Err() case <-db.close: return errDbClosed + case <-db.samplerStop: + db.lock.Lock(Sampling) + db.samplerStop = nil + db.lock.Unlock(Sampling) + return errSamplerStopped } } @@ -264,3 +275,19 @@ func (db *DB) ReserveSample( func le(a, b []byte) bool { return bytes.Compare(a, b) == -1 } + +func (db *DB) startSampling() { + db.lock.Lock(Sampling) + defer db.lock.Unlock(Sampling) + + db.samplerStop = make(chan struct{}) +} + +func (db *DB) stopSamplingIfStarted() { + db.lock.Lock(Sampling) + defer db.lock.Unlock(Sampling) + + if db.samplerStop != nil { + close(db.samplerStop) + } +} diff --git a/pkg/localstore/store_lock.go b/pkg/localstore/store_lock.go index 855a67863bc..ca5ff80976c 100644 --- a/pkg/localstore/store_lock.go +++ b/pkg/localstore/store_lock.go @@ -3,10 +3,11 @@ package localstore import "resenje.org/multex" const ( - UploadLock string = "upload" - ReserveLock = "reserve" - GCLock = "gc" - Write = "write" + Upload string = "upload" + Reserve = "reserve" + GC = "gc" + Write = "write" + Sampling = "sampling" ) type storeLock struct { @@ -21,20 +22,20 @@ func newStoreLock() storeLock { func (s *storeLock) Lock(lk string) { switch lk { - case UploadLock, ReserveLock, GCLock: - s.Multex.Lock(lk) case Write: - s.Multex.Lock(ReserveLock) - s.Multex.Lock(GCLock) + s.Multex.Lock(Reserve) + s.Multex.Lock(GC) + default: + s.Multex.Lock(lk) } } func (s *storeLock) Unlock(lk string) { switch lk { - case UploadLock, ReserveLock, GCLock: - s.Multex.Unlock(lk) case Write: - s.Multex.Unlock(ReserveLock) - s.Multex.Unlock(GCLock) + s.Multex.Unlock(Reserve) + s.Multex.Unlock(GC) + default: + s.Multex.Unlock(lk) } } diff --git a/pkg/postage/batchstore/reserve.go b/pkg/postage/batchstore/reserve.go index 54e1ea811c0..146bb54e27d 100644 --- a/pkg/postage/batchstore/reserve.go +++ b/pkg/postage/batchstore/reserve.go @@ -106,10 +106,6 @@ func (s *store) cleanup() error { for _, b := range evictions { - err := s.evictFn(b.ID) - if err != nil { - return fmt.Errorf("evict batch %x: %w", b.ID, err) - } err = s.store.Delete(valueKey(b.Value, b.ID)) if err != nil { return fmt.Errorf("delete value key for batch %x: %w", b.ID, err) @@ -118,6 +114,13 @@ func (s *store) cleanup() error { if err != nil { return fmt.Errorf("delete batch %x: %w", b.ID, err) } + // first remove the batch from the batchstore as if a sampling process is + // started, these chunks will not be considered in the sample and they can + // be safely evicted in the background. 
+ err := s.evictFn(b.ID) + if err != nil { + return fmt.Errorf("evict batch %x: %w", b.ID, err) + } if s.batchExpiry != nil { s.batchExpiry.HandleStampExpiry(b.ID) } From 6367d0951b679fa7a506da19a1973ab3a3c7f0f5 Mon Sep 17 00:00:00 2001 From: Alok Nerurkar Date: Tue, 6 Dec 2022 18:26:44 +0530 Subject: [PATCH 10/13] fix: remove reserve lock --- pkg/localstore/gc.go | 23 ++++---- pkg/localstore/localstore.go | 27 ++++++++-- pkg/localstore/metrics.go | 22 ++++++++ pkg/localstore/mode_get.go | 4 +- pkg/localstore/mode_put.go | 22 ++++---- pkg/localstore/mode_set.go | 31 ++++++----- pkg/localstore/reserve.go | 6 ++- pkg/localstore/sampler.go | 42 +++++++-------- pkg/localstore/sampler_test.go | 97 ++++++++++++++++++++++++++++++++++ pkg/localstore/store_lock.go | 41 -------------- 10 files changed, 207 insertions(+), 108 deletions(-) delete mode 100644 pkg/localstore/store_lock.go diff --git a/pkg/localstore/gc.go b/pkg/localstore/gc.go index e588c36f425..a86d85c76a7 100644 --- a/pkg/localstore/gc.go +++ b/pkg/localstore/gc.go @@ -94,15 +94,15 @@ func (db *DB) collectGarbage() (evicted uint64, done bool, err error) { target := db.gcTarget() // tell the localstore to start logging dirty addresses - db.lock.Lock(GC) + db.lock.Lock(lockKeyGC) db.gcRunning = true - db.lock.Unlock(GC) + db.lock.Unlock(lockKeyGC) defer func() { - db.lock.Lock(GC) + db.lock.Lock(lockKeyGC) db.gcRunning = false db.dirtyAddresses = nil - db.lock.Unlock(GC) + db.lock.Unlock(lockKeyGC) }() gcSize, err := db.gcSize.Get() @@ -142,9 +142,9 @@ func (db *DB) collectGarbage() (evicted uint64, done bool, err error) { } // protect database from changing idexes and gcSize - db.lock.Lock(GC) + db.lock.Lock(lockKeyGC) defer totalTimeMetric(db.metrics.TotalTimeGCLock, time.Now()) - defer db.lock.Unlock(GC) + defer db.lock.Unlock(lockKeyGC) // refresh gcSize value, since it might have // changed in the meanwhile @@ -341,8 +341,8 @@ func (db *DB) evictReserve() (totalEvicted uint64, done bool, err error) { }(time.Now()) // reserve eviction affects the reserve indexes as well as the GC indexes - db.lock.Lock(Write) - defer db.lock.Unlock(Write) + db.lock.Lock(lockKeyGC) + defer db.lock.Unlock(lockKeyGC) target = db.reserveCapacity @@ -357,13 +357,14 @@ func (db *DB) evictReserve() (totalEvicted uint64, done bool, err error) { return 0, true, nil } + // if we start evicting chunks from the reserve during sampling, we should + // fail the sampling process and not participate in the current round. + db.stopSamplingIfRunning() + // if we dont get any entries at all then there's no use // of triggering subsequent runs in case we're not done totalCallbacks := 0 err = db.unreserveFunc(func(batchID []byte, radius uint8) (bool, error) { - // if we start evicting chunks from the reserve during sampling, we should - // fail the sampling process and not participate in the current round. - db.stopSamplingIfStarted() totalCallbacks++ e, err := db.unreserveBatch(batchID, radius) diff --git a/pkg/localstore/localstore.go b/pkg/localstore/localstore.go index c43caf3aa21..b3dd29700e1 100644 --- a/pkg/localstore/localstore.go +++ b/pkg/localstore/localstore.go @@ -41,6 +41,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/spf13/afero" "github.com/syndtr/goleveldb/leveldb" + "resenje.org/multex" ) // loggerName is the tree path name of the logger for this package. @@ -72,6 +73,23 @@ const ( sharkyDirtyFileName = ".DIRTY" ) +const ( + // lockKeyUpload is used to guard against parallel updates during upload. 
These
+	// updates are made mainly to the pushIndex and don't involve the GC or Reserve
+	// indexes. Hence this lock is separated to allow GC/Reserve operations to continue
+	// along with uploads.
+	lockKeyUpload string = "upload"
+	// lockKeyGC is used to guard against parallel updates to the gcIndex and gcSize.
+	// The gcSize is a counter maintained for the gcIndex and hence parallel updates
+	// here need to be prevented. The reserve and GC locks are separated as the gcIndex
+	// and pullIndex are now mutually exclusive, so some operations can happen in
+	// parallel. This is slightly better than having a global lock.
+	lockKeyGC string = "gc"
+	// lockKeySampling is used to synchronize the sampler stopping process if evictions
+	// start during sampling.
+	lockKeySampling string = "sampling"
+)
+
 // DB is the local store implementation and holds
 // database related objects.
 type DB struct {
@@ -154,7 +172,7 @@ type DB struct {
 	// baseKey is the overlay address
 	baseKey []byte
 
-	lock storeLock
+	lock *multex.Multex
 
 	// gcRunning is true while GC is running. it is
 	// used to avoid touching dirty gc index entries
@@ -187,8 +205,9 @@ type DB struct {
 	metrics    metrics
 	logger     log.Logger
 	validStamp postage.ValidStampFn
-	// the sampler stop is used to synchronize the reserve eviction and sampling process
-	samplerStop chan struct{}
+	// the following fields are used to synchronize sampling and reserve eviction
+	samplerStop   *sync.Once
+	samplerSignal chan struct{}
 }
 
 // Options struct holds optional parameters for configuring DB.
@@ -271,7 +290,7 @@ func New(path string, baseKey []byte, ss storage.StateStorer, o *Options, logger
 		metrics:    newMetrics(),
 		logger:     logger.WithName(loggerName).Register(),
 		validStamp: o.ValidStamp,
-		lock:       newStoreLock(),
+		lock:       multex.New(),
 	}
 	if db.cacheCapacity == 0 {
 		db.cacheCapacity = defaultCacheCapacity
diff --git a/pkg/localstore/metrics.go b/pkg/localstore/metrics.go
index 879f2c58c48..bf709164bfc 100644
--- a/pkg/localstore/metrics.go
+++ b/pkg/localstore/metrics.go
@@ -71,6 +71,10 @@ type metrics struct {
 	BatchEvictErrorCounter     prometheus.Counter
 	BatchEvictCollectedCounter prometheus.Counter
 	TotalTimeBatchEvict        prometheus.Counter
+
+	SamplerSuccessfulRuns prometheus.Counter
+	SamplerFailedRuns     prometheus.Counter
+	SamplerStopped        prometheus.Counter
 }
 
 func newMetrics() metrics {
@@ -415,6 +419,24 @@ func newMetrics() metrics {
 			Name:      "batch_evict_total_time",
 			Help:      "total time spent evicting batches",
 		}),
+		SamplerSuccessfulRuns: prometheus.NewCounter(prometheus.CounterOpts{
+			Namespace: m.Namespace,
+			Subsystem: subsystem,
+			Name:      "sampler_successful_runs_count",
+			Help:      "number of times the sampler ran successfully",
+		}),
+		SamplerFailedRuns: prometheus.NewCounter(prometheus.CounterOpts{
+			Namespace: m.Namespace,
+			Subsystem: subsystem,
+			Name:      "sampler_failed_runs_count",
+			Help:      "number of times the sampler failed",
+		}),
+		SamplerStopped: prometheus.NewCounter(prometheus.CounterOpts{
+			Namespace: m.Namespace,
+			Subsystem: subsystem,
+			Name:      "sampler_stopped_count",
+			Help:      "number of times the sampler was stopped due to evictions",
+		}),
 	}
 }
 
diff --git a/pkg/localstore/mode_get.go b/pkg/localstore/mode_get.go
index fb042c90c4d..99e02042348 100644
--- a/pkg/localstore/mode_get.go
+++ b/pkg/localstore/mode_get.go
@@ -129,8 +129,8 @@ func (db *DB) updateGCItems(items ...shed.Item) {
 // only Address and Data fields with non zero values,
 // which is ensured by the get function.
func (db *DB) updateGC(item shed.Item) (err error) { - db.lock.Lock(GC) - defer db.lock.Unlock(GC) + db.lock.Lock(lockKeyGC) + defer db.lock.Unlock(lockKeyGC) if db.gcRunning { db.dirtyAddresses = append(db.dirtyAddresses, swarm.NewAddress(item.Address)) diff --git a/pkg/localstore/mode_put.go b/pkg/localstore/mode_put.go index 0eb3d965808..78833061fee 100644 --- a/pkg/localstore/mode_put.go +++ b/pkg/localstore/mode_put.go @@ -68,13 +68,13 @@ func (r *releaseLocations) add(loc sharky.Location) { // in multiple put method calls. func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, retErr error) { // protect parallel updates - db.lock.Lock(GC) + db.lock.Lock(lockKeyGC) if db.gcRunning { for _, ch := range chs { db.dirtyAddresses = append(db.dirtyAddresses, ch.Address()) } } - db.lock.Unlock(GC) + db.lock.Unlock(lockKeyGC) batch := new(leveldb.Batch) @@ -163,9 +163,6 @@ func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) switch mode { case storage.ModePutRequest, storage.ModePutRequestPin, storage.ModePutRequestCache: - db.lock.Lock(Write) - defer db.lock.Unlock(Write) - for i, ch := range chs { pin := mode == storage.ModePutRequestPin // force pin in this mode cache := mode == storage.ModePutRequestCache // force cache @@ -179,10 +176,10 @@ func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) gcSizeChange += c } - case storage.ModePutUpload, storage.ModePutUploadPin: - db.lock.Lock(Upload) - defer db.lock.Unlock(Upload) + db.lock.Lock(lockKeyGC) + defer db.lock.Unlock(lockKeyGC) + case storage.ModePutUpload, storage.ModePutUploadPin: for i, ch := range chs { pin := mode == storage.ModePutUploadPin exists, c, err := putChunk(ch, i, func(item shed.Item, exists bool) (int64, error) { @@ -200,10 +197,10 @@ func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) gcSizeChange += c } - case storage.ModePutSync: - db.lock.Lock(Write) - defer db.lock.Unlock(Write) + db.lock.Lock(lockKeyUpload) + defer db.lock.Unlock(lockKeyUpload) + case storage.ModePutSync: for i, ch := range chs { exists, c, err := putChunk(ch, i, func(item shed.Item, exists bool) (int64, error) { return db.putSync(batch, binIDs, item, exists) @@ -220,6 +217,9 @@ func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) gcSizeChange += c } + db.lock.Lock(lockKeyGC) + defer db.lock.Unlock(lockKeyGC) + default: return nil, ErrInvalidMode } diff --git a/pkg/localstore/mode_set.go b/pkg/localstore/mode_set.go index 05101f82112..19c4074a0ab 100644 --- a/pkg/localstore/mode_set.go +++ b/pkg/localstore/mode_set.go @@ -49,11 +49,11 @@ func (db *DB) Set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Addr // chunks represented by provided addresses. func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Address) (err error) { // protect parallel updates - db.lock.Lock(GC) + db.lock.Lock(lockKeyGC) if db.gcRunning { db.dirtyAddresses = append(db.dirtyAddresses, addrs...) 
} - db.lock.Unlock(GC) + db.lock.Unlock(lockKeyGC) batch := new(leveldb.Batch) var committedLocations []sharky.Location @@ -68,9 +68,6 @@ func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Addr switch mode { case storage.ModeSetSync: - db.lock.Lock(GC) - defer db.lock.Unlock(GC) - for _, addr := range addrs { c, err := db.setSync(batch, addr) if err != nil { @@ -79,10 +76,10 @@ func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Addr gcSizeChange += c } - case storage.ModeSetRemove: - db.lock.Lock(Write) - defer db.lock.Unlock(Write) + db.lock.Lock(lockKeyGC) + defer db.lock.Unlock(lockKeyGC) + case storage.ModeSetRemove: for _, addr := range addrs { item := addressToItem(addr) storedItem, err := db.retrievalDataIndex.Get(item) @@ -101,10 +98,10 @@ func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Addr gcSizeChange += c } - case storage.ModeSetPin: - db.lock.Lock(GC) - defer db.lock.Unlock(GC) + db.lock.Lock(lockKeyGC) + defer db.lock.Unlock(lockKeyGC) + case storage.ModeSetPin: for _, addr := range addrs { item := addressToItem(addr) c, err := db.setPin(batch, item) @@ -113,10 +110,10 @@ func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Addr } gcSizeChange += c } - case storage.ModeSetUnpin: - db.lock.Lock(GC) - defer db.lock.Unlock(GC) + db.lock.Lock(lockKeyGC) + defer db.lock.Unlock(lockKeyGC) + case storage.ModeSetUnpin: for _, addr := range addrs { c, err := db.setUnpin(batch, addr) if err != nil { @@ -124,6 +121,9 @@ func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Addr } gcSizeChange += c } + db.lock.Lock(lockKeyGC) + defer db.lock.Unlock(lockKeyGC) + default: return ErrInvalidMode } @@ -411,6 +411,5 @@ func (db *DB) setUnpin(batch *leveldb.Batch, addr swarm.Address) (gcSizeChange i return 0, err } - gcSizeChange++ - return gcSizeChange, nil + return 1, nil } diff --git a/pkg/localstore/reserve.go b/pkg/localstore/reserve.go index 8fde34875c5..8d174759200 100644 --- a/pkg/localstore/reserve.go +++ b/pkg/localstore/reserve.go @@ -24,8 +24,10 @@ func (db *DB) EvictBatch(id []byte) error { }(time.Now()) // EvictBatch will affect the reserve as well as GC indexes - db.lock.Lock(Write) - defer db.lock.Unlock(Write) + db.lock.Lock(lockKeyGC) + defer db.lock.Unlock(lockKeyGC) + + db.stopSamplingIfRunning() evicted, err := db.unreserveBatch(id, swarm.MaxBins) if err != nil { diff --git a/pkg/localstore/sampler.go b/pkg/localstore/sampler.go index d84aa482ae1..a3dbd829a23 100644 --- a/pkg/localstore/sampler.go +++ b/pkg/localstore/sampler.go @@ -11,6 +11,7 @@ import ( "encoding/binary" "errors" "fmt" + "sync" "time" "github.com/ethersphere/bee/pkg/bmtpool" @@ -37,7 +38,6 @@ type sampleStat struct { GetDuration atomic.Int64 HmacrDuration atomic.Int64 ValidStampDuration atomic.Int64 - TimeToLock atomic.Int64 } type sampleEntry struct { @@ -51,7 +51,7 @@ func (s sampleStat) String() string { return fmt.Sprintf( "Chunks: %d NotFound: %d New Ignored: %d Iteration Duration: %d secs GetDuration: %d secs"+ - " HmacrDuration: %d secs ValidStampDuration: %d secs TimeToLock: %d secs", + " HmacrDuration: %d secs ValidStampDuration: %d secs", s.TotalIterated.Load(), s.NotFound.Load(), s.NewIgnored.Load(), @@ -59,7 +59,6 @@ func (s sampleStat) String() string { s.GetDuration.Load()/seconds, s.HmacrDuration.Load()/seconds, s.ValidStampDuration.Load()/seconds, - s.TimeToLock.Load()/seconds, ) } @@ -90,19 +89,13 @@ func (db *DB) ReserveSample( // signal start of sampling to see if 
we get any evictions during the sampler // run db.startSampling() + defer db.stopSamplingIfRunning() // Phase 1: Iterate chunk addresses g.Go(func() error { defer close(addrChan) iterationStart := time.Now() - // this is a slightly relaxed lock. GC operations are allowed during this - // as we dont care about the gcIndex. This lock only prevents evictions and - // put/set operations related to the reserve. - db.lock.Lock(Reserve) - defer db.lock.Unlock(Reserve) - stat.TimeToLock.Add(time.Since(t).Nanoseconds()) - err := db.pullIndex.Iterate(func(item shed.Item) (bool, error) { select { case addrChan <- swarm.NewAddress(item.Address): @@ -165,10 +158,11 @@ func (db *DB) ReserveSample( return ctx.Err() case <-db.close: return errDbClosed - case <-db.samplerStop: - db.lock.Lock(Sampling) + case <-db.samplerSignal: + db.lock.Lock(lockKeySampling) db.samplerStop = nil - db.lock.Unlock(Sampling) + db.samplerSignal = nil + db.lock.Unlock(lockKeySampling) return errSamplerStopped } } @@ -242,11 +236,14 @@ func (db *DB) ReserveSample( } stat.ValidStampDuration.Add(time.Since(validStart).Nanoseconds()) - } } if err := g.Wait(); err != nil { + db.metrics.SamplerFailedRuns.Inc() + if errors.Is(err, errSamplerStopped) { + db.metrics.SamplerStopped.Inc() + } return storage.Sample{}, fmt.Errorf("sampler: failed creating sample: %w", err) } @@ -256,6 +253,7 @@ func (db *DB) ReserveSample( for _, s := range sampleItems { _, err := hasher.Write(s.Bytes()) if err != nil { + db.metrics.SamplerFailedRuns.Inc() return storage.Sample{}, fmt.Errorf("sampler: failed creating root hash of sample: %w", err) } } @@ -266,6 +264,7 @@ func (db *DB) ReserveSample( Hash: swarm.NewAddress(hash), } + db.metrics.SamplerSuccessfulRuns.Inc() logger.Info("sampler done", "duration", time.Since(t), "storage_radius", storageRadius, "consensus_time_ns", consensusTime, "stats", stat, "sample", sample) return sample, nil @@ -277,17 +276,18 @@ func le(a, b []byte) bool { } func (db *DB) startSampling() { - db.lock.Lock(Sampling) - defer db.lock.Unlock(Sampling) + db.lock.Lock(lockKeySampling) + defer db.lock.Unlock(lockKeySampling) - db.samplerStop = make(chan struct{}) + db.samplerStop = new(sync.Once) + db.samplerSignal = make(chan struct{}) } -func (db *DB) stopSamplingIfStarted() { - db.lock.Lock(Sampling) - defer db.lock.Unlock(Sampling) +func (db *DB) stopSamplingIfRunning() { + db.lock.Lock(lockKeySampling) + defer db.lock.Unlock(lockKeySampling) if db.samplerStop != nil { - close(db.samplerStop) + db.samplerStop.Do(func() { close(db.samplerSignal) }) } } diff --git a/pkg/localstore/sampler_test.go b/pkg/localstore/sampler_test.go index 14fe07a0755..090433e6d89 100644 --- a/pkg/localstore/sampler_test.go +++ b/pkg/localstore/sampler_test.go @@ -7,9 +7,12 @@ package localstore import ( "bytes" "context" + "errors" + "sync" "testing" "time" + "github.com/ethersphere/bee/pkg/postage" postagetesting "github.com/ethersphere/bee/pkg/postage/testing" "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/swarm" @@ -95,3 +98,97 @@ func TestReserveSampler(t *testing.T) { } }) } + +func TestReserveSamplerStop(t *testing.T) { + const chunkCountPerPO = 10 + const maxPO = 10 + var ( + chs []swarm.Chunk + batchIDs [][]byte + closed chan struct{} + doneMtx sync.Mutex + mtx sync.Mutex + ) + startWait, waitChan := make(chan struct{}), make(chan struct{}) + doneWaiting := false + + testHookEvictionChan := make(chan uint64) + t.Cleanup(setTestHookEviction(func(count uint64) { + if count == 0 { + return + } + select { + case 
testHookEvictionChan <- count: + case <-closed: + } + })) + + db := newTestDB(t, &Options{ + ReserveCapacity: 90, + UnreserveFunc: func(f postage.UnreserveIteratorFn) error { + mtx.Lock() + defer mtx.Unlock() + for i := 0; i < len(batchIDs); i++ { + // pop an element from batchIDs, call the Unreserve + item := batchIDs[i] + // here we mock the behavior of the batchstore + // that would call the localstore back with the + // batch IDs and the radiuses from the FIFO queue + stop, err := f(item, 2) + if err != nil { + return err + } + if stop { + return nil + } + } + batchIDs = nil + return nil + }, + ValidStamp: func(_ swarm.Chunk, stampBytes []byte) (chunk swarm.Chunk, err error) { + doneMtx.Lock() + defer doneMtx.Unlock() + + if !doneWaiting { + // signal that we have started sampling + close(startWait) + // this makes sampling wait till we trigger eviction for the test + <-waitChan + } + doneWaiting = true + return nil, nil + }, + }) + closed = db.close + + for po := 0; po < maxPO; po++ { + for i := 0; i < chunkCountPerPO; i++ { + ch := generateValidRandomChunkAt(swarm.NewAddress(db.baseKey), po).WithBatch(2, 3, 2, false) + mtx.Lock() + chs = append(chs, ch) + batchIDs = append(batchIDs, ch.Stamp().BatchID()) + mtx.Unlock() + } + } + + _, err := db.Put(context.Background(), storage.ModePutSync, chs...) + if err != nil { + t.Fatal(err) + } + + go func() { + <-startWait + // this will trigger the eviction + _, err := db.ComputeReserveSize(0) + if err != nil { + t.Fatal(err) + } + <-testHookEvictionChan + close(waitChan) + }() + + _, err = db.ReserveSample(context.TODO(), []byte("anchor"), 5, uint64(time.Now().UnixNano())) + if !errors.Is(err, errSamplerStopped) { + t.Fatalf("expected sampler stopped error, found: %v", err) + } +} diff --git a/pkg/localstore/store_lock.go b/pkg/localstore/store_lock.go deleted file mode 100644 index ca5ff80976c..00000000000 --- a/pkg/localstore/store_lock.go +++ /dev/null @@ -1,41 +0,0 @@ -package localstore - -import "resenje.org/multex" - -const ( - Upload string = "upload" - Reserve = "reserve" - GC = "gc" - Write = "write" - Sampling = "sampling" -) - -type storeLock struct { - *multex.Multex -} - -func newStoreLock() storeLock { - return storeLock{ - Multex: multex.New(), - } -} - -func (s *storeLock) Lock(lk string) { - switch lk { - case Write: - s.Multex.Lock(Reserve) - s.Multex.Lock(GC) - default: - s.Multex.Lock(lk) - } -} - -func (s *storeLock) Unlock(lk string) { - switch lk { - case Write: - s.Multex.Unlock(Reserve) - s.Multex.Unlock(GC) - default: - s.Multex.Unlock(lk) - } -} From e5cfe882bf3f6e0cc51883b0d0fb3ebae18d4b98 Mon Sep 17 00:00:00 2001 From: Alok Nerurkar Date: Tue, 6 Dec 2022 18:30:53 +0530 Subject: [PATCH 11/13] fix: revert batchstore change --- pkg/postage/batchstore/reserve.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/pkg/postage/batchstore/reserve.go b/pkg/postage/batchstore/reserve.go index 146bb54e27d..8a23faeb408 100644 --- a/pkg/postage/batchstore/reserve.go +++ b/pkg/postage/batchstore/reserve.go @@ -105,7 +105,10 @@ func (s *store) cleanup() error { } for _, b := range evictions { - + err := s.evictFn(b.ID) + if err != nil { + return fmt.Errorf("evict batch %x: %w", b.ID, err) + } err = s.store.Delete(valueKey(b.Value, b.ID)) if err != nil { return fmt.Errorf("delete value key for batch %x: %w", b.ID, err) @@ -114,13 +117,6 @@ func (s *store) cleanup() error { if err != nil { return fmt.Errorf("delete batch %x: %w", b.ID, err) } - // first remove the batch from the batchstore as if a 
sampling process is
-	// started, these chunks will not be considered in the sample and they can
-	// be safely evicted in the background.
-	err := s.evictFn(b.ID)
-	if err != nil {
-		return fmt.Errorf("evict batch %x: %w", b.ID, err)
-	}
 	if s.batchExpiry != nil {
 		s.batchExpiry.HandleStampExpiry(b.ID)
 	}

From c0c943a4d0775d56cef967914ab9133ef071e421 Mon Sep 17 00:00:00 2001
From: Alok Nerurkar
Date: Tue, 6 Dec 2022 18:38:02 +0530
Subject: [PATCH 12/13] fix: reset sampler stop

---
 pkg/localstore/sampler.go | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/pkg/localstore/sampler.go b/pkg/localstore/sampler.go
index a3dbd829a23..0e6a8da2259 100644
--- a/pkg/localstore/sampler.go
+++ b/pkg/localstore/sampler.go
@@ -89,7 +89,7 @@ func (db *DB) ReserveSample(
 	// signal start of sampling to see if we get any evictions during the sampler
 	// run
 	db.startSampling()
-	defer db.stopSamplingIfRunning()
+	defer db.resetSamplingState()
 
 	// Phase 1: Iterate chunk addresses
 	g.Go(func() error {
@@ -159,10 +159,6 @@ func (db *DB) ReserveSample(
 		case <-db.close:
 			return errDbClosed
 		case <-db.samplerSignal:
-			db.lock.Lock(lockKeySampling)
-			db.samplerStop = nil
-			db.samplerSignal = nil
-			db.lock.Unlock(lockKeySampling)
 			return errSamplerStopped
 		}
 	}
@@ -291,3 +287,11 @@ func (db *DB) stopSamplingIfRunning() {
 		db.samplerStop.Do(func() { close(db.samplerSignal) })
 	}
 }
+
+func (db *DB) resetSamplingState() {
+	db.lock.Lock(lockKeySampling)
+	defer db.lock.Unlock(lockKeySampling)
+
+	db.samplerStop = nil
+	db.samplerSignal = nil
+}

From 50f22349b9969ae7a1123f25a097debc082707e8 Mon Sep 17 00:00:00 2001
From: Alok Nerurkar
Date: Tue, 6 Dec 2022 18:44:55 +0530
Subject: [PATCH 13/13] fix: lint

---
 pkg/localstore/sampler_test.go | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/pkg/localstore/sampler_test.go b/pkg/localstore/sampler_test.go
index 090433e6d89..927daa3c6dd 100644
--- a/pkg/localstore/sampler_test.go
+++ b/pkg/localstore/sampler_test.go
@@ -179,10 +179,7 @@ func TestReserveSamplerStop(t *testing.T) {
 	go func() {
 		<-startWait
 		// this will trigger the eviction
-		_, err := db.ComputeReserveSize(0)
-		if err != nil {
-			t.Fatal(err)
-		}
+		_, _ = db.ComputeReserveSize(0)
 		<-testHookEvictionChan
 		close(waitChan)
 	}()
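
The patches above repeatedly use one Go idiom worth spelling out: timing a whole function by passing time.Now() as the argument of a deferred closure (see EvictBatch in PATCH 01). The argument is evaluated when the defer statement is set up, at function entry, so the deferred call records the full call duration. A minimal sketch of the pattern; the plain float64 accumulator here is only a stand-in for the prometheus counters used in the diffs:

package main

import (
	"fmt"
	"time"
)

// totalTime accumulates seconds spent in timed sections, standing in for
// a prometheus.Counter's Add in the patches above.
var totalTime float64

func totalTimeMetric(start time.Time) {
	totalTime += time.Since(start).Seconds()
}

func evict() {
	// time.Now() is evaluated here, when the defer is registered,
	// so the deferred call measures from entry to return.
	defer func(start time.Time) {
		totalTimeMetric(start)
	}(time.Now())

	time.Sleep(50 * time.Millisecond) // the actual eviction work
}

func main() {
	evict()
	fmt.Printf("total time: %.3fs\n", totalTime)
}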
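PATCH 06 through PATCH 10 replace localstore's single batchMu with named locks from resenje.org/multex, so that uploads, GC-index updates, and sampling no longer serialize on one mutex. The sketch below illustrates the keyed-lock semantics with a simplified local stand-in for multex; only the New/Lock(key)/Unlock(key) surface actually used in the diffs is assumed, and the real package's implementation differs:

package main

import (
	"fmt"
	"sync"
	"time"
)

// Multex is a keyed lock: one mutex per string key. It only mimics the
// resenje.org/multex API used in the patches; it is not that package.
type Multex struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

func New() *Multex { return &Multex{locks: make(map[string]*sync.Mutex)} }

func (m *Multex) get(key string) *sync.Mutex {
	m.mu.Lock()
	defer m.mu.Unlock()
	l, ok := m.locks[key]
	if !ok {
		l = new(sync.Mutex)
		m.locks[key] = l
	}
	return l
}

func (m *Multex) Lock(key string)   { m.get(key).Lock() }
func (m *Multex) Unlock(key string) { m.get(key).Unlock() }

// the lock keys pkg/localstore ends up with after PATCH 10
const (
	lockKeyUpload = "upload"
	lockKeyGC     = "gc"
)

func main() {
	lk := New()
	var wg sync.WaitGroup
	wg.Add(2)

	go func() { // an upload touching only the push/upload indexes
		defer wg.Done()
		lk.Lock(lockKeyUpload)
		defer lk.Unlock(lockKeyUpload)
		time.Sleep(10 * time.Millisecond)
		fmt.Println("upload finished")
	}()

	go func() { // a GC-index update; it does not wait for the upload
		defer wg.Done()
		lk.Lock(lockKeyGC)
		defer lk.Unlock(lockKeyGC)
		fmt.Println("gc update finished")
	}()

	wg.Wait()
}

With a single mutex the two goroutines would run back to back; with per-key locks, only operations sharing a key exclude each other.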
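PATCH 09 and PATCH 12 settle on a small handshake for aborting an in-flight sample when a reserve eviction starts: startSampling installs a fresh sync.Once plus a signal channel, stopSamplingIfRunning closes the channel at most once, the sampling loop returns errSamplerStopped when the channel fires, and resetSamplingState clears the fields afterwards. A self-contained sketch of that handshake, with a trivial select loop standing in for ReserveSample's chunk iteration:

package main

import (
	"errors"
	"fmt"
	"sync"
)

var errSamplerStopped = errors.New("sampler stopped due to ongoing evictions")

// sampler mirrors the synchronization fields the patches add to localstore.DB:
// a *sync.Once so concurrent evictions close the signal channel at most once,
// and a channel the sampling loop selects on to abort early.
type sampler struct {
	mu            sync.Mutex
	samplerStop   *sync.Once
	samplerSignal chan struct{}
}

func (s *sampler) startSampling() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.samplerStop = new(sync.Once)
	s.samplerSignal = make(chan struct{})
}

func (s *sampler) stopSamplingIfRunning() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.samplerStop != nil {
		s.samplerStop.Do(func() { close(s.samplerSignal) })
	}
}

func (s *sampler) resetSamplingState() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.samplerStop = nil
	s.samplerSignal = nil
}

// run stands in for ReserveSample's iteration: it consumes work until the
// channel is drained or an eviction closes the stop signal.
func (s *sampler) run(work <-chan int) error {
	s.mu.Lock()
	signal := s.samplerSignal
	s.mu.Unlock()

	for {
		select {
		case _, ok := <-work:
			if !ok {
				return nil // sample complete
			}
		case <-signal:
			return errSamplerStopped
		}
	}
}

func main() {
	s := &sampler{}
	s.startSampling()

	work := make(chan int) // left empty: the run blocks until stopped
	done := make(chan error, 1)
	go func() {
		defer s.resetSamplingState()
		done <- s.run(work)
	}()

	s.stopSamplingIfRunning() // a batch eviction begins mid-run
	fmt.Println(<-done)       // prints errSamplerStopped
}

The sync.Once matters because several evictions can overlap a single sampler run; closing an already-closed channel would panic.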