Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: puller rewrite and bug fixes #3437

Merged
merged 24 commits into from Oct 25, 2022
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 0 additions & 9 deletions pkg/flipflop/doc.go

This file was deleted.

68 changes: 0 additions & 68 deletions pkg/flipflop/falling_edge.go

This file was deleted.

122 changes: 0 additions & 122 deletions pkg/flipflop/falling_edge_test.go

This file was deleted.

5 changes: 0 additions & 5 deletions pkg/localstore/localstore.go
Expand Up @@ -60,11 +60,6 @@ var (
// Limit the number of goroutines created by Getters
// that call updateGC function. Value 0 sets no limit.
maxParallelUpdateGC = 1000

// values needed to adjust subscription trigger
// buffer time.
flipFlopBufferDuration = 150 * time.Millisecond
flipFlopWorstCaseDuration = 10 * time.Second
)

const (
Expand Down
16 changes: 6 additions & 10 deletions pkg/localstore/subscription_pull.go
Expand Up @@ -22,7 +22,6 @@ import (
"sync"
"time"

"github.com/ethersphere/bee/pkg/flipflop"
"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
Expand All @@ -44,18 +43,16 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)

chunkDescriptors := make(chan storage.Descriptor)

in, out, clean := flipflop.NewFallingEdge(flipFlopBufferDuration, flipFlopWorstCaseDuration)
trigger := make(chan struct{}, 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These triggers would be fired on put operations. As it is buffered the Put operation would block on the pullIndex iteration completion everytime. How would this work?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see line 208, subscription_pull.go.
It is a select with a default, so no blocks

trigger <- struct{}{}

db.pullTriggersMu.Lock()
if _, ok := db.pullTriggers[bin]; !ok {
db.pullTriggers[bin] = make([]chan<- struct{}, 0)
}
db.pullTriggers[bin] = append(db.pullTriggers[bin], in)
db.pullTriggers[bin] = append(db.pullTriggers[bin], trigger)
db.pullTriggersMu.Unlock()

// send signal for the initial iteration
in <- struct{}{}

stopChan := make(chan struct{})
var stopChanOnce sync.Once

Expand All @@ -65,7 +62,6 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)

db.subscriptionsWG.Add(1)
go func() {
defer clean()
defer db.subscriptionsWG.Done()
defer db.metrics.SubscribePullStop.Inc()
// close the returned store.Descriptor channel at the end to
Expand All @@ -83,7 +79,7 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
first := true // first iteration flag for SkipStartFromItem
for {
select {
case <-out:
case <-trigger:
// iterate until:
// - last index Item is reached
// - subscription stop is called
Expand Down Expand Up @@ -171,9 +167,9 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64)
defer db.pullTriggersMu.Unlock()

for i, t := range db.pullTriggers[bin] {
if t == in {
if t == trigger {
db.pullTriggers[bin] = append(db.pullTriggers[bin][:i], db.pullTriggers[bin][i+1:]...)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The channel can be closed here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that will trigger the above select, it should be okay like this

break
return
}
}
}
Expand Down
19 changes: 13 additions & 6 deletions pkg/puller/metrics.go
Expand Up @@ -10,12 +10,13 @@ import (
)

type metrics struct {
HistWorkerIterCounter prometheus.Counter // counts the number of historical syncing iterations
HistWorkerDoneCounter prometheus.Counter // count number of finished historical syncing jobs
HistWorkerErrCounter prometheus.Counter // count number of errors
LiveWorkerIterCounter prometheus.Counter // counts the number of live syncing iterations
LiveWorkerErrCounter prometheus.Counter // count number of errors
MaxUintErrCounter prometheus.Counter // how many times we got maxuint as topmost
HistWorkerIterCounter prometheus.Counter // counts the number of historical syncing iterations
HistWorkerDoneCounter prometheus.Counter // count number of finished historical syncing jobs
HistWorkerErrCounter prometheus.Counter // count number of errors
LiveWorkerIterCounter prometheus.Counter // counts the number of live syncing iterations
LiveWorkerErrCounter prometheus.Counter // count number of errors
LiveWorkerErrCancellationCounter prometheus.Counter // count number of errors
MaxUintErrCounter prometheus.Counter // how many times we got maxuint as topmost
}

func newMetrics() metrics {
Expand Down Expand Up @@ -52,6 +53,12 @@ func newMetrics() metrics {
Name: "live_worker_errors",
Help: "Total live worker errors.",
}),
LiveWorkerErrCancellationCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "live_worker_cancellation_errors",
Help: "Total live worker context cancellation errors.",
}),
MaxUintErrCounter: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Expand Down