diff --git a/go.mod b/go.mod
index 1d1808c3938..2b441ca1845 100644
--- a/go.mod
+++ b/go.mod
@@ -177,4 +177,5 @@ require (
 	gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
 	gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
 	gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
+	resenje.org/multex v0.1.0 // indirect
 )
diff --git a/go.sum b/go.sum
index 1b07e1a65c2..71640a1206b 100644
--- a/go.sum
+++ b/go.sum
@@ -1722,6 +1722,8 @@ resenje.org/email v0.1.3/go.mod h1:OhAVLRG3vqd9NSgayN3pAgzxTmc2B6mAefgShZvEgf0=
 resenje.org/jsonhttp v0.2.0/go.mod h1:EDyeguyTWj2fU3D3SCE0qNTgthzyEkHYLM1uu0uikHU=
 resenje.org/logging v0.1.5/go.mod h1:1IdoCm3+UwYfsplxDGV2pHCkUrLlQzlWwp4r28XfPx4=
 resenje.org/marshal v0.1.1/go.mod h1:P7Cla6Ju5CFvW4Y8JbRgWX1Hcy4L1w4qcCsyadO7G94=
+resenje.org/multex v0.1.0 h1:am9Ndt8dIAeGVaztD8ClsSX+e0EP3mj6UdsvjukKZig=
+resenje.org/multex v0.1.0/go.mod h1:3rHOoMrzqLNzgGWPcl/1GfzN52g7iaPXhbvTQ8TjGaM=
 resenje.org/recovery v0.1.1/go.mod h1:3S6aCVKMJEWsSAb61oZTteaiqkIfQPTr1RdiWnRbhME=
 resenje.org/singleflight v0.2.0 h1:nJ17VAZunMiFrfrltQ4Qs4r9MIP1pZC8u+0iSUTNnvQ=
 resenje.org/singleflight v0.2.0/go.mod h1:plheHgw2rd77IH3J6aN0Lu2JvMvHXoLknDwb6vN0dsE=
diff --git a/pkg/localstore/gc.go b/pkg/localstore/gc.go
index e3249464a58..a86d85c76a7 100644
--- a/pkg/localstore/gc.go
+++ b/pkg/localstore/gc.go
@@ -94,15 +94,15 @@ func (db *DB) collectGarbage() (evicted uint64, done bool, err error) {
 	target := db.gcTarget()

 	// tell the localstore to start logging dirty addresses
-	db.batchMu.Lock()
+	db.lock.Lock(lockKeyGC)
 	db.gcRunning = true
-	db.batchMu.Unlock()
+	db.lock.Unlock(lockKeyGC)
 	defer func() {
-		db.batchMu.Lock()
+		db.lock.Lock(lockKeyGC)
 		db.gcRunning = false
 		db.dirtyAddresses = nil
-		db.batchMu.Unlock()
+		db.lock.Unlock(lockKeyGC)
 	}()

 	gcSize, err := db.gcSize.Get()
@@ -142,9 +142,9 @@ func (db *DB) collectGarbage() (evicted uint64, done bool, err error) {
 	}

 	// protect database from changing idexes and gcSize
-	db.batchMu.Lock()
+	db.lock.Lock(lockKeyGC)
 	defer totalTimeMetric(db.metrics.TotalTimeGCLock, time.Now())
-	defer db.batchMu.Unlock()
+	defer db.lock.Unlock(lockKeyGC)

 	// refresh gcSize value, since it might have
 	// changed in the meanwhile
@@ -340,8 +340,9 @@ func (db *DB) evictReserve() (totalEvicted uint64, done bool, err error) {
 		totalTimeMetric(db.metrics.TotalTimeEvictReserve, start)
 	}(time.Now())

-	db.batchMu.Lock()
-	defer db.batchMu.Unlock()
+	// reserve eviction affects the reserve indexes as well as the GC indexes
+	db.lock.Lock(lockKeyGC)
+	defer db.lock.Unlock(lockKeyGC)

 	target = db.reserveCapacity

@@ -356,12 +357,17 @@ func (db *DB) evictReserve() (totalEvicted uint64, done bool, err error) {
 		return 0, true, nil
 	}

+	// if we start evicting chunks from the reserve during sampling, we should
+	// fail the sampling process and not participate in the current round.
+	db.stopSamplingIfRunning()
+
 	// if we dont get any entries at all then there's no use
 	// of triggering subsequent runs in case we're not done
 	totalCallbacks := 0
 	err = db.unreserveFunc(func(batchID []byte, radius uint8) (bool, error) {
 		totalCallbacks++
-		e, err := db.UnreserveBatch(batchID, radius)
+		e, err := db.unreserveBatch(batchID, radius)
 		if err != nil {
 			return true, err
 		}
diff --git a/pkg/localstore/gc_test.go b/pkg/localstore/gc_test.go
index 7ee476eed65..b7b9b2e6d5d 100644
--- a/pkg/localstore/gc_test.go
+++ b/pkg/localstore/gc_test.go
@@ -972,7 +972,7 @@ func setTestHookGCIteratorDone(h func()) (reset func()) {
 func unreserveChunkBatch(t *testing.T, db *DB, radius uint8, chs ...swarm.Chunk) {
 	t.Helper()
 	for _, ch := range chs {
-		_, err := db.UnreserveBatch(ch.Stamp().BatchID(), radius)
+		_, err := db.unreserveBatch(ch.Stamp().BatchID(), radius)
 		if err != nil {
 			t.Fatal(err)
 		}
diff --git a/pkg/localstore/localstore.go b/pkg/localstore/localstore.go
index 0887e0d7fa8..b3dd29700e1 100644
--- a/pkg/localstore/localstore.go
+++ b/pkg/localstore/localstore.go
@@ -41,6 +41,7 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/spf13/afero"
 	"github.com/syndtr/goleveldb/leveldb"
+	"resenje.org/multex"
 )

 // loggerName is the tree path name of the logger for this package.
@@ -72,6 +73,23 @@ const (
 	sharkyDirtyFileName = ".DIRTY"
 )

+const (
+	// lockKeyUpload is used to guard against parallel updates during upload. These
+	// updates are made mainly to the pushIndex and don't involve the GC or Reserve
+	// indexes. Hence this lock is kept separate, so that GC/Reserve operations can
+	// continue alongside uploads.
+	lockKeyUpload string = "upload"
+	// lockKeyGC is used to guard against parallel updates to the gcIndex and gcSize.
+	// The gcSize is a counter maintained for the gcIndex, hence parallel updates
+	// here need to be prevented. The reserve and GC locks are separate because the
+	// gcIndex and pullIndex are now mutually exclusive, so some operations can
+	// happen in parallel. This is slightly better than having a global lock.
+	lockKeyGC string = "gc"
+	// lockKeySampling is used to synchronize stopping the sampler when evictions
+	// start during sampling.
+	lockKeySampling string = "sampling"
+)
+
 // DB is the local store implementation and holds
 // database related objects.
 type DB struct {
@@ -154,7 +172,7 @@ type DB struct {
 	// baseKey is the overlay address
 	baseKey []byte

-	batchMu sync.Mutex
+	lock *multex.Multex

 	// gcRunning is true while GC is running. it is
 	// used to avoid touching dirty gc index entries
@@ -184,12 +202,12 @@ type DB struct {
 	// underlaying leveldb to prevent possible panics from
 	// iterators
 	subscriptionsWG sync.WaitGroup
-
-	metrics metrics
-
-	logger log.Logger
-
-	validStamp postage.ValidStampFn
+	metrics    metrics
+	logger     log.Logger
+	validStamp postage.ValidStampFn
+	// following fields are used to synchronize sampling and reserve eviction
+	samplerStop   *sync.Once
+	samplerSignal chan struct{}
 }

 // Options struct holds optional parameters for configuring DB.
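Note (editorial, not part of the patch): the three lock keys above name entries in a single resenje.org/multex keyed mutex, so callers holding the same key serialize while callers using different keys proceed concurrently. A minimal, self-contained sketch of that behaviour, using only the multex.New/Lock/Unlock calls that appear in this diff; the key names and the printed message are illustrative:

package main

import (
	"fmt"

	"resenje.org/multex"
)

func main() {
	m := multex.New() // same constructor the DB now uses for db.lock

	m.Lock("gc") // e.g. GC or reserve eviction holding the "gc" key

	done := make(chan struct{})
	go func() {
		// an upload-path update takes a different key, so it is not
		// blocked while "gc" is held
		m.Lock("upload")
		fmt.Println("upload proceeds while gc is held")
		m.Unlock("upload")
		close(done)
	}()
	<-done

	m.Unlock("gc") // a second "gc" user would have to wait for this
}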
@@ -272,6 +290,7 @@ func New(path string, baseKey []byte, ss storage.StateStorer, o *Options, logger
 		metrics:    newMetrics(),
 		logger:     logger.WithName(loggerName).Register(),
 		validStamp: o.ValidStamp,
+		lock:       multex.New(),
 	}
 	if db.cacheCapacity == 0 {
 		db.cacheCapacity = defaultCacheCapacity
diff --git a/pkg/localstore/metrics.go b/pkg/localstore/metrics.go
index 12056c79062..bf709164bfc 100644
--- a/pkg/localstore/metrics.go
+++ b/pkg/localstore/metrics.go
@@ -66,6 +66,15 @@ type metrics struct {
 	EvictReserveErrorCounter     prometheus.Counter
 	EvictReserveCollectedCounter prometheus.Counter
 	TotalTimeEvictReserve        prometheus.Counter
+
+	BatchEvictCounter          prometheus.Counter
+	BatchEvictErrorCounter     prometheus.Counter
+	BatchEvictCollectedCounter prometheus.Counter
+	TotalTimeBatchEvict        prometheus.Counter
+
+	SamplerSuccessfulRuns prometheus.Counter
+	SamplerFailedRuns     prometheus.Counter
+	SamplerStopped        prometheus.Counter
 }

 func newMetrics() metrics {
@@ -386,6 +395,48 @@ func newMetrics() metrics {
 			Name:      "evict_reserve_total_time",
 			Help:      "total time spent evicting from reserve",
 		}),
+		BatchEvictCounter: prometheus.NewCounter(prometheus.CounterOpts{
+			Namespace: m.Namespace,
+			Subsystem: subsystem,
+			Name:      "batch_evict_count",
+			Help:      "number of times the evict batch was invoked",
+		}),
+		BatchEvictErrorCounter: prometheus.NewCounter(prometheus.CounterOpts{
+			Namespace: m.Namespace,
+			Subsystem: subsystem,
+			Name:      "batch_evict_err_count",
+			Help:      "number of times evict batch got an error",
+		}),
+		BatchEvictCollectedCounter: prometheus.NewCounter(prometheus.CounterOpts{
+			Namespace: m.Namespace,
+			Subsystem: subsystem,
+			Name:      "batch_evict_collected_count",
+			Help:      "number of chunks that have been evicted for the batch expirations",
+		}),
+		TotalTimeBatchEvict: prometheus.NewCounter(prometheus.CounterOpts{
+			Namespace: m.Namespace,
+			Subsystem: subsystem,
+			Name:      "batch_evict_total_time",
+			Help:      "total time spent evicting batches",
+		}),
+		SamplerSuccessfulRuns: prometheus.NewCounter(prometheus.CounterOpts{
+			Namespace: m.Namespace,
+			Subsystem: subsystem,
+			Name:      "sampler_successful_runs_count",
+			Help:      "number of times the sampler ran successfully",
+		}),
+		SamplerFailedRuns: prometheus.NewCounter(prometheus.CounterOpts{
+			Namespace: m.Namespace,
+			Subsystem: subsystem,
+			Name:      "sampler_failed_runs_count",
+			Help:      "number of times sampler failed",
+		}),
+		SamplerStopped: prometheus.NewCounter(prometheus.CounterOpts{
+			Namespace: m.Namespace,
+			Subsystem: subsystem,
+			Name:      "sampler_stopped_count",
+			Help:      "number of times sampler was stopped due to evictions",
+		}),
 	}
 }
diff --git a/pkg/localstore/mode_get.go b/pkg/localstore/mode_get.go
index 69b23481631..99e02042348 100644
--- a/pkg/localstore/mode_get.go
+++ b/pkg/localstore/mode_get.go
@@ -129,8 +129,9 @@ func (db *DB) updateGCItems(items ...shed.Item) {
 // only Address and Data fields with non zero values,
 // which is ensured by the get function.
 func (db *DB) updateGC(item shed.Item) (err error) {
-	db.batchMu.Lock()
-	defer db.batchMu.Unlock()
+	db.lock.Lock(lockKeyGC)
+	defer db.lock.Unlock(lockKeyGC)
+
 	if db.gcRunning {
 		db.dirtyAddresses = append(db.dirtyAddresses, swarm.NewAddress(item.Address))
 	}
diff --git a/pkg/localstore/mode_put.go b/pkg/localstore/mode_put.go
index 14334c6d474..78833061fee 100644
--- a/pkg/localstore/mode_put.go
+++ b/pkg/localstore/mode_put.go
@@ -68,13 +68,13 @@ func (r *releaseLocations) add(loc sharky.Location) {
 // in multiple put method calls.
 func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, retErr error) {
 	// protect parallel updates
-	db.batchMu.Lock()
-	defer db.batchMu.Unlock()
+	db.lock.Lock(lockKeyGC)
 	if db.gcRunning {
 		for _, ch := range chs {
 			db.dirtyAddresses = append(db.dirtyAddresses, ch.Address())
 		}
 	}
+	db.lock.Unlock(lockKeyGC)

 	batch := new(leveldb.Batch)
@@ -176,6 +176,9 @@ func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk)
 			gcSizeChange += c
 		}

+		db.lock.Lock(lockKeyGC)
+		defer db.lock.Unlock(lockKeyGC)
+
 	case storage.ModePutUpload, storage.ModePutUploadPin:
 		for i, ch := range chs {
 			pin := mode == storage.ModePutUploadPin
@@ -189,12 +192,14 @@ func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk)
 			if !exists {
 				// chunk is new so, trigger subscription feeds
 				// after the batch is successfully written
-				triggerPullFeed[db.po(ch.Address())] = struct{}{}
 				triggerPushFeed = true
 			}
 			gcSizeChange += c
 		}

+		db.lock.Lock(lockKeyUpload)
+		defer db.lock.Unlock(lockKeyUpload)
+
 	case storage.ModePutSync:
 		for i, ch := range chs {
 			exists, c, err := putChunk(ch, i, func(item shed.Item, exists bool) (int64, error) {
@@ -212,6 +217,9 @@ func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk)
 			gcSizeChange += c
 		}

+		db.lock.Lock(lockKeyGC)
+		defer db.lock.Unlock(lockKeyGC)
+
 	default:
 		return nil, ErrInvalidMode
 	}
diff --git a/pkg/localstore/mode_set.go b/pkg/localstore/mode_set.go
index 0a69ac13dfd..19c4074a0ab 100644
--- a/pkg/localstore/mode_set.go
+++ b/pkg/localstore/mode_set.go
@@ -49,11 +49,11 @@ func (db *DB) Set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Addr
 // chunks represented by provided addresses.
 func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Address) (err error) {
 	// protect parallel updates
-	db.batchMu.Lock()
-	defer db.batchMu.Unlock()
+	db.lock.Lock(lockKeyGC)
 	if db.gcRunning {
 		db.dirtyAddresses = append(db.dirtyAddresses, addrs...)
 	}
+	db.lock.Unlock(lockKeyGC)

 	batch := new(leveldb.Batch)
 	var committedLocations []sharky.Location
@@ -76,6 +76,9 @@ func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Addr
 			gcSizeChange += c
 		}

+		db.lock.Lock(lockKeyGC)
+		defer db.lock.Unlock(lockKeyGC)
+
 	case storage.ModeSetRemove:
 		for _, addr := range addrs {
 			item := addressToItem(addr)
@@ -95,6 +98,9 @@ func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Addr
 			gcSizeChange += c
 		}

+		db.lock.Lock(lockKeyGC)
+		defer db.lock.Unlock(lockKeyGC)
+
 	case storage.ModeSetPin:
 		for _, addr := range addrs {
 			item := addressToItem(addr)
 			}
 			gcSizeChange += c
 		}
+		db.lock.Lock(lockKeyGC)
+		defer db.lock.Unlock(lockKeyGC)
+
 	case storage.ModeSetUnpin:
 		for _, addr := range addrs {
 			c, err := db.setUnpin(batch, addr)
 			}
 			gcSizeChange += c
 		}
+		db.lock.Lock(lockKeyGC)
+		defer db.lock.Unlock(lockKeyGC)
+
 	default:
 		return ErrInvalidMode
 	}
@@ -399,6 +411,5 @@ func (db *DB) setUnpin(batch *leveldb.Batch, addr swarm.Address) (gcSizeChange i
 		return 0, err
 	}

-	gcSizeChange++
-	return gcSizeChange, nil
+	return 1, nil
 }
diff --git a/pkg/localstore/mode_set_test.go b/pkg/localstore/mode_set_test.go
index d01c543ee07..c0e25fae0d4 100644
--- a/pkg/localstore/mode_set_test.go
+++ b/pkg/localstore/mode_set_test.go
@@ -71,7 +71,7 @@ func TestModeSetRemove_WithSync(t *testing.T) {
 	var chs []swarm.Chunk
 	for i := 0; i < tc.count; i++ {
 		ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(2, 3, 2, false)
-		_, err := db.UnreserveBatch(ch.Stamp().BatchID(), 2)
+		_, err := db.unreserveBatch(ch.Stamp().BatchID(), 2)
 		if err != nil {
 			t.Fatal(err)
 		}
diff --git a/pkg/localstore/reserve.go b/pkg/localstore/reserve.go
index 2eaee6537df..8d174759200 100644
--- a/pkg/localstore/reserve.go
+++ b/pkg/localstore/reserve.go
@@ -8,16 +8,42 @@ import (
 	"encoding/hex"
 	"errors"
 	"fmt"
+	"time"

 	"github.com/ethersphere/bee/pkg/shed"
 	"github.com/ethersphere/bee/pkg/swarm"
 	"github.com/syndtr/goleveldb/leveldb"
 )

+// EvictBatch will evict all chunks associated with the batch from the reserve. This
+// is used by batch store for expirations.
+func (db *DB) EvictBatch(id []byte) error {
+	db.metrics.BatchEvictCounter.Inc()
+	defer func(start time.Time) {
+		totalTimeMetric(db.metrics.TotalTimeBatchEvict, start)
+	}(time.Now())
+
+	// EvictBatch will affect the reserve as well as GC indexes
+	db.lock.Lock(lockKeyGC)
+	defer db.lock.Unlock(lockKeyGC)
+
+	db.stopSamplingIfRunning()
+
+	evicted, err := db.unreserveBatch(id, swarm.MaxBins)
+	if err != nil {
+		db.metrics.BatchEvictErrorCounter.Inc()
+		return fmt.Errorf("failed evict batch: %w", err)
+	}
+
+	db.metrics.BatchEvictCollectedCounter.Add(float64(evicted))
+	db.logger.Debug("evict batch", "batch_id", swarm.NewAddress(id), "evicted_count", evicted)
+	return nil
+}
+
 // UnreserveBatch atomically unpins chunks of a batch in proximity order upto and including po.
 // Unpinning will result in all chunks with pincounter 0 to be put in the gc index
 // so if a chunk was only pinned by the reserve, unreserving it will make it gc-able.
-func (db *DB) UnreserveBatch(id []byte, radius uint8) (evicted uint64, err error) {
+func (db *DB) unreserveBatch(id []byte, radius uint8) (evicted uint64, err error) {
 	var (
 		item = shed.Item{
 			BatchID: id,
diff --git a/pkg/localstore/reserve_test.go b/pkg/localstore/reserve_test.go
index a5c6bc955e2..9276b2c7557 100644
--- a/pkg/localstore/reserve_test.go
+++ b/pkg/localstore/reserve_test.go
@@ -769,3 +769,79 @@ func TestDB_ReserveGC_BatchedUnreserve(t *testing.T) {

 	t.Run("gc size", newIndexGCSizeTest(db))
 }
+
+func TestDB_ReserveGC_EvictBatch(t *testing.T) {
+	chunkCount := 100
+
+	var closed chan struct{}
+	testHookCollectGarbageChan := make(chan uint64)
+	t.Cleanup(setTestHookCollectGarbage(func(collectedCount uint64) {
+		if collectedCount == 0 {
+			return
+		}
+		select {
+		case testHookCollectGarbageChan <- collectedCount:
+		case <-closed:
+		}
+	}))
+
+	stamp := postagetesting.MustNewStamp()
+
+	db := newTestDB(t, &Options{
+		Capacity:        100,
+		ReserveCapacity: 100,
+	})
+	closed = db.close
+
+	// generate chunks with the same batch and depth to trigger larger eviction
+	genChunk := func() swarm.Chunk {
+		newStamp := postagetesting.MustNewBatchStamp(stamp.BatchID())
+		ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2)
+		return ch.WithBatch(2, 3, 2, false).WithStamp(newStamp)
+	}
+
+	for i := 0; i < chunkCount; i++ {
+		ch := genChunk()
+		_, err := db.Put(context.Background(), storage.ModePutSync, ch)
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	t.Run("reserve size", reserveSizeTest(db, 100, 0))
+
+	err := db.EvictBatch(stamp.BatchID())
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	t.Run("reserve size", reserveSizeTest(db, 0, 0))
+
+	gcTarget := db.gcTarget()
+
+	for {
+		select {
+		case <-testHookCollectGarbageChan:
+		case <-time.After(10 * time.Second):
+			t.Fatal("gc timeout")
+		}
+
+		gcSize, err := db.gcSize.Get()
+		if err != nil {
+			t.Fatal(err)
+		}
+		if gcSize == gcTarget {
+			break
+		}
+	}
+
+	t.Run("postage chunks index count", newItemsCountTest(db.postageChunksIndex, 90))
+
+	// batch is evicted
+	t.Run("postage radius index count", newItemsCountTest(db.postageRadiusIndex, 0))
+
+	// all chunks would land into the gcIndex
+	t.Run("gc index count", newItemsCountTest(db.gcIndex, 90))
+
+	t.Run("gc size", newIndexGCSizeTest(db))
+}
diff --git a/pkg/localstore/sampler.go b/pkg/localstore/sampler.go
index d10164c36a8..0e6a8da2259 100644
--- a/pkg/localstore/sampler.go
+++ b/pkg/localstore/sampler.go
@@ -11,6 +11,7 @@ import (
 	"encoding/binary"
 	"errors"
 	"fmt"
+	"sync"
 	"time"

 	"github.com/ethersphere/bee/pkg/bmtpool"
@@ -27,6 +28,7 @@ import (
 const sampleSize = 8

 var errDbClosed = errors.New("database closed")
+var errSamplerStopped = errors.New("sampler stopped due to ongoing evictions")

 type sampleStat struct {
 	TotalIterated atomic.Int64
@@ -48,7 +50,8 @@ func (s sampleStat) String() string {
 	seconds := int64(time.Second)

 	return fmt.Sprintf(
-		"Chunks: %d NotFound: %d New Ignored: %d Iteration Duration: %d secs GetDuration: %d secs HmacrDuration: %d ValidStampDuration: %d",
+		"Chunks: %d NotFound: %d New Ignored: %d Iteration Duration: %d secs GetDuration: %d secs"+
+			" HmacrDuration: %d secs ValidStampDuration: %d secs",
 		s.TotalIterated.Load(),
 		s.NotFound.Load(),
 		s.NewIgnored.Load(),
@@ -83,11 +86,16 @@ func (db *DB) ReserveSample(
 	logger := db.logger.WithName("sampler").V(1).Register()

 	t := time.Now()

+	// signal start of sampling to see if we get any evictions during the sampler
+	// run
+	db.startSampling()
+	defer db.resetSamplingState()

 	// Phase 1: Iterate chunk addresses
 	g.Go(func() error {
 		defer close(addrChan)
 		iterationStart := time.Now()
+
 		err := db.pullIndex.Iterate(func(item shed.Item) (bool, error) {
 			select {
 			case addrChan <- swarm.NewAddress(item.Address):
@@ -150,6 +158,8 @@
 				return ctx.Err()
 			case <-db.close:
 				return errDbClosed
+			case <-db.samplerSignal:
+				return errSamplerStopped
 			}
 		}
@@ -198,7 +208,12 @@ func (db *DB) ReserveSample(
 			chunk := swarm.NewChunk(swarm.NewAddress(item.chunkItem.Address), item.chunkItem.Data)

-			stamp := postage.NewStamp(item.chunkItem.BatchID, item.chunkItem.Index, item.chunkItem.Timestamp, item.chunkItem.Sig)
+			stamp := postage.NewStamp(
+				item.chunkItem.BatchID,
+				item.chunkItem.Index,
+				item.chunkItem.Timestamp,
+				item.chunkItem.Sig,
+			)

 			stampData, err := stamp.MarshalBinary()
 			if err != nil {
@@ -217,11 +232,14 @@ func (db *DB) ReserveSample(
 			}
 			stat.ValidStampDuration.Add(time.Since(validStart).Nanoseconds())
-
 		}
 	}

 	if err := g.Wait(); err != nil {
+		db.metrics.SamplerFailedRuns.Inc()
+		if errors.Is(err, errSamplerStopped) {
+			db.metrics.SamplerStopped.Inc()
+		}
 		return storage.Sample{}, fmt.Errorf("sampler: failed creating sample: %w", err)
 	}
@@ -231,6 +249,7 @@ func (db *DB) ReserveSample(
 	for _, s := range sampleItems {
 		_, err := hasher.Write(s.Bytes())
 		if err != nil {
+			db.metrics.SamplerFailedRuns.Inc()
 			return storage.Sample{}, fmt.Errorf("sampler: failed creating root hash of sample: %w", err)
 		}
 	}
@@ -241,6 +260,7 @@ func (db *DB) ReserveSample(
 		Hash: swarm.NewAddress(hash),
 	}

+	db.metrics.SamplerSuccessfulRuns.Inc()
 	logger.Info("sampler done", "duration", time.Since(t), "storage_radius", storageRadius, "consensus_time_ns", consensusTime, "stats", stat, "sample", sample)

 	return sample, nil
@@ -250,3 +270,28 @@ func (db *DB) ReserveSample(
 func le(a, b []byte) bool {
 	return bytes.Compare(a, b) == -1
 }
+
+func (db *DB) startSampling() {
+	db.lock.Lock(lockKeySampling)
+	defer db.lock.Unlock(lockKeySampling)
+
+	db.samplerStop = new(sync.Once)
+	db.samplerSignal = make(chan struct{})
+}
+
+func (db *DB) stopSamplingIfRunning() {
+	db.lock.Lock(lockKeySampling)
+	defer db.lock.Unlock(lockKeySampling)
+
+	if db.samplerStop != nil {
+		db.samplerStop.Do(func() { close(db.samplerSignal) })
+	}
+}
+
+func (db *DB) resetSamplingState() {
+	db.lock.Lock(lockKeySampling)
+	defer db.lock.Unlock(lockKeySampling)
+
+	db.samplerStop = nil
+	db.samplerSignal = nil
+}
diff --git a/pkg/localstore/sampler_test.go b/pkg/localstore/sampler_test.go
index 14fe07a0755..927daa3c6dd 100644
--- a/pkg/localstore/sampler_test.go
+++ b/pkg/localstore/sampler_test.go
@@ -7,9 +7,12 @@ package localstore
 import (
 	"bytes"
 	"context"
+	"errors"
+	"sync"
 	"testing"
 	"time"

+	"github.com/ethersphere/bee/pkg/postage"
 	postagetesting "github.com/ethersphere/bee/pkg/postage/testing"
 	"github.com/ethersphere/bee/pkg/storage"
 	"github.com/ethersphere/bee/pkg/swarm"
@@ -95,3 +98,94 @@ func TestReserveSampler(t *testing.T) {
 		}
 	})
 }
+
+func TestReserveSamplerStop(t *testing.T) {
+	const chunkCountPerPO = 10
+	const maxPO = 10
+	var (
+		chs      []swarm.Chunk
+		batchIDs [][]byte
+		closed   chan struct{}
+		doneMtx  sync.Mutex
+		mtx      sync.Mutex
+	)
+	startWait, waitChan := make(chan struct{}), make(chan struct{})
+	doneWaiting := false
+
+	testHookEvictionChan := make(chan uint64)
+	t.Cleanup(setTestHookEviction(func(count uint64) {
+		if count == 0 {
+			return
+		}
+		select {
+		case testHookEvictionChan <- count:
+		case <-closed:
+		}
+	}))
+
+	db := newTestDB(t, &Options{
+		ReserveCapacity: 90,
+		UnreserveFunc: func(f postage.UnreserveIteratorFn) error {
+			mtx.Lock()
+			defer mtx.Unlock()
+			for i := 0; i < len(batchIDs); i++ {
+				// pop an element from batchIDs, call the Unreserve
+				item := batchIDs[i]
+				// here we mock the behavior of the batchstore
+				// that would call the localstore back with the
+				// batch IDs and the radiuses from the FIFO queue
+				stop, err := f(item, 2)
+				if err != nil {
+					return err
+				}
+				if stop {
+					return nil
+				}
+			}
+			batchIDs = nil
+			return nil
+		},
+		ValidStamp: func(_ swarm.Chunk, stampBytes []byte) (chunk swarm.Chunk, err error) {
+			doneMtx.Lock()
+			defer doneMtx.Unlock()
+
+			if !doneWaiting {
+				// signal that we have started sampling
+				close(startWait)
+				// this makes sampling wait till we trigger eviction for the test
+				<-waitChan
+			}
+			doneWaiting = true
+			return nil, nil
+		},
+	})
+	closed = db.close
+
+	for po := 0; po < maxPO; po++ {
+		for i := 0; i < chunkCountPerPO; i++ {
+			ch := generateValidRandomChunkAt(swarm.NewAddress(db.baseKey), po).WithBatch(2, 3, 2, false)
+			mtx.Lock()
+			chs = append(chs, ch)
+			batchIDs = append(batchIDs, ch.Stamp().BatchID())
+			mtx.Unlock()
+		}
+	}
+
+	_, err := db.Put(context.Background(), storage.ModePutSync, chs...)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	go func() {
+		<-startWait
+		// this will trigger the eviction
+		_, _ = db.ComputeReserveSize(0)
+		<-testHookEvictionChan
+		close(waitChan)
+	}()
+
+	_, err = db.ReserveSample(context.TODO(), []byte("anchor"), 5, uint64(time.Now().UnixNano()))
+	if !errors.Is(err, errSamplerStopped) {
+		t.Fatalf("expected sampler stopped error, found: %v", err)
+	}
+}
diff --git a/pkg/node/node.go b/pkg/node/node.go
index 2f40de890e3..abf1499a37e 100644
--- a/pkg/node/node.go
+++ b/pkg/node/node.go
@@ -274,14 +274,16 @@ func NewBee(interrupt chan struct{}, sysInterrupt chan os.Signal, addr string, p
 	logger.Info("using network id", "network_id", networkID)

 	var batchStore postage.Storer = new(postage.NoOpBatchStore)
-	var unreserveFn func([]byte, uint8) (uint64, error)
+	var evictFn func([]byte) error

 	if chainEnabled {
-		var evictFn = func(b []byte) error {
-			_, err := unreserveFn(b, swarm.MaxBins)
-			return err
-		}
-		batchStore, err = batchstore.New(stateStore, evictFn, logger)
+		batchStore, err = batchstore.New(
+			stateStore,
+			func(id []byte) error {
+				return evictFn(id)
+			},
+			logger,
+		)
 		if err != nil {
 			return nil, fmt.Errorf("batchstore: %w", err)
 		}
@@ -665,7 +667,7 @@ func NewBee(interrupt chan struct{}, sysInterrupt chan os.Signal, addr string, p
 		return nil, fmt.Errorf("localstore: %w", err)
 	}
 	b.localstoreCloser = storer
-	unreserveFn = storer.UnreserveBatch
+	evictFn = storer.EvictBatch

 	post, err := postage.NewService(stateStore, batchStore, chainID)
 	if err != nil {
diff --git a/pkg/postage/batchstore/reserve.go b/pkg/postage/batchstore/reserve.go
index 54e1ea811c0..8a23faeb408 100644
--- a/pkg/postage/batchstore/reserve.go
+++ b/pkg/postage/batchstore/reserve.go
@@ -105,7 +105,6 @@ func (s *store) cleanup() error {
 	}

 	for _, b := range evictions {
-
 		err := s.evictFn(b.ID)
 		if err != nil {
 			return fmt.Errorf("evict batch %x: %w", b.ID, err)
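Note (editorial, not part of the patch): the sampler-stop handshake added in sampler.go reduces to a sync.Once guarding close() of a signal channel, so any number of concurrent evictions can abort a running sampler exactly once, and a later sampling run can re-arm it. A self-contained sketch of that pattern under that assumption; the type and function names below are illustrative, not the localstore API:

package main

import (
	"fmt"
	"sync"
)

// sampler mimics the start/stop/reset trio added to the DB: start arms a
// fresh sync.Once and signal channel, stopIfRunning closes the channel at
// most once, and a worker selects on the channel to abort.
type sampler struct {
	mu     sync.Mutex
	stop   *sync.Once
	signal chan struct{}
}

func (s *sampler) start() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.stop = new(sync.Once)
	s.signal = make(chan struct{})
}

func (s *sampler) stopIfRunning() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.stop != nil {
		s.stop.Do(func() { close(s.signal) })
	}
}

func (s *sampler) reset() {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.stop = nil
	s.signal = nil
}

func main() {
	s := &sampler{}
	s.start()
	defer s.reset()

	done := make(chan struct{})
	go func() {
		<-s.signal // the sampling loop would return an error here
		fmt.Println("sampler aborted by eviction")
		close(done)
	}()

	// two concurrent "evictions" may both call stop; the channel is
	// closed exactly once thanks to the sync.Once
	s.stopIfRunning()
	s.stopIfRunning()
	<-done
}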