Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: synchronize the reserve sampling process #3614

Merged
merged 13 commits into from Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -177,4 +177,5 @@ require (
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
resenje.org/multex v0.1.0 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Expand Up @@ -1722,6 +1722,8 @@ resenje.org/email v0.1.3/go.mod h1:OhAVLRG3vqd9NSgayN3pAgzxTmc2B6mAefgShZvEgf0=
resenje.org/jsonhttp v0.2.0/go.mod h1:EDyeguyTWj2fU3D3SCE0qNTgthzyEkHYLM1uu0uikHU=
resenje.org/logging v0.1.5/go.mod h1:1IdoCm3+UwYfsplxDGV2pHCkUrLlQzlWwp4r28XfPx4=
resenje.org/marshal v0.1.1/go.mod h1:P7Cla6Ju5CFvW4Y8JbRgWX1Hcy4L1w4qcCsyadO7G94=
resenje.org/multex v0.1.0 h1:am9Ndt8dIAeGVaztD8ClsSX+e0EP3mj6UdsvjukKZig=
resenje.org/multex v0.1.0/go.mod h1:3rHOoMrzqLNzgGWPcl/1GfzN52g7iaPXhbvTQ8TjGaM=
resenje.org/recovery v0.1.1/go.mod h1:3S6aCVKMJEWsSAb61oZTteaiqkIfQPTr1RdiWnRbhME=
resenje.org/singleflight v0.2.0 h1:nJ17VAZunMiFrfrltQ4Qs4r9MIP1pZC8u+0iSUTNnvQ=
resenje.org/singleflight v0.2.0/go.mod h1:plheHgw2rd77IH3J6aN0Lu2JvMvHXoLknDwb6vN0dsE=
Expand Down
24 changes: 15 additions & 9 deletions pkg/localstore/gc.go
Expand Up @@ -94,15 +94,15 @@ func (db *DB) collectGarbage() (evicted uint64, done bool, err error) {
target := db.gcTarget()

// tell the localstore to start logging dirty addresses
db.batchMu.Lock()
db.lock.Lock(lockKeyGC)
db.gcRunning = true
db.batchMu.Unlock()
db.lock.Unlock(lockKeyGC)

defer func() {
db.batchMu.Lock()
db.lock.Lock(lockKeyGC)
db.gcRunning = false
db.dirtyAddresses = nil
db.batchMu.Unlock()
db.lock.Unlock(lockKeyGC)
}()

gcSize, err := db.gcSize.Get()
Expand Down Expand Up @@ -142,9 +142,9 @@ func (db *DB) collectGarbage() (evicted uint64, done bool, err error) {
}

// protect database from changing indexes and gcSize
db.batchMu.Lock()
db.lock.Lock(lockKeyGC)
defer totalTimeMetric(db.metrics.TotalTimeGCLock, time.Now())
defer db.batchMu.Unlock()
defer db.lock.Unlock(lockKeyGC)

// refresh gcSize value, since it might have
// changed in the meanwhile
Expand Down Expand Up @@ -340,8 +340,9 @@ func (db *DB) evictReserve() (totalEvicted uint64, done bool, err error) {
totalTimeMetric(db.metrics.TotalTimeEvictReserve, start)
}(time.Now())

db.batchMu.Lock()
defer db.batchMu.Unlock()
// reserve eviction affects the reserve indexes as well as the GC indexes
db.lock.Lock(lockKeyGC)
defer db.lock.Unlock(lockKeyGC)

target = db.reserveCapacity

Expand All @@ -356,12 +357,17 @@ func (db *DB) evictReserve() (totalEvicted uint64, done bool, err error) {
return 0, true, nil
}

// if we start evicting chunks from the reserve during sampling, we should
// fail the sampling process and not participate in the current round.
db.stopSamplingIfRunning()

// if we don't get any entries at all then there's no use
// of triggering subsequent runs in case we're not done
totalCallbacks := 0
err = db.unreserveFunc(func(batchID []byte, radius uint8) (bool, error) {

totalCallbacks++
e, err := db.UnreserveBatch(batchID, radius)
e, err := db.unreserveBatch(batchID, radius)
if err != nil {
return true, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/localstore/gc_test.go
Expand Up @@ -972,7 +972,7 @@ func setTestHookGCIteratorDone(h func()) (reset func()) {
func unreserveChunkBatch(t *testing.T, db *DB, radius uint8, chs ...swarm.Chunk) {
t.Helper()
for _, ch := range chs {
_, err := db.UnreserveBatch(ch.Stamp().BatchID(), radius)
_, err := db.unreserveBatch(ch.Stamp().BatchID(), radius)
if err != nil {
t.Fatal(err)
}
Expand Down
33 changes: 26 additions & 7 deletions pkg/localstore/localstore.go
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/afero"
"github.com/syndtr/goleveldb/leveldb"
"resenje.org/multex"
)

// loggerName is the tree path name of the logger for this package.
Expand Down Expand Up @@ -72,6 +73,23 @@ const (
sharkyDirtyFileName = ".DIRTY"
)

const (
	// lockKeyUpload guards against parallel updates during upload. These
	// updates mainly touch the pushIndex and don't involve the GC or Reserve
	// indexes, so this lock is kept separate to let GC/Reserve operations
	// proceed concurrently with uploads.
	lockKeyUpload string = "upload"
	// lockKeyGC guards against parallel updates to the gcIndex and gcSize.
	// The gcSize is a counter maintained alongside the gcIndex, so concurrent
	// updates to the pair must be prevented. The reserve and GC locks are
	// separated because the gcIndex and pullIndex are now mutually exclusive,
	// which allows some operations to run in parallel. This is slightly
	// better than holding a single global lock.
	lockKeyGC string = "gc"
	// lockKeySampling synchronizes stopping of the sampler when evictions
	// begin while sampling is in progress.
	lockKeySampling string = "sampling"
)

// DB is the local store implementation and holds
// database related objects.
type DB struct {
Expand Down Expand Up @@ -154,7 +172,7 @@ type DB struct {
// baseKey is the overlay address
baseKey []byte

batchMu sync.Mutex
lock *multex.Multex

// gcRunning is true while GC is running. it is
// used to avoid touching dirty gc index entries
Expand Down Expand Up @@ -184,12 +202,12 @@ type DB struct {
// underlaying leveldb to prevent possible panics from
// iterators
subscriptionsWG sync.WaitGroup

metrics metrics

logger log.Logger

validStamp postage.ValidStampFn
metrics metrics
logger log.Logger
validStamp postage.ValidStampFn
// following fields are used to synchronize sampling and reserve eviction
samplerStop *sync.Once
samplerSignal chan struct{}
}

// Options struct holds optional parameters for configuring DB.
Expand Down Expand Up @@ -272,6 +290,7 @@ func New(path string, baseKey []byte, ss storage.StateStorer, o *Options, logger
metrics: newMetrics(),
logger: logger.WithName(loggerName).Register(),
validStamp: o.ValidStamp,
lock: multex.New(),
}
if db.cacheCapacity == 0 {
db.cacheCapacity = defaultCacheCapacity
Expand Down
51 changes: 51 additions & 0 deletions pkg/localstore/metrics.go
Expand Up @@ -66,6 +66,15 @@ type metrics struct {
EvictReserveErrorCounter prometheus.Counter
EvictReserveCollectedCounter prometheus.Counter
TotalTimeEvictReserve prometheus.Counter

BatchEvictCounter prometheus.Counter
BatchEvictErrorCounter prometheus.Counter
BatchEvictCollectedCounter prometheus.Counter
TotalTimeBatchEvict prometheus.Counter

SamplerSuccessfulRuns prometheus.Counter
SamplerFailedRuns prometheus.Counter
SamplerStopped prometheus.Counter
}

func newMetrics() metrics {
Expand Down Expand Up @@ -386,6 +395,48 @@ func newMetrics() metrics {
Name: "evict_reserve_total_time",
Help: "total time spent evicting from reserve",
}),
BatchEvictCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "batch_evict_count",
Help: "number of times the evict batch was invoked",
}),
BatchEvictErrorCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "batch_evict_err_count",
Help: "number of times evict batch got an error",
}),
BatchEvictCollectedCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "batch_evict_collected_count",
Help: "number of chunks that have been evicted for the batch expirations",
}),
TotalTimeBatchEvict: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "batch_evict_total_time",
Help: "total time spent evicting batches",
}),
SamplerSuccessfulRuns: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "sampler_successful_runs_count",
Help: "number of times the sampler ran successfully",
}),
SamplerFailedRuns: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "sampler_failed_runs_count",
Help: "number of times sampler failed",
}),
SamplerStopped: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "sampler_stopped_count",
Help: "number of times sampler was stopped due to evictions",
}),
}
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/localstore/mode_get.go
Expand Up @@ -129,8 +129,9 @@ func (db *DB) updateGCItems(items ...shed.Item) {
// only Address and Data fields with non zero values,
// which is ensured by the get function.
func (db *DB) updateGC(item shed.Item) (err error) {
db.batchMu.Lock()
defer db.batchMu.Unlock()
db.lock.Lock(lockKeyGC)
defer db.lock.Unlock(lockKeyGC)

if db.gcRunning {
db.dirtyAddresses = append(db.dirtyAddresses, swarm.NewAddress(item.Address))
}
Expand Down
14 changes: 11 additions & 3 deletions pkg/localstore/mode_put.go
Expand Up @@ -68,13 +68,13 @@ func (r *releaseLocations) add(loc sharky.Location) {
// in multiple put method calls.
func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) (exist []bool, retErr error) {
// protect parallel updates
db.batchMu.Lock()
defer db.batchMu.Unlock()
db.lock.Lock(lockKeyGC)
if db.gcRunning {
for _, ch := range chs {
db.dirtyAddresses = append(db.dirtyAddresses, ch.Address())
}
}
db.lock.Unlock(lockKeyGC)

batch := new(leveldb.Batch)

Expand Down Expand Up @@ -176,6 +176,9 @@ func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk)
gcSizeChange += c
}

db.lock.Lock(lockKeyGC)
defer db.lock.Unlock(lockKeyGC)

case storage.ModePutUpload, storage.ModePutUploadPin:
for i, ch := range chs {
pin := mode == storage.ModePutUploadPin
Expand All @@ -189,12 +192,14 @@ func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk)
if !exists {
// chunk is new so, trigger subscription feeds
// after the batch is successfully written
triggerPullFeed[db.po(ch.Address())] = struct{}{}
triggerPushFeed = true
}
gcSizeChange += c
}

