fix: synchronize the reserve sampling process #3614

Merged
merged 13 commits on Dec 7, 2022
Changes from 9 commits
1 change: 1 addition & 0 deletions go.mod
@@ -177,4 +177,5 @@ require (
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
resenje.org/multex v0.1.0 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
@@ -1722,6 +1722,8 @@ resenje.org/email v0.1.3/go.mod h1:OhAVLRG3vqd9NSgayN3pAgzxTmc2B6mAefgShZvEgf0=
resenje.org/jsonhttp v0.2.0/go.mod h1:EDyeguyTWj2fU3D3SCE0qNTgthzyEkHYLM1uu0uikHU=
resenje.org/logging v0.1.5/go.mod h1:1IdoCm3+UwYfsplxDGV2pHCkUrLlQzlWwp4r28XfPx4=
resenje.org/marshal v0.1.1/go.mod h1:P7Cla6Ju5CFvW4Y8JbRgWX1Hcy4L1w4qcCsyadO7G94=
resenje.org/multex v0.1.0 h1:am9Ndt8dIAeGVaztD8ClsSX+e0EP3mj6UdsvjukKZig=
resenje.org/multex v0.1.0/go.mod h1:3rHOoMrzqLNzgGWPcl/1GfzN52g7iaPXhbvTQ8TjGaM=
resenje.org/recovery v0.1.1/go.mod h1:3S6aCVKMJEWsSAb61oZTteaiqkIfQPTr1RdiWnRbhME=
resenje.org/singleflight v0.2.0 h1:nJ17VAZunMiFrfrltQ4Qs4r9MIP1pZC8u+0iSUTNnvQ=
resenje.org/singleflight v0.2.0/go.mod h1:plheHgw2rd77IH3J6aN0Lu2JvMvHXoLknDwb6vN0dsE=
23 changes: 14 additions & 9 deletions pkg/localstore/gc.go
@@ -94,15 +94,15 @@ func (db *DB) collectGarbage() (evicted uint64, done bool, err error) {
target := db.gcTarget()

// tell the localstore to start logging dirty addresses
db.batchMu.Lock()
db.lock.Lock(GC)
db.gcRunning = true
db.batchMu.Unlock()
db.lock.Unlock(GC)

defer func() {
db.batchMu.Lock()
db.lock.Lock(GC)
db.gcRunning = false
db.dirtyAddresses = nil
db.batchMu.Unlock()
db.lock.Unlock(GC)
}()

gcSize, err := db.gcSize.Get()
@@ -142,9 +142,9 @@ func (db *DB) collectGarbage() (evicted uint64, done bool, err error) {
}

// protect database from changing indexes and gcSize
db.batchMu.Lock()
db.lock.Lock(GC)
defer totalTimeMetric(db.metrics.TotalTimeGCLock, time.Now())
defer db.batchMu.Unlock()
defer db.lock.Unlock(GC)

// refresh gcSize value, since it might have
// changed in the meanwhile
@@ -340,8 +340,9 @@ func (db *DB) evictReserve() (totalEvicted uint64, done bool, err error) {
totalTimeMetric(db.metrics.TotalTimeEvictReserve, start)
}(time.Now())

db.batchMu.Lock()
defer db.batchMu.Unlock()
// reserve eviction affects the reserve indexes as well as the GC indexes
db.lock.Lock(Write)
defer db.lock.Unlock(Write)

target = db.reserveCapacity

@@ -360,8 +361,12 @@
// of triggering subsequent runs in case we're not done
totalCallbacks := 0
err = db.unreserveFunc(func(batchID []byte, radius uint8) (bool, error) {
// if we start evicting chunks from the reserve during sampling, we should
// fail the sampling process and not participate in the current round.
db.stopSamplingIfStarted()

totalCallbacks++
e, err := db.UnreserveBatch(batchID, radius)
e, err := db.unreserveBatch(batchID, radius)
if err != nil {
return true, err
}
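The stopSamplingIfStarted call above is the heart of the synchronization this PR adds: if reserve eviction begins while a sample is being built, the in-flight sampling run is told to abort so the node does not participate in the current round with chunks that are about to be evicted. The helper itself is not part of this hunk; the sketch below shows one self-contained way such a handshake can be built around the samplerStop channel added to the DB struct further down (the mu field, startSampling helper, and reserveSample shape are illustrative assumptions, not code from this PR).

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errSamplerStopped = errors.New("sampler stopped due to reserve eviction")

// db sketches only the fields needed for the eviction/sampling handshake.
// Everything except the samplerStop channel is an assumption.
type db struct {
	mu          sync.Mutex
	samplerStop chan struct{} // non-nil only while a sampling run is in flight
}

// startSampling registers an in-flight sampling run and returns its stop channel.
func (d *db) startSampling() chan struct{} {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.samplerStop = make(chan struct{})
	return d.samplerStop
}

// stopSamplingIfStarted aborts the in-flight sampling run, if there is one.
func (d *db) stopSamplingIfStarted() {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.samplerStop != nil {
		close(d.samplerStop)
		d.samplerStop = nil
	}
}

// reserveSample iterates reserve chunks and bails out as soon as eviction begins.
func (d *db) reserveSample(chunks []string) error {
	stop := d.startSampling()
	for range chunks {
		select {
		case <-stop:
			return errSamplerStopped
		default:
			// hash the chunk into the sample here
		}
	}
	return nil
}

func main() {
	d := &db{}
	d.stopSamplingIfStarted() // no-op: nothing in flight yet
	fmt.Println(d.reserveSample([]string{"a", "b", "c"})) // <nil>
}
```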
2 changes: 1 addition & 1 deletion pkg/localstore/gc_test.go
@@ -972,7 +972,7 @@ func setTestHookGCIteratorDone(h func()) (reset func()) {
func unreserveChunkBatch(t *testing.T, db *DB, radius uint8, chs ...swarm.Chunk) {
t.Helper()
for _, ch := range chs {
_, err := db.UnreserveBatch(ch.Stamp().BatchID(), radius)
_, err := db.unreserveBatch(ch.Stamp().BatchID(), radius)
if err != nil {
t.Fatal(err)
}
14 changes: 7 additions & 7 deletions pkg/localstore/localstore.go
@@ -154,7 +154,7 @@ type DB struct {
// baseKey is the overlay address
baseKey []byte

batchMu sync.Mutex
lock storeLock

// gcRunning is true while GC is running. it is
// used to avoid touching dirty gc index entries
@@ -184,12 +184,11 @@ type DB struct {
// underlying leveldb to prevent possible panics from
// iterators
subscriptionsWG sync.WaitGroup

metrics metrics

logger log.Logger

validStamp postage.ValidStampFn
metrics metrics
logger log.Logger
validStamp postage.ValidStampFn
// sampler stop is used to synchronize the reserve eviction and sampling process
samplerStop chan struct{}
}

// Options struct holds optional parameters for configuring DB.
@@ -272,6 +271,7 @@ func New(path string, baseKey []byte, ss storage.StateStorer, o *Options, logger
metrics: newMetrics(),
logger: logger.WithName(loggerName).Register(),
validStamp: o.ValidStamp,
lock: newStoreLock(),
}
if db.cacheCapacity == 0 {
db.cacheCapacity = defaultCacheCapacity
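The single batchMu mutex is gone; the DB now carries a lock of a new storeLock type that is taken per stratum (GC, Upload, Write) at the call sites throughout this PR, and go.mod gains resenje.org/multex, which the real implementation presumably builds on. The type itself is outside this view, so the sketch below only captures one plausible reading of the call sites: Write excludes all other work, while GC and Upload each exclude only themselves and Write. Treat both the semantics and the string-keyed shape as assumptions.

```go
package localstore

import "sync"

// Lock strata as used at the call sites in this diff.
const (
	GC     = "gc"
	Upload = "upload"
	Write  = "write"
)

// storeLock is a sketch, not the PR's implementation: Write takes every
// stratum and is therefore mutually exclusive with GC and Upload work,
// while GC and Upload only exclude other holders of the same stratum.
type storeLock struct {
	writeMu *sync.Mutex
	strata  map[string]*sync.Mutex
}

func newStoreLock() storeLock {
	return storeLock{
		writeMu: &sync.Mutex{},
		strata: map[string]*sync.Mutex{
			GC:     {},
			Upload: {},
		},
	}
}

func (s storeLock) Lock(key string) {
	if key == Write {
		s.writeMu.Lock() // serialize writers before sweeping the strata
		for _, m := range s.strata {
			m.Lock()
		}
		return
	}
	s.strata[key].Lock()
}

func (s storeLock) Unlock(key string) {
	if key == Write {
		for _, m := range s.strata {
			m.Unlock()
		}
		s.writeMu.Unlock()
		return
	}
	s.strata[key].Unlock()
}
```

With this reading, garbage collection (GC) can run concurrently with uploads (Upload), while reserve eviction, batch eviction, and sync/request writes (Write) exclude both.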
29 changes: 29 additions & 0 deletions pkg/localstore/metrics.go
@@ -66,6 +66,11 @@ type metrics struct {
EvictReserveErrorCounter prometheus.Counter
EvictReserveCollectedCounter prometheus.Counter
TotalTimeEvictReserve prometheus.Counter

BatchEvictCounter prometheus.Counter
BatchEvictErrorCounter prometheus.Counter
BatchEvictCollectedCounter prometheus.Counter
TotalTimeBatchEvict prometheus.Counter
}

func newMetrics() metrics {
@@ -386,6 +391,30 @@ func newMetrics() metrics {
Name: "evict_reserve_total_time",
Help: "total time spent evicting from reserve",
}),
BatchEvictCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "batch_evict_count",
Help: "number of times the evict batch was invoked",
}),
BatchEvictErrorCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "batch_evict_err_count",
Help: "number of times evict batch got an error",
}),
BatchEvictCollectedCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "batch_evict_collected_count",
Help: "number of chunks that have been evicted for the batch expirations",
}),
TotalTimeBatchEvict: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "batch_evict_total_time",
Help: "total time spent evicting batches",
}),
}
}

5 changes: 3 additions & 2 deletions pkg/localstore/mode_get.go
@@ -129,8 +129,9 @@ func (db *DB) updateGCItems(items ...shed.Item) {
// only Address and Data fields with non zero values,
// which is ensured by the get function.
func (db *DB) updateGC(item shed.Item) (err error) {
db.batchMu.Lock()
defer db.batchMu.Unlock()
db.lock.Lock(GC)
defer db.lock.Unlock(GC)

if db.gcRunning {
db.dirtyAddresses = append(db.dirtyAddresses, swarm.NewAddress(item.Address))
}
14 changes: 11 additions & 3 deletions pkg/localstore/mode_put.go
@@ -68,13 +68,13 @@ func (r *releaseLocations) add(loc sharky.Location) {
// in multiple put method calls.
func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, retErr error) {
// protect parallel updates
db.batchMu.Lock()
defer db.batchMu.Unlock()
db.lock.Lock(GC)
if db.gcRunning {
for _, ch := range chs {
db.dirtyAddresses = append(db.dirtyAddresses, ch.Address())
}
}
db.lock.Unlock(GC)

batch := new(leveldb.Batch)

@@ -163,6 +163,9 @@ func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk)

switch mode {
case storage.ModePutRequest, storage.ModePutRequestPin, storage.ModePutRequestCache:
db.lock.Lock(Write)
defer db.lock.Unlock(Write)

for i, ch := range chs {
pin := mode == storage.ModePutRequestPin // force pin in this mode
cache := mode == storage.ModePutRequestCache // force cache
@@ -177,6 +180,9 @@ func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk)
}

case storage.ModePutUpload, storage.ModePutUploadPin:
db.lock.Lock(Upload)
defer db.lock.Unlock(Upload)

for i, ch := range chs {
pin := mode == storage.ModePutUploadPin
exists, c, err := putChunk(ch, i, func(item shed.Item, exists bool) (int64, error) {
@@ -189,13 +195,15 @@ func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk)
if !exists {
// chunk is new, so trigger subscription feeds
// after the batch is successfully written
triggerPullFeed[db.po(ch.Address())] = struct{}{}
triggerPushFeed = true
}
gcSizeChange += c
}

case storage.ModePutSync:
db.lock.Lock(Write)
defer db.lock.Unlock(Write)

for i, ch := range chs {
exists, c, err := putChunk(ch, i, func(item shed.Item, exists bool) (int64, error) {
return db.putSync(batch, binIDs, item, exists)
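Condensed, the put path above holds the GC stratum only for the dirty-address bookkeeping and then takes a stratum that matches the mode for the actual index updates. The helper below does not exist in the PR; it is just an illustration of that mapping, reusing the string-keyed strata assumed in the earlier sketch:

```go
package localstore

import "github.com/ethersphere/bee/pkg/storage"

// putLockFor returns the stratum a put mode holds while updating indexes,
// mirroring the call sites in this diff. Illustrative only; not part of the PR.
func putLockFor(mode storage.ModePut) string {
	switch mode {
	case storage.ModePutUpload, storage.ModePutUploadPin:
		// uploads only touch the upload/push related indexes
		return Upload
	default:
		// the request, cache and sync modes also touch retrieval and GC
		// indexes, so they take the broader Write stratum
		return Write
	}
}
```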
16 changes: 14 additions & 2 deletions pkg/localstore/mode_set.go
@@ -49,11 +49,11 @@ func (db *DB) Set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Addr
// chunks represented by provided addresses.
func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Address) (err error) {
// protect parallel updates
db.batchMu.Lock()
defer db.batchMu.Unlock()
db.lock.Lock(GC)
if db.gcRunning {
db.dirtyAddresses = append(db.dirtyAddresses, addrs...)
}
db.lock.Unlock(GC)

batch := new(leveldb.Batch)
var committedLocations []sharky.Location
@@ -68,6 +68,9 @@ func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Addr
switch mode {

case storage.ModeSetSync:
db.lock.Lock(GC)
defer db.lock.Unlock(GC)

for _, addr := range addrs {
c, err := db.setSync(batch, addr)
if err != nil {
@@ -77,6 +80,9 @@ func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Addr
}

case storage.ModeSetRemove:
db.lock.Lock(Write)
defer db.lock.Unlock(Write)

for _, addr := range addrs {
item := addressToItem(addr)
storedItem, err := db.retrievalDataIndex.Get(item)
@@ -96,6 +102,9 @@ func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Addr
}

case storage.ModeSetPin:
db.lock.Lock(GC)
defer db.lock.Unlock(GC)

for _, addr := range addrs {
item := addressToItem(addr)
c, err := db.setPin(batch, item)
@@ -105,6 +114,9 @@ func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Addr
gcSizeChange += c
}
case storage.ModeSetUnpin:
db.lock.Lock(GC)
defer db.lock.Unlock(GC)

for _, addr := range addrs {
c, err := db.setUnpin(batch, addr)
if err != nil {
2 changes: 1 addition & 1 deletion pkg/localstore/mode_set_test.go
@@ -71,7 +71,7 @@ func TestModeSetRemove_WithSync(t *testing.T) {
var chs []swarm.Chunk
for i := 0; i < tc.count; i++ {
ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(2, 3, 2, false)
_, err := db.UnreserveBatch(ch.Stamp().BatchID(), 2)
_, err := db.unreserveBatch(ch.Stamp().BatchID(), 2)
if err != nil {
t.Fatal(err)
}
26 changes: 25 additions & 1 deletion pkg/localstore/reserve.go
@@ -8,16 +8,40 @@ import (
"encoding/hex"
"errors"
"fmt"
"time"

"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
)

// EvictBatch will evict all chunks associated with the batch from the reserve. This
// is used by the batch store for expirations.
func (db *DB) EvictBatch(id []byte) error {
db.metrics.BatchEvictCounter.Inc()
defer func(start time.Time) {
totalTimeMetric(db.metrics.TotalTimeBatchEvict, start)
}(time.Now())

// EvictBatch will affect the reserve as well as GC indexes
db.lock.Lock(Write)
defer db.lock.Unlock(Write)

evicted, err := db.unreserveBatch(id, swarm.MaxBins)
if err != nil {
db.metrics.BatchEvictErrorCounter.Inc()
return fmt.Errorf("failed evict batch: %w", err)
}

db.metrics.BatchEvictCollectedCounter.Add(float64(evicted))
db.logger.Debug("evict batch", "batch_id", swarm.NewAddress(id), "evicted_count", evicted)
return nil
}

// UnreserveBatch atomically unpins chunks of a batch in proximity order up to and including po.
// Unpinning will result in all chunks with pincounter 0 being put in the gc index
// so if a chunk was only pinned by the reserve, unreserving it will make it gc-able.
func (db *DB) UnreserveBatch(id []byte, radius uint8) (evicted uint64, err error) {
func (db *DB) unreserveBatch(id []byte, radius uint8) (evicted uint64, err error) {
var (
item = shed.Item{
BatchID: id,
Expand Down