db.lock.Lock(lockKeyUpload)
defer db.lock.Unlock(lockKeyUpload)

case storage.ModePutSync:
for i, ch := range chs {
exists, c, err := putChunk(ch, i, func(item shed.Item, exists bool) (int64, error) {
Expand All @@ -212,6 +217,9 @@ func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk)
gcSizeChange += c
}

db.lock.Lock(lockKeyGC)
defer db.lock.Unlock(lockKeyGC)

default:
return nil, ErrInvalidMode
}
Expand Down
19 changes: 15 additions & 4 deletions pkg/localstore/mode_set.go
Expand Up @@ -49,11 +49,11 @@ func (db *DB) Set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Addr
// chunks represented by provided addresses.
func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Address) (err error) {
// protect parallel updates
db.batchMu.Lock()
defer db.batchMu.Unlock()
db.lock.Lock(lockKeyGC)
if db.gcRunning {
db.dirtyAddresses = append(db.dirtyAddresses, addrs...)
}
db.lock.Unlock(lockKeyGC)

batch := new(leveldb.Batch)
var committedLocations []sharky.Location
Expand All @@ -76,6 +76,9 @@ func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Addr
gcSizeChange += c
}

db.lock.Lock(lockKeyGC)
defer db.lock.Unlock(lockKeyGC)

case storage.ModeSetRemove:
for _, addr := range addrs {
item := addressToItem(addr)
Expand All @@ -95,6 +98,9 @@ func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Addr
gcSizeChange += c
}

db.lock.Lock(lockKeyGC)
defer db.lock.Unlock(lockKeyGC)

case storage.ModeSetPin:
for _, addr := range addrs {
item := addressToItem(addr)
Expand All @@ -104,6 +110,9 @@ func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Addr
}
gcSizeChange += c
}
db.lock.Lock(lockKeyGC)
defer db.lock.Unlock(lockKeyGC)

case storage.ModeSetUnpin:
for _, addr := range addrs {
c, err := db.setUnpin(batch, addr)
Expand All @@ -112,6 +121,9 @@ func (db *DB) set(ctx context.Context, mode storage.ModeSet, addrs ...swarm.Addr
}
gcSizeChange += c
}
db.lock.Lock(lockKeyGC)
defer db.lock.Unlock(lockKeyGC)

default:
return ErrInvalidMode
}
Expand Down Expand Up @@ -399,6 +411,5 @@ func (db *DB) setUnpin(batch *leveldb.Batch, addr swarm.Address) (gcSizeChange i
return 0, err
}

gcSizeChange++
return gcSizeChange, nil
return 1, nil
}
2 changes: 1 addition & 1 deletion pkg/localstore/mode_set_test.go
Expand Up @@ -71,7 +71,7 @@ func TestModeSetRemove_WithSync(t *testing.T) {
var chs []swarm.Chunk
for i := 0; i < tc.count; i++ {
ch := generateTestRandomChunkAt(swarm.NewAddress(db.baseKey), 2).WithBatch(2, 3, 2, false)
_, err := db.UnreserveBatch(ch.Stamp().BatchID(), 2)
_, err := db.unreserveBatch(ch.Stamp().BatchID(), 2)
if err != nil {
t.Fatal(err)
}
Expand Down
28 changes: 27 additions & 1 deletion pkg/localstore/reserve.go
Expand Up @@ -8,16 +8,42 @@ import (
"encoding/hex"
"errors"
"fmt"
"time"

"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
)

// EvictBatch will evict all chunks associated with the batch from the reserve. This
// is used by batch store for expirations.
// EvictBatch removes all chunks belonging to the given postage batch from the
// reserve. The batch store calls this when a batch expires.
func (db *DB) EvictBatch(id []byte) error {
	db.metrics.BatchEvictCounter.Inc()
	start := time.Now()
	defer func() {
		totalTimeMetric(db.metrics.TotalTimeBatchEvict, start)
	}()

	// Eviction touches the reserve indexes as well as the GC indexes, so the
	// GC lock must be held for the whole operation.
	db.lock.Lock(lockKeyGC)
	defer db.lock.Unlock(lockKeyGC)

	// Abort any in-flight sampling round; evicting chunks would invalidate
	// its result.
	db.stopSamplingIfRunning()

	count, err := db.unreserveBatch(id, swarm.MaxBins)
	if err != nil {
		db.metrics.BatchEvictErrorCounter.Inc()
		return fmt.Errorf("failed evict batch: %w", err)
	}

	db.metrics.BatchEvictCollectedCounter.Add(float64(count))
	db.logger.Debug("evict batch", "batch_id", swarm.NewAddress(id), "evicted_count", count)
	return nil
}

// UnreserveBatch atomically unpins chunks of a batch in proximity order upto and including po.
// Unpinning will result in all chunks with pincounter 0 to be put in the gc index
// so if a chunk was only pinned by the reserve, unreserving it will make it gc-able.
func (db *DB) UnreserveBatch(id []byte, radius uint8) (evicted uint64, err error) {
func (db *DB) unreserveBatch(id []byte, radius uint8) (evicted uint64, err error) {
var (
item = shed.Item{
BatchID: id,
Expand Down