From c66214a78ec69993f51316333a9720424c5446e8 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Fri, 14 Oct 2022 16:58:00 +0300 Subject: [PATCH 01/24] fix: bugs --- pkg/flipflop/doc.go | 9 -- pkg/flipflop/falling_edge.go | 68 ------------- pkg/flipflop/falling_edge_test.go | 122 ------------------------ pkg/localstore/subscription_pull.go | 15 +-- pkg/puller/puller.go | 19 ++-- pkg/pullsync/pullstorage/pullstorage.go | 1 + 6 files changed, 18 insertions(+), 216 deletions(-) delete mode 100644 pkg/flipflop/doc.go delete mode 100644 pkg/flipflop/falling_edge.go delete mode 100644 pkg/flipflop/falling_edge_test.go diff --git a/pkg/flipflop/doc.go b/pkg/flipflop/doc.go deleted file mode 100644 index 65a6ae781e6..00000000000 --- a/pkg/flipflop/doc.go +++ /dev/null @@ -1,9 +0,0 @@ -// Copyright 2021 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -// Package flipflop exposes a buffered input functionality -// that mimicks the behavior of falling edge detection -// which is done when doing signal processing on digital -// or analog electric circuitry. -package flipflop diff --git a/pkg/flipflop/falling_edge.go b/pkg/flipflop/falling_edge.go deleted file mode 100644 index 8e0315c0a20..00000000000 --- a/pkg/flipflop/falling_edge.go +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright 2021 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package flipflop - -import ( - "time" -) - -type detector struct { - t time.Duration - worstCase time.Duration - - buf chan struct{} - out chan struct{} - quit chan struct{} -} - -// NewFallingEdge returns a new falling edge detector. -// bufferTime is the time to buffer, worstCase is buffertime*worstcase time to wait before writing -// to the output anyway. -func NewFallingEdge(bufferTime, worstCase time.Duration) (in chan<- struct{}, out <-chan struct{}, clean func()) { - d := &detector{ - t: bufferTime, - worstCase: worstCase, - buf: make(chan struct{}, 1), - out: make(chan struct{}), - quit: make(chan struct{}), - } - - go d.work() - - return d.buf, d.out, func() { close(d.quit) } -} - -func (d *detector) work() { - var waitWrite <-chan time.Time - var worstCase <-chan time.Time - for { - select { - case <-d.quit: - return - case <-d.buf: - // we have an item in the buffer, dont announce yet - waitWrite = time.After(d.t) - if worstCase == nil { - worstCase = time.After(d.worstCase) - } - case <-waitWrite: - select { - case d.out <- struct{}{}: - case <-d.quit: - return - } - worstCase = nil - waitWrite = nil - case <-worstCase: - select { - case d.out <- struct{}{}: - case <-d.quit: - return - } - worstCase = nil - waitWrite = nil - } - } -} diff --git a/pkg/flipflop/falling_edge_test.go b/pkg/flipflop/falling_edge_test.go deleted file mode 100644 index c7ca04d56ac..00000000000 --- a/pkg/flipflop/falling_edge_test.go +++ /dev/null @@ -1,122 +0,0 @@ -// Copyright 2021 The Swarm Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. 
- -package flipflop_test - -import ( - "testing" - "time" - - "github.com/ethersphere/bee/pkg/flipflop" -) - -func TestFallingEdge(t *testing.T) { - t.Parallel() - t.Skip("github actions") - - ok := make(chan struct{}) - tt := 50 * time.Millisecond - worst := 5 * tt - in, c, cleanup := flipflop.NewFallingEdge(tt, worst) - defer cleanup() - go func() { - select { - case <-c: - close(ok) - return - case <-time.After(100 * time.Millisecond): - t.Error("timed out") - } - }() - - in <- struct{}{} - - select { - case <-ok: - case <-time.After(1 * time.Second): - t.Fatal("timed out") - } -} - -func TestFallingEdgeBuffer(t *testing.T) { - t.Parallel() - t.Skip("needs parameter tweaking on github actions") - - ok := make(chan struct{}) - tt := 150 * time.Millisecond - worst := 9 * tt - in, c, cleanup := flipflop.NewFallingEdge(tt, worst) - defer cleanup() - sleeps := 5 - wait := 50 * time.Millisecond - - start := time.Now() - online := make(chan struct{}) - go func() { - close(online) - select { - case <-c: - if time.Since(start) <= 450*time.Millisecond { - t.Errorf("wrote too early %v", time.Since(start)) - } - close(ok) - return - case <-time.After(1000 * time.Millisecond): - t.Error("timed out") - } - }() - - // wait for goroutine to be scheduled - <-online - - for i := 0; i < sleeps; i++ { - in <- struct{}{} - time.Sleep(wait) - } - select { - case <-ok: - case <-time.After(1 * time.Second): - t.Fatal("timed out") - } -} - -func TestFallingEdgeWorstCase(t *testing.T) { - t.Parallel() - t.Skip("github actions") - - ok := make(chan struct{}) - tt := 100 * time.Millisecond - worst := 5 * tt - in, c, cleanup := flipflop.NewFallingEdge(tt, worst) - defer cleanup() - sleeps := 9 - wait := 80 * time.Millisecond - - start := time.Now() - - go func() { - select { - case <-c: - if time.Since(start) >= 550*time.Millisecond { - t.Errorf("wrote too early %v", time.Since(start)) - } - - close(ok) - return - case <-time.After(1000 * time.Millisecond): - t.Error("timed out") - } - }() - go func() { - for i := 0; i < sleeps; i++ { - in <- struct{}{} - time.Sleep(wait) - } - }() - select { - case <-ok: - case <-time.After(1 * time.Second): - t.Fatal("timed out") - } -} diff --git a/pkg/localstore/subscription_pull.go b/pkg/localstore/subscription_pull.go index ad157fb1eee..4391eb17441 100644 --- a/pkg/localstore/subscription_pull.go +++ b/pkg/localstore/subscription_pull.go @@ -22,7 +22,6 @@ import ( "sync" "time" - "github.com/ethersphere/bee/pkg/flipflop" "github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/swarm" @@ -44,18 +43,15 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) chunkDescriptors := make(chan storage.Descriptor) - in, out, clean := flipflop.NewFallingEdge(flipFlopBufferDuration, flipFlopWorstCaseDuration) + trigger := make(chan struct{}) db.pullTriggersMu.Lock() if _, ok := db.pullTriggers[bin]; !ok { db.pullTriggers[bin] = make([]chan<- struct{}, 0) } - db.pullTriggers[bin] = append(db.pullTriggers[bin], in) + db.pullTriggers[bin] = append(db.pullTriggers[bin], trigger) db.pullTriggersMu.Unlock() - // send signal for the initial iteration - in <- struct{}{} - stopChan := make(chan struct{}) var stopChanOnce sync.Once @@ -65,7 +61,6 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) db.subscriptionsWG.Add(1) go func() { - defer clean() defer db.subscriptionsWG.Done() defer db.metrics.SubscribePullStop.Inc() // close the returned store.Descriptor channel at the end to @@ 
-83,7 +78,7 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) first := true // first iteration flag for SkipStartFromItem for { select { - case <-out: + case <-trigger: // iterate until: // - last index Item is reached // - subscription stop is called @@ -171,9 +166,9 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) defer db.pullTriggersMu.Unlock() for i, t := range db.pullTriggers[bin] { - if t == in { + if t == trigger { db.pullTriggers[bin] = append(db.pullTriggers[bin][:i], db.pullTriggers[bin][i+1:]...) - break + return } } } diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 5e33bd14329..0cf531e87ae 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -95,6 +95,14 @@ type peer struct { func (p *Puller) manage(warmupTime time.Duration) { defer p.wg.Done() + + // wait for warmup duration to complete + select { + case <-time.After(warmupTime): + case <-p.quit: + return + } + c, unsubscribe := p.topology.SubscribeTopologyChange() defer unsubscribe() @@ -104,13 +112,6 @@ func (p *Puller) manage(warmupTime time.Duration) { cancel() }() - // wait for warmup duration to complete - select { - case <-time.After(warmupTime): - case <-p.quit: - return - } - p.logger.Info("puller: warmup period complete, worker starting.") for { @@ -122,6 +123,9 @@ func (p *Puller) manage(warmupTime time.Duration) { // pick the one with the most // sync with that one + // BUG: we should only sync with within storage radius peers + // because of the if po >= d check in recalc peer + // if we're already syncing with this peer, make sure // that we're syncing the correct bins according to depth neighborhoodDepth := p.topology.NeighborhoodDepth() @@ -362,6 +366,7 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin return } top, ruid, err := p.syncer.SyncInterval(ctx, peer, bin, s, cur) + //BUG: if err != nil { loggerV2.Debug("histSyncWorker syncing interval failed", "peer_address", peer, "bin", bin, "cursor", cur, "error", err) if ruid == 0 { diff --git a/pkg/pullsync/pullstorage/pullstorage.go b/pkg/pullsync/pullstorage/pullstorage.go index 0618eec5d1d..cda1b30a629 100644 --- a/pkg/pullsync/pullstorage/pullstorage.go +++ b/pkg/pullsync/pullstorage/pullstorage.go @@ -128,6 +128,7 @@ func (s *PullStorer) IntervalChunks(ctx context.Context, bin uint8, from, to uin default: } + // BUG, do not touch topmost if nomore { // end of interval reached. no more chunks so interval is complete // return requested `to`. 
it could be that len(chs) == 0 if the interval From 84b7a378cc2cd632271db298882d8e941a058272 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Fri, 14 Oct 2022 17:14:35 +0300 Subject: [PATCH 02/24] fix: bugs --- pkg/puller/puller.go | 2 +- pkg/pullsync/pullstorage/pullstorage.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 0cf531e87ae..a8ebb3c9bc8 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -366,7 +366,7 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin return } top, ruid, err := p.syncer.SyncInterval(ctx, peer, bin, s, cur) - //BUG: + //BUG: error returned, quiting process if err != nil { loggerV2.Debug("histSyncWorker syncing interval failed", "peer_address", peer, "bin", bin, "cursor", cur, "error", err) if ruid == 0 { diff --git a/pkg/pullsync/pullstorage/pullstorage.go b/pkg/pullsync/pullstorage/pullstorage.go index cda1b30a629..e25f754c5e4 100644 --- a/pkg/pullsync/pullstorage/pullstorage.go +++ b/pkg/pullsync/pullstorage/pullstorage.go @@ -128,7 +128,7 @@ func (s *PullStorer) IntervalChunks(ctx context.Context, bin uint8, from, to uin default: } - // BUG, do not touch topmost + // BUG: do not touch topmost if nomore { // end of interval reached. no more chunks so interval is complete // return requested `to`. it could be that len(chs) == 0 if the interval From 20cbcdf70d438416f6ca36f2469740057a3e5755 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Sat, 15 Oct 2022 20:08:50 +0300 Subject: [PATCH 03/24] fix: bugs --- pkg/localstore/mode_put.go | 1 + pkg/puller/puller.go | 2 ++ 2 files changed, 3 insertions(+) diff --git a/pkg/localstore/mode_put.go b/pkg/localstore/mode_put.go index 7a754efdd2a..c94e9a09198 100644 --- a/pkg/localstore/mode_put.go +++ b/pkg/localstore/mode_put.go @@ -124,6 +124,7 @@ func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) if err != nil { return false, 0, 0, err } + // BUG: what if it does not exist in the pull index if exists { return true, 0, 0, nil } diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index a8ebb3c9bc8..6d307ea31d1 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -244,6 +244,8 @@ func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8 } var want, dontWant []uint8 + // BUG: we want nodes >= the neighborhood depth, not particularly storage radius + // we could have peers with PO < sync radius if po >= d { // within depth for i := d; i < p.bins; i++ { From 633867900c94b9a4070643a59f346a11069d998d Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Mon, 17 Oct 2022 22:58:47 +0300 Subject: [PATCH 04/24] chore: more bugs --- pkg/puller/puller.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 6d307ea31d1..b465fe9efec 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -167,6 +167,9 @@ func (p *Puller) manage(warmupTime time.Duration) { peersToSync = append(peersToSync, peer{addr: peerAddr, po: po}) } else { // already syncing, recalc + // BUG: the depth may increase next time we enter the iterator which means + // that a peer that is not out depth that we are syncing with, will not get recalculated + // and have it's context cancelled. 
peersToRecalc = append(peersToRecalc, peer{addr: peerAddr, po: po}) } } @@ -223,10 +226,10 @@ func (p *Puller) disconnectPeer(peer swarm.Address, po uint8) { p.cursorsMtx.Unlock() } -func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8) (dontSync bool) { +func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, syncRadius uint8) (dontSync bool) { loggerV2 := p.logger.V(2).Register() - loggerV2.Debug("puller recalculating peer", "peer_address", peer, "proximity_order", po, "depth", d) + loggerV2.Debug("puller recalculating peer", "peer_address", peer, "proximity_order", po, "depth", syncRadius) p.syncPeersMtx.Lock() syncCtx := p.syncPeers[po][peer.ByteString()] @@ -246,9 +249,9 @@ func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8 var want, dontWant []uint8 // BUG: we want nodes >= the neighborhood depth, not particularly storage radius // we could have peers with PO < sync radius - if po >= d { + if po >= syncRadius { // within depth - for i := d; i < p.bins; i++ { + for i := syncRadius; i < p.bins; i++ { if i == 0 { continue } @@ -262,7 +265,7 @@ func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8 } // cancel everything outside of depth - for i := uint8(0); i < d; i++ { + for i := uint8(0); i < syncRadius; i++ { dontWant = append(dontWant, i) } } else { @@ -276,7 +279,7 @@ func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, d uint8 return false } -func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8) { +func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, syncRadius uint8) { loggerV2 := p.logger.V(2).Register() p.syncPeersMtx.Lock() @@ -318,7 +321,7 @@ func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, d uint8) } for bin, cur := range c.cursors { - if bin == 0 || uint8(bin) < d { + if bin == 0 || uint8(bin) < syncRadius { continue } p.syncPeerBin(ctx, syncCtx, peer, uint8(bin), cur) From e15ce063eb89af2c6e44dc6228450d2e49b17320 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Tue, 18 Oct 2022 14:57:50 +0300 Subject: [PATCH 05/24] fix: more bugs --- pkg/puller/puller.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index b465fe9efec..a36046e599d 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -155,15 +155,15 @@ func (p *Puller) manage(warmupTime time.Duration) { // way that it returns an error - the value must be checked. 
_ = p.topology.EachPeerRev(func(peerAddr swarm.Address, po uint8) (stop, jumpToNext bool, err error) { if po >= neighborhoodDepth { - bp := p.syncPeers[po] + binPeer := p.syncPeers[po] // delete from peersDisconnected since we'd like to sync // with this peer delete(peersDisconnected, peerAddr.ByteString()) // within depth, sync everything - if _, ok := bp[peerAddr.ByteString()]; !ok { + if _, ok := binPeer[peerAddr.ByteString()]; !ok { // we're not syncing with this peer yet, start doing so - bp[peerAddr.ByteString()] = newSyncPeer(peerAddr, p.bins) + binPeer[peerAddr.ByteString()] = newSyncPeer(peerAddr, p.bins) peersToSync = append(peersToSync, peer{addr: peerAddr, po: po}) } else { // already syncing, recalc @@ -247,6 +247,7 @@ func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, syncRad } var want, dontWant []uint8 + // neighborhood depth = min(kademlia depth, storage radius) // BUG: we want nodes >= the neighborhood depth, not particularly storage radius // we could have peers with PO < sync radius if po >= syncRadius { @@ -417,6 +418,7 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin default: } top, ruid, err := p.syncer.SyncInterval(ctx, peer, bin, from, pullsync.MaxCursor) + //BUG: error returned, quiting process if err != nil { loggerV2.Debug("liveSyncWorker exit on sync error", "peer_address", peer, "bin", bin, "from", from, "error", err) if ruid == 0 { From bcc76d4455c516893275eb08ee6340ff6339cca4 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Tue, 18 Oct 2022 23:33:43 +0300 Subject: [PATCH 06/24] fix: puller rewrite --- pkg/puller/puller.go | 266 ++++++++++++-------------------------- pkg/puller/puller_test.go | 8 +- 2 files changed, 87 insertions(+), 187 deletions(-) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index a36046e599d..3729a875b41 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -27,9 +27,9 @@ import ( // loggerName is the tree path name of the logger for this package. 
const loggerName = "puller" -const ( - cursorPruneTimeout = 24 * time.Hour -) +var errCursorsLength = errors.New("cursors length mismatch") + +const syncSleepDur = time.Minute * 5 type Options struct { Bins uint8 @@ -47,12 +47,15 @@ type Puller struct { syncPeers []map[string]*syncPeer // index is bin, map key is peer address syncPeersMtx sync.Mutex - cursors map[string]peerCursors - cursorsMtx sync.Mutex + // cursors map[string]peerCursors + // cursorsMtx sync.Mutex quit chan struct{} wg sync.WaitGroup + // syncRadius uint8 + // neighborhoodDepth uint8 + bins uint8 // how many bins do we support } @@ -71,7 +74,7 @@ func New(stateStore storage.StateStorer, topology topology.Driver, reserveState syncer: pullSync, metrics: newMetrics(), logger: logger.WithName(loggerName).Register(), - cursors: make(map[string]peerCursors), + // cursors: make(map[string]peerCursors), syncPeers: make([]map[string]*syncPeer, bins), quit: make(chan struct{}), @@ -88,11 +91,6 @@ func New(stateStore storage.StateStorer, topology topology.Driver, reserveState return p } -type peer struct { - addr swarm.Address - po uint8 -} - func (p *Puller) manage(warmupTime time.Duration) { defer p.wg.Done() @@ -123,84 +121,27 @@ func (p *Puller) manage(warmupTime time.Duration) { // pick the one with the most // sync with that one - // BUG: we should only sync with within storage radius peers - // because of the if po >= d check in recalc peer + p.syncPeersMtx.Lock() - // if we're already syncing with this peer, make sure - // that we're syncing the correct bins according to depth neighborhoodDepth := p.topology.NeighborhoodDepth() syncRadius := p.reserveState.GetReserveState().StorageRadius - // we defer the actual start of syncing to get out of the iterator first - var ( - peersToSync []peer - peersToRecalc []peer - peersDisconnected = make(map[string]peer) - ) - - p.syncPeersMtx.Lock() - - // make a map of all peers we're syncing with, then remove from it - // the entries we get from kademlia in the iterator, this way we - // know which peers are no longer there anymore (disconnected) thus - // should be removed from the syncPeer bin. - for po, bin := range p.syncPeers { - for peerAddr, v := range bin { - peersDisconnected[peerAddr] = peer{addr: v.address, po: uint8(po)} - } - } - - // EachPeerRev in this case will never return an error, since the content of the callback - // never returns an error. In case in the future changes are made to the callback in a - // way that it returns an error - the value must be checked. - _ = p.topology.EachPeerRev(func(peerAddr swarm.Address, po uint8) (stop, jumpToNext bool, err error) { + _ = p.topology.EachPeerRev(func(addr swarm.Address, po uint8) (stop, jumpToNext bool, err error) { if po >= neighborhoodDepth { - binPeer := p.syncPeers[po] - // delete from peersDisconnected since we'd like to sync - // with this peer - delete(peersDisconnected, peerAddr.ByteString()) - - // within depth, sync everything - if _, ok := binPeer[peerAddr.ByteString()]; !ok { - // we're not syncing with this peer yet, start doing so - binPeer[peerAddr.ByteString()] = newSyncPeer(peerAddr, p.bins) - peersToSync = append(peersToSync, peer{addr: peerAddr, po: po}) - } else { - // already syncing, recalc - // BUG: the depth may increase next time we enter the iterator which means - // that a peer that is not out depth that we are syncing with, will not get recalculated - // and have it's context cancelled. 
- peersToRecalc = append(peersToRecalc, peer{addr: peerAddr, po: po}) + // add peer to sync + if _, ok := p.syncPeers[po][addr.ByteString()]; !ok { + p.syncPeers[po][addr.ByteString()] = newSyncPeer(addr, p.bins) } + } else { + // outside of neighborhood + p.disconnectPeer(addr, po) } - // if peer is outside of depth, do nothing here, this - // will cause the peer to stay in the peersDisconnected - // map, leading to cancelling of its running syncing contexts. - return false, false, nil }, topology.Filter{Reachable: true}) - p.syncPeersMtx.Unlock() - for _, v := range peersToSync { - p.syncPeer(ctx, v.addr, v.po, syncRadius) - } - - for _, v := range peersToRecalc { - dontSync := p.recalcPeer(ctx, v.addr, v.po, syncRadius) - // stopgap solution for peers that dont return the correct - // amount of cursors we expect - if dontSync { - peersDisconnected[v.addr.ByteString()] = v - } - } - - p.syncPeersMtx.Lock() - for _, v := range peersDisconnected { - p.disconnectPeer(v.addr, v.po) - } - p.syncPeersMtx.Unlock() + p.recalcPeers(ctx, syncRadius) case <-p.quit: return @@ -208,137 +149,77 @@ func (p *Puller) manage(warmupTime time.Duration) { } } -func (p *Puller) disconnectPeer(peer swarm.Address, po uint8) { +func (p *Puller) disconnectPeer(peerAddr swarm.Address, po uint8) { loggerV2 := p.logger.V(2).Register() - loggerV2.Debug("puller disconnect cleanup peer", "peer_address", peer, "proximity_order", po) - if syncCtx, ok := p.syncPeers[po][peer.ByteString()]; ok { - // disconnectPeer is called under lock, this is safe - syncCtx.gone() - } - delete(p.syncPeers[po], peer.ByteString()) + loggerV2.Debug("puller disconnect cleanup peer", "peer_address", peerAddr, "proximity_order", po) + if peer, ok := p.syncPeers[po][peerAddr.ByteString()]; ok { + peer.gone() - // delete the peer cursors - p.cursorsMtx.Lock() - if c, ok := p.cursors[peer.ByteString()]; ok && c.created.Add(cursorPruneTimeout).After(time.Now()) { - delete(p.cursors, peer.ByteString()) } - p.cursorsMtx.Unlock() + delete(p.syncPeers[po], peerAddr.ByteString()) } -func (p *Puller) recalcPeer(ctx context.Context, peer swarm.Address, po, syncRadius uint8) (dontSync bool) { +func (p *Puller) recalcPeers(ctx context.Context, syncRadius uint8) (dontSync bool) { loggerV2 := p.logger.V(2).Register() - loggerV2.Debug("puller recalculating peer", "peer_address", peer, "proximity_order", po, "depth", syncRadius) - p.syncPeersMtx.Lock() - syncCtx := p.syncPeers[po][peer.ByteString()] - p.syncPeersMtx.Unlock() - - syncCtx.Lock() - defer syncCtx.Unlock() + defer p.syncPeersMtx.Unlock() - p.cursorsMtx.Lock() - c := p.cursors[peer.ByteString()].cursors - p.cursorsMtx.Unlock() + for bin, peers := range p.syncPeers { - if len(c) != int(p.bins) { - return true - } + for _, peer := range peers { - var want, dontWant []uint8 - // neighborhood depth = min(kademlia depth, storage radius) - // BUG: we want nodes >= the neighborhood depth, not particularly storage radius - // we could have peers with PO < sync radius - if po >= syncRadius { - // within depth - for i := syncRadius; i < p.bins; i++ { - if i == 0 { + if bin < int(syncRadius) { + peer.cancelBins(uint8(bin)) continue } - want = append(want, i) - } - for _, bin := range want { - if !syncCtx.isBinSyncing(bin) { - p.syncPeerBin(ctx, syncCtx, peer, bin, c[bin]) + err := p.syncPeer(ctx, peer, syncRadius) + if err != nil { + loggerV2.Error(err, "recalc peers sync failed", "bin", bin, "peer", peer.address) } } - - // cancel everything outside of depth - for i := uint8(0); i < syncRadius; i++ { - 
dontWant = append(dontWant, i) - } - } else { - // peer is outside depth. cancel everything - for i := uint8(0); i < p.bins; i++ { - dontWant = append(dontWant, i) - } } - syncCtx.cancelBins(dontWant...) return false } -func (p *Puller) syncPeer(ctx context.Context, peer swarm.Address, po, syncRadius uint8) { - loggerV2 := p.logger.V(2).Register() - - p.syncPeersMtx.Lock() - syncCtx := p.syncPeers[po][peer.ByteString()] - p.syncPeersMtx.Unlock() - - syncCtx.Lock() - defer syncCtx.Unlock() +func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, syncRadius uint8) error { + peer.Lock() + defer peer.Unlock() - p.cursorsMtx.Lock() - c, ok := p.cursors[peer.ByteString()] - p.cursorsMtx.Unlock() - - if !ok { - cursors, err := p.syncer.GetCursors(ctx, peer) + if peer.cursors == nil { + cursors, err := p.syncer.GetCursors(ctx, peer.address) if err != nil { - loggerV2.Debug("could not get cursors from peer", "peer_address", peer, "error", err) - p.syncPeersMtx.Lock() - delete(p.syncPeers[po], peer.ByteString()) - p.syncPeersMtx.Unlock() - - return - // remove from syncing peers list, trigger channel to find some other peer - // maybe blacklist for some time + return fmt.Errorf("could not get cursors from peer %s: %w", peer.address, err) } - p.cursorsMtx.Lock() - p.cursors[peer.ByteString()] = peerCursors{created: time.Now(), cursors: cursors} - c = p.cursors[peer.ByteString()] - p.cursorsMtx.Unlock() + peer.cursors = cursors } - // if length of returned cursors does not add up to - // what we expect it to be - dont do anything - if len(c.cursors) != int(p.bins) { - p.syncPeersMtx.Lock() - delete(p.syncPeers[po], peer.ByteString()) - p.syncPeersMtx.Unlock() - return + if len(peer.cursors) != int(p.bins) { + return errCursorsLength } - for bin, cur := range c.cursors { - if bin == 0 || uint8(bin) < syncRadius { - continue + for bin, cur := range peer.cursors { + if bin >= int(syncRadius) && !peer.isBinSyncing(uint8(bin)) { + p.syncPeerBin(ctx, peer, uint8(bin), cur) } - p.syncPeerBin(ctx, syncCtx, peer, uint8(bin), cur) } + + return nil } -func (p *Puller) syncPeerBin(ctx context.Context, syncCtx *syncPeer, peer swarm.Address, bin uint8, cur uint64) { +func (p *Puller) syncPeerBin(ctx context.Context, peer *syncPeer, bin uint8, cur uint64) { binCtx, cancel := context.WithCancel(ctx) - syncCtx.setBinCancel(cancel, bin) + peer.setBinCancel(cancel, bin) if cur > 0 { p.wg.Add(1) - go p.histSyncWorker(binCtx, peer, bin, cur) + go p.histSyncWorker(binCtx, peer.address, bin, cur) } // start live p.wg.Add(1) - go p.liveSyncWorker(binCtx, peer, bin, cur) + go p.liveSyncWorker(binCtx, peer.address, bin, cur) } func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uint8, cur uint64) { @@ -372,7 +253,6 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin return } top, ruid, err := p.syncer.SyncInterval(ctx, peer, bin, s, cur) - //BUG: error returned, quiting process if err != nil { loggerV2.Debug("histSyncWorker syncing interval failed", "peer_address", peer, "bin", bin, "cursor", cur, "error", err) if ruid == 0 { @@ -406,8 +286,25 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin defer p.wg.Done() loggerV2.Debug("liveSyncWorker starting", "peer_address", peer, "bin", bin, "cursor", cur) from := cur + 1 + + sleep := false + for { p.metrics.LiveWorkerIterCounter.Inc() + + if sleep { + select { + case <-p.quit: + loggerV2.Debug("liveSyncWorker quit on shutdown", "peer_address", peer, "bin", bin, "cursor", cur) + return + 
case <-ctx.Done(): + loggerV2.Debug("liveSyncWorker context cancelled", "peer_address", peer, "bin", bin, "cursor", cur) + return + case <-time.After(syncSleepDur): + } + sleep = false + } + select { case <-p.quit: loggerV2.Debug("liveSyncWorker quit on shutdown", "peer_address", peer, "bin", bin, "cursor", cur) @@ -417,14 +314,20 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin return default: } + top, ruid, err := p.syncer.SyncInterval(ctx, peer, bin, from, pullsync.MaxCursor) - //BUG: error returned, quiting process if err != nil { - loggerV2.Debug("liveSyncWorker exit on sync error", "peer_address", peer, "bin", bin, "from", from, "error", err) if ruid == 0 { p.metrics.LiveWorkerErrCounter.Inc() } + if errors.Is(err, context.Canceled) { + loggerV2.Debug("liveSyncWorker context canceled", "peer_address", peer, "bin", bin, "from", from, "error", err) + sleep = true + continue + } + + loggerV2.Debug("liveSyncWorker exit on sync error", "peer_address", peer, "bin", bin, "from", from, "error", err) // since we use bin context cancellation to cancel interval // sync operations, the context can be expired here, causing us // to try to send a message with an expired context, which is @@ -446,7 +349,7 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin p.logger.Error(err, "liveSyncWorker exit on add peer interval", "peer_address", peer, "bin", bin, "from", from, "error", err) return } - loggerV2.Debug("liveSyncWorker pulled bin failed", "bin", bin, "from", from, "topmost", top, "peer_address", peer) + loggerV2.Debug("liveSyncWorker pulled bin", "bin", bin, "from", from, "topmost", top, "peer_address", peer) from = top + 1 } @@ -521,6 +424,8 @@ type syncPeer struct { address swarm.Address binCancelFuncs map[uint8]func() // slice of context cancel funcs for historical sync. 
index is bin + cursors []uint64 + sync.Mutex } @@ -545,12 +450,10 @@ func (p *syncPeer) setBinCancel(cf func(), bin uint8) { p.binCancelFuncs[bin] = cf } -func (p *syncPeer) cancelBins(bins ...uint8) { - for _, bin := range bins { - if c, ok := p.binCancelFuncs[bin]; ok { - c() - delete(p.binCancelFuncs, bin) - } +func (p *syncPeer) cancelBins(bin uint8) { + if c, ok := p.binCancelFuncs[bin]; ok { + c() + delete(p.binCancelFuncs, bin) } } @@ -574,8 +477,3 @@ func isSyncing(p *Puller, addr swarm.Address) bool { } return false } - -type peerCursors struct { - created time.Time - cursors []uint64 -} diff --git a/pkg/puller/puller_test.go b/pkg/puller/puller_test.go index 93e804e9d14..b992d65b8f4 100644 --- a/pkg/puller/puller_test.go +++ b/pkg/puller/puller_test.go @@ -111,13 +111,15 @@ func TestSyncFlow_PeerWithinDepth_Live(t *testing.T) { expLiveCalls []c // expected live sync calls }{ { - name: "cursor 0, 1 chunk on live", cursors: []uint64{0, 0}, + name: "cursor 0, 1 chunk on live", + cursors: []uint64{0, 0}, intervals: "[[1 1]]", liveReplies: []uint64{1}, expLiveCalls: []c{call(1, 1, max), call(1, 2, max)}, }, { - name: "cursor 0 - calls 1-1, 2-5, 6-10", cursors: []uint64{0, 0}, + name: "cursor 0 - calls 1-1, 2-5, 6-10", + cursors: []uint64{0, 0}, intervals: "[[1 10]]", liveReplies: []uint64{1, 5, 10}, expLiveCalls: []c{call(1, 1, max), call(1, 2, max), call(1, 6, max), call(1, 11, max)}, @@ -125,7 +127,7 @@ func TestSyncFlow_PeerWithinDepth_Live(t *testing.T) { } { tc := tc t.Run(tc.name, func(t *testing.T) { - t.Parallel() + // t.Parallel() puller, st, kad, pullsync := newPuller(opts{ kad: []mockk.Option{ From bd72ec0e00b31cb1d2360760592b143c2e5a6483 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Tue, 18 Oct 2022 23:44:55 +0300 Subject: [PATCH 07/24] fix: trigger fix --- pkg/localstore/mode_put.go | 1 - pkg/localstore/subscription_pull.go | 3 ++- pkg/puller/puller_test.go | 2 +- pkg/pullsync/pullstorage/pullstorage.go | 1 - 4 files changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/localstore/mode_put.go b/pkg/localstore/mode_put.go index c94e9a09198..7a754efdd2a 100644 --- a/pkg/localstore/mode_put.go +++ b/pkg/localstore/mode_put.go @@ -124,7 +124,6 @@ func (db *DB) put(ctx context.Context, mode storage.ModePut, chs ...swarm.Chunk) if err != nil { return false, 0, 0, err } - // BUG: what if it does not exist in the pull index if exists { return true, 0, 0, nil } diff --git a/pkg/localstore/subscription_pull.go b/pkg/localstore/subscription_pull.go index 4391eb17441..9138f1884dd 100644 --- a/pkg/localstore/subscription_pull.go +++ b/pkg/localstore/subscription_pull.go @@ -43,7 +43,8 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) chunkDescriptors := make(chan storage.Descriptor) - trigger := make(chan struct{}) + trigger := make(chan struct{}, 1) + trigger <- struct{}{} db.pullTriggersMu.Lock() if _, ok := db.pullTriggers[bin]; !ok { diff --git a/pkg/puller/puller_test.go b/pkg/puller/puller_test.go index b992d65b8f4..8a73de06fd8 100644 --- a/pkg/puller/puller_test.go +++ b/pkg/puller/puller_test.go @@ -127,7 +127,7 @@ func TestSyncFlow_PeerWithinDepth_Live(t *testing.T) { } { tc := tc t.Run(tc.name, func(t *testing.T) { - // t.Parallel() + t.Parallel() puller, st, kad, pullsync := newPuller(opts{ kad: []mockk.Option{ diff --git a/pkg/pullsync/pullstorage/pullstorage.go b/pkg/pullsync/pullstorage/pullstorage.go index e25f754c5e4..0618eec5d1d 100644 --- a/pkg/pullsync/pullstorage/pullstorage.go +++ 
b/pkg/pullsync/pullstorage/pullstorage.go @@ -128,7 +128,6 @@ func (s *PullStorer) IntervalChunks(ctx context.Context, bin uint8, from, to uin default: } - // BUG: do not touch topmost if nomore { // end of interval reached. no more chunks so interval is complete // return requested `to`. it could be that len(chs) == 0 if the interval From 488db7b2c4e779c4e269a0ecead8c3c0bcd15c60 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Tue, 18 Oct 2022 23:51:07 +0300 Subject: [PATCH 08/24] chore: comments --- pkg/puller/puller.go | 32 +++++++++++--------------------- 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 3729a875b41..e0f5a06f05b 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -47,15 +47,9 @@ type Puller struct { syncPeers []map[string]*syncPeer // index is bin, map key is peer address syncPeersMtx sync.Mutex - // cursors map[string]peerCursors - // cursorsMtx sync.Mutex - quit chan struct{} wg sync.WaitGroup - // syncRadius uint8 - // neighborhoodDepth uint8 - bins uint8 // how many bins do we support } @@ -74,11 +68,9 @@ func New(stateStore storage.StateStorer, topology topology.Driver, reserveState syncer: pullSync, metrics: newMetrics(), logger: logger.WithName(loggerName).Register(), - // cursors: make(map[string]peerCursors), - - syncPeers: make([]map[string]*syncPeer, bins), - quit: make(chan struct{}), - wg: sync.WaitGroup{}, + syncPeers: make([]map[string]*syncPeer, bins), + quit: make(chan struct{}), + wg: sync.WaitGroup{}, bins: bins, } @@ -94,7 +86,6 @@ func New(stateStore storage.StateStorer, topology topology.Driver, reserveState func (p *Puller) manage(warmupTime time.Duration) { defer p.wg.Done() - // wait for warmup duration to complete select { case <-time.After(warmupTime): case <-p.quit: @@ -115,11 +106,6 @@ func (p *Puller) manage(warmupTime time.Duration) { for { select { case <-c: - // get all peers from kademlia - // iterate on entire bin at once (get all peers first) - // check how many intervals we synced with all of them - // pick the one with the most - // sync with that one p.syncPeersMtx.Lock() @@ -139,16 +125,19 @@ func (p *Puller) manage(warmupTime time.Duration) { return false, false, nil }, topology.Filter{Reachable: true}) - p.syncPeersMtx.Unlock() p.recalcPeers(ctx, syncRadius) + p.syncPeersMtx.Unlock() + case <-p.quit: return } } } +// disconnectPeer cancels all existing syncing and removes the peer entry from the syncing map. +// Must be called under lock. func (p *Puller) disconnectPeer(peerAddr swarm.Address, po uint8) { loggerV2 := p.logger.V(2).Register() @@ -160,12 +149,11 @@ func (p *Puller) disconnectPeer(peerAddr swarm.Address, po uint8) { delete(p.syncPeers[po], peerAddr.ByteString()) } +// recalcPeers starts or stops syncing process for peers per bin depending on the current sync radius. +// Must be called under lock. func (p *Puller) recalcPeers(ctx context.Context, syncRadius uint8) (dontSync bool) { loggerV2 := p.logger.V(2).Register() - p.syncPeersMtx.Lock() - defer p.syncPeersMtx.Unlock() - for bin, peers := range p.syncPeers { for _, peer := range peers { @@ -210,6 +198,8 @@ func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, syncRadius uint8) return nil } +// syncPeerBin will start historical and live syncing for the peer for a particular bin. +// Must be called under syncPeer lock. 
func (p *Puller) syncPeerBin(ctx context.Context, peer *syncPeer, bin uint8, cur uint64) { binCtx, cancel := context.WithCancel(ctx) peer.setBinCancel(cancel, bin) From 8a75cc444698d01ab990d845424f22e84f98fe9a Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Tue, 18 Oct 2022 23:56:20 +0300 Subject: [PATCH 09/24] chore: lint --- pkg/localstore/localstore.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pkg/localstore/localstore.go b/pkg/localstore/localstore.go index 30e41603610..bee9a77b81b 100644 --- a/pkg/localstore/localstore.go +++ b/pkg/localstore/localstore.go @@ -60,11 +60,6 @@ var ( // Limit the number of goroutines created by Getters // that call updateGC function. Value 0 sets no limit. maxParallelUpdateGC = 1000 - - // values needed to adjust subscription trigger - // buffer time. - flipFlopBufferDuration = 150 * time.Millisecond - flipFlopWorstCaseDuration = 10 * time.Second ) const ( From 987a878c183c4b57ab4226f72beeb84c41b87fb7 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Wed, 19 Oct 2022 12:49:27 +0300 Subject: [PATCH 10/24] chore: metrics --- pkg/puller/metrics.go | 19 +++++++++++++------ pkg/puller/puller.go | 13 ++++++------- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/pkg/puller/metrics.go b/pkg/puller/metrics.go index 6d8bcac507b..31c098cef2e 100644 --- a/pkg/puller/metrics.go +++ b/pkg/puller/metrics.go @@ -10,12 +10,13 @@ import ( ) type metrics struct { - HistWorkerIterCounter prometheus.Counter // counts the number of historical syncing iterations - HistWorkerDoneCounter prometheus.Counter // count number of finished historical syncing jobs - HistWorkerErrCounter prometheus.Counter // count number of errors - LiveWorkerIterCounter prometheus.Counter // counts the number of live syncing iterations - LiveWorkerErrCounter prometheus.Counter // count number of errors - MaxUintErrCounter prometheus.Counter // how many times we got maxuint as topmost + HistWorkerIterCounter prometheus.Counter // counts the number of historical syncing iterations + HistWorkerDoneCounter prometheus.Counter // count number of finished historical syncing jobs + HistWorkerErrCounter prometheus.Counter // count number of errors + LiveWorkerIterCounter prometheus.Counter // counts the number of live syncing iterations + LiveWorkerErrCounter prometheus.Counter // count number of errors + LiveWorkerErrCancellationCounter prometheus.Counter // count number of errors + MaxUintErrCounter prometheus.Counter // how many times we got maxuint as topmost } func newMetrics() metrics { @@ -52,6 +53,12 @@ func newMetrics() metrics { Name: "live_worker_errors", Help: "Total live worker errors.", }), + LiveWorkerErrCancellationCounter: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: m.Namespace, + Subsystem: subsystem, + Name: "live_worker_cancellation_errors", + Help: "Total live worker context cancellation errors.", + }), MaxUintErrCounter: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: m.Namespace, Subsystem: subsystem, diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index e0f5a06f05b..ab09ae5fcbd 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -139,7 +139,7 @@ func (p *Puller) manage(warmupTime time.Duration) { // disconnectPeer cancels all existing syncing and removes the peer entry from the syncing map. // Must be called under lock. 
func (p *Puller) disconnectPeer(peerAddr swarm.Address, po uint8) { - loggerV2 := p.logger.V(2).Register() + loggerV2 := p.logger loggerV2.Debug("puller disconnect cleanup peer", "peer_address", peerAddr, "proximity_order", po) if peer, ok := p.syncPeers[po][peerAddr.ByteString()]; ok { @@ -152,7 +152,7 @@ func (p *Puller) disconnectPeer(peerAddr swarm.Address, po uint8) { // recalcPeers starts or stops syncing process for peers per bin depending on the current sync radius. // Must be called under lock. func (p *Puller) recalcPeers(ctx context.Context, syncRadius uint8) (dontSync bool) { - loggerV2 := p.logger.V(2).Register() + loggerV2 := p.logger for bin, peers := range p.syncPeers { @@ -213,7 +213,7 @@ func (p *Puller) syncPeerBin(ctx context.Context, peer *syncPeer, bin uint8, cur } func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uint8, cur uint64) { - loggerV2 := p.logger.V(2).Register() + loggerV2 := p.logger defer func() { p.wg.Done() @@ -271,7 +271,7 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin } func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uint8, cur uint64) { - loggerV2 := p.logger.V(2).Register() + loggerV2 := p.logger defer p.wg.Done() loggerV2.Debug("liveSyncWorker starting", "peer_address", peer, "bin", bin, "cursor", cur) @@ -307,13 +307,12 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin top, ruid, err := p.syncer.SyncInterval(ctx, peer, bin, from, pullsync.MaxCursor) if err != nil { - if ruid == 0 { - p.metrics.LiveWorkerErrCounter.Inc() - } + p.metrics.LiveWorkerErrCounter.Inc() if errors.Is(err, context.Canceled) { loggerV2.Debug("liveSyncWorker context canceled", "peer_address", peer, "bin", bin, "from", from, "error", err) sleep = true + p.metrics.LiveWorkerErrCancellationCounter.Inc() continue } From a03b64d20184f345fad2768dd56aa4503c0bdf62 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Wed, 19 Oct 2022 13:35:42 +0300 Subject: [PATCH 11/24] fix: prune gone peers --- pkg/puller/puller.go | 33 +++++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index ab09ae5fcbd..1f942333a6c 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -109,6 +109,14 @@ func (p *Puller) manage(warmupTime time.Duration) { p.syncPeersMtx.Lock() + // peersDisconnected is used to mark and prune peers that are no longer connected. 
+ peersDisconnected := make(map[string]*syncPeer) + for _, bin := range p.syncPeers { + for addr, peer := range bin { + peersDisconnected[addr] = peer + } + } + neighborhoodDepth := p.topology.NeighborhoodDepth() syncRadius := p.reserveState.GetReserveState().StorageRadius @@ -116,19 +124,23 @@ func (p *Puller) manage(warmupTime time.Duration) { if po >= neighborhoodDepth { // add peer to sync if _, ok := p.syncPeers[po][addr.ByteString()]; !ok { - p.syncPeers[po][addr.ByteString()] = newSyncPeer(addr, p.bins) + p.syncPeers[po][addr.ByteString()] = newSyncPeer(addr, po, p.bins) } + // remove from disconnected list as the peer is still connected + delete(peersDisconnected, addr.ByteString()) } else { - // outside of neighborhood - p.disconnectPeer(addr, po) + // outside of neighborhood, prune peer + p.disconnectPeer(addr.ByteString(), po) } return false, false, nil }, topology.Filter{Reachable: true}) - p.recalcPeers(ctx, syncRadius) + for addr, peer := range peersDisconnected { + p.disconnectPeer(addr, peer.po) + } - p.syncPeersMtx.Unlock() + p.recalcPeers(ctx, syncRadius) case <-p.quit: return @@ -138,15 +150,15 @@ func (p *Puller) manage(warmupTime time.Duration) { // disconnectPeer cancels all existing syncing and removes the peer entry from the syncing map. // Must be called under lock. -func (p *Puller) disconnectPeer(peerAddr swarm.Address, po uint8) { +func (p *Puller) disconnectPeer(peerAddr string, po uint8) { loggerV2 := p.logger loggerV2.Debug("puller disconnect cleanup peer", "peer_address", peerAddr, "proximity_order", po) - if peer, ok := p.syncPeers[po][peerAddr.ByteString()]; ok { + if peer, ok := p.syncPeers[po][peerAddr]; ok { peer.gone() } - delete(p.syncPeers[po], peerAddr.ByteString()) + delete(p.syncPeers[po], peerAddr) } // recalcPeers starts or stops syncing process for peers per bin depending on the current sync radius. @@ -310,7 +322,7 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin p.metrics.LiveWorkerErrCounter.Inc() if errors.Is(err, context.Canceled) { - loggerV2.Debug("liveSyncWorker context canceled", "peer_address", peer, "bin", bin, "from", from, "error", err) + loggerV2.Debug("liveSyncWorker sync interval context canceled", "peer_address", peer, "bin", bin, "from", from, "error", err) sleep = true p.metrics.LiveWorkerErrCancellationCounter.Inc() continue @@ -412,13 +424,14 @@ func peerIntervalKey(peer swarm.Address, bin uint8) string { type syncPeer struct { address swarm.Address binCancelFuncs map[uint8]func() // slice of context cancel funcs for historical sync. 
index is bin + po uint8 cursors []uint64 sync.Mutex } -func newSyncPeer(addr swarm.Address, bins uint8) *syncPeer { +func newSyncPeer(addr swarm.Address, po, bins uint8) *syncPeer { return &syncPeer{ address: addr, binCancelFuncs: make(map[uint8]func(), bins), From 32ac92a91c330723e3c9ec3ddc44679c83da514e Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Wed, 19 Oct 2022 16:32:43 +0300 Subject: [PATCH 12/24] fix: missing unlock --- pkg/puller/puller.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 1f942333a6c..9cdf681912a 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -142,6 +142,8 @@ func (p *Puller) manage(warmupTime time.Duration) { p.recalcPeers(ctx, syncRadius) + p.syncPeersMtx.Unlock() + case <-p.quit: return } From a95ab4f05a3eba67ed6de90fd4781766535c59c1 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Wed, 19 Oct 2022 16:55:33 +0300 Subject: [PATCH 13/24] fix: removed listen on quite --- pkg/puller/puller.go | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 9cdf681912a..606a90e1d4c 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -105,6 +105,8 @@ func (p *Puller) manage(warmupTime time.Duration) { for { select { + case <-p.quit: + return case <-c: p.syncPeersMtx.Lock() @@ -143,9 +145,6 @@ func (p *Puller) manage(warmupTime time.Duration) { p.recalcPeers(ctx, syncRadius) p.syncPeersMtx.Unlock() - - case <-p.quit: - return } } } @@ -237,9 +236,6 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin for { p.metrics.HistWorkerIterCounter.Inc() select { - case <-p.quit: - loggerV2.Debug("histSyncWorker quitting on shutdown", "peer_address", peer, "bin", bin, "cursor", cur) - return case <-ctx.Done(): loggerV2.Debug("histSyncWorker context cancelled", "peer_address", peer, "bin", bin, "cursor", cur) return @@ -298,9 +294,6 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin if sleep { select { - case <-p.quit: - loggerV2.Debug("liveSyncWorker quit on shutdown", "peer_address", peer, "bin", bin, "cursor", cur) - return case <-ctx.Done(): loggerV2.Debug("liveSyncWorker context cancelled", "peer_address", peer, "bin", bin, "cursor", cur) return @@ -310,9 +303,6 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin } select { - case <-p.quit: - loggerV2.Debug("liveSyncWorker quit on shutdown", "peer_address", peer, "bin", bin, "cursor", cur) - return case <-ctx.Done(): loggerV2.Debug("liveSyncWorker context cancelled", "peer_address", peer, "bin", bin, "cursor", cur) return From 740c4362aa31b60c6c8b8b14c4e13144f89fb2b6 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Wed, 19 Oct 2022 17:14:15 +0300 Subject: [PATCH 14/24] fix: disconnect with swarm address --- pkg/puller/puller.go | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 606a90e1d4c..e2133db599a 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -70,7 +70,6 @@ func New(stateStore storage.StateStorer, topology topology.Driver, reserveState logger: logger.WithName(loggerName).Register(), syncPeers: make([]map[string]*syncPeer, bins), quit: make(chan struct{}), - wg: sync.WaitGroup{}, bins: bins, } @@ -132,14 +131,14 @@ func (p *Puller) manage(warmupTime time.Duration) { 
delete(peersDisconnected, addr.ByteString()) } else { // outside of neighborhood, prune peer - p.disconnectPeer(addr.ByteString(), po) + p.disconnectPeer(addr, po) } return false, false, nil }, topology.Filter{Reachable: true}) - for addr, peer := range peersDisconnected { - p.disconnectPeer(addr, peer.po) + for _, peer := range peersDisconnected { + p.disconnectPeer(peer.address, peer.po) } p.recalcPeers(ctx, syncRadius) @@ -151,15 +150,15 @@ func (p *Puller) manage(warmupTime time.Duration) { // disconnectPeer cancels all existing syncing and removes the peer entry from the syncing map. // Must be called under lock. -func (p *Puller) disconnectPeer(peerAddr string, po uint8) { +func (p *Puller) disconnectPeer(addr swarm.Address, po uint8) { loggerV2 := p.logger - loggerV2.Debug("puller disconnect cleanup peer", "peer_address", peerAddr, "proximity_order", po) - if peer, ok := p.syncPeers[po][peerAddr]; ok { + loggerV2.Debug("puller disconnect cleanup peer", "peer_address", addr, "proximity_order", po) + if peer, ok := p.syncPeers[po][addr.ByteString()]; ok { peer.gone() } - delete(p.syncPeers[po], peerAddr) + delete(p.syncPeers[po], addr.ByteString()) } // recalcPeers starts or stops syncing process for peers per bin depending on the current sync radius. @@ -228,10 +227,9 @@ func (p *Puller) syncPeerBin(ctx context.Context, peer *syncPeer, bin uint8, cur func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uint8, cur uint64) { loggerV2 := p.logger - defer func() { - p.wg.Done() - p.metrics.HistWorkerDoneCounter.Inc() - }() + defer p.wg.Done() + defer p.metrics.HistWorkerDoneCounter.Inc() + loggerV2.Debug("histSyncWorker starting", "peer_address", peer, "bin", bin, "cursor", cur) for { p.metrics.HistWorkerIterCounter.Inc() From 081e4285be14eee24e324b01f4c9b5f472978e5e Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Thu, 20 Oct 2022 13:29:06 +0300 Subject: [PATCH 15/24] fix: pullsync and puller sync process to continue (#3444) --- .golangci.yml | 1 - pkg/puller/metrics.go | 19 +++------- pkg/puller/puller.go | 82 ++++++++++++++++++---------------------- pkg/pullsync/pullsync.go | 35 ++++++++++------- 4 files changed, 63 insertions(+), 74 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 58246a0ce4a..52dfc934d01 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -23,7 +23,6 @@ linters: - importas - ineffassign - misspell - - nakedret - nilerr - noctx - paralleltest diff --git a/pkg/puller/metrics.go b/pkg/puller/metrics.go index 31c098cef2e..6d8bcac507b 100644 --- a/pkg/puller/metrics.go +++ b/pkg/puller/metrics.go @@ -10,13 +10,12 @@ import ( ) type metrics struct { - HistWorkerIterCounter prometheus.Counter // counts the number of historical syncing iterations - HistWorkerDoneCounter prometheus.Counter // count number of finished historical syncing jobs - HistWorkerErrCounter prometheus.Counter // count number of errors - LiveWorkerIterCounter prometheus.Counter // counts the number of live syncing iterations - LiveWorkerErrCounter prometheus.Counter // count number of errors - LiveWorkerErrCancellationCounter prometheus.Counter // count number of errors - MaxUintErrCounter prometheus.Counter // how many times we got maxuint as topmost + HistWorkerIterCounter prometheus.Counter // counts the number of historical syncing iterations + HistWorkerDoneCounter prometheus.Counter // count number of finished historical syncing jobs + HistWorkerErrCounter prometheus.Counter // count number of errors + LiveWorkerIterCounter 
prometheus.Counter // counts the number of live syncing iterations + LiveWorkerErrCounter prometheus.Counter // count number of errors + MaxUintErrCounter prometheus.Counter // how many times we got maxuint as topmost } func newMetrics() metrics { @@ -53,12 +52,6 @@ func newMetrics() metrics { Name: "live_worker_errors", Help: "Total live worker errors.", }), - LiveWorkerErrCancellationCounter: prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: m.Namespace, - Subsystem: subsystem, - Name: "live_worker_cancellation_errors", - Help: "Total live worker context cancellation errors.", - }), MaxUintErrCounter: prometheus.NewCounter(prometheus.CounterOpts{ Namespace: m.Namespace, Subsystem: subsystem, diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index e2133db599a..189db06fe67 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -230,9 +230,23 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin defer p.wg.Done() defer p.metrics.HistWorkerDoneCounter.Inc() - loggerV2.Debug("histSyncWorker starting", "peer_address", peer, "bin", bin, "cursor", cur) + sleep := false + + p.logger.Debug("histSyncWorker starting", "peer_address", peer, "bin", bin, "cursor", cur) + for { p.metrics.HistWorkerIterCounter.Inc() + + if sleep { + select { + case <-ctx.Done(): + loggerV2.Debug("histSyncWorker context cancelled", "peer_address", peer, "bin", bin, "cursor", cur) + return + case <-time.After(syncSleepDur): + } + sleep = false + } + select { case <-ctx.Done(): loggerV2.Debug("histSyncWorker context cancelled", "peer_address", peer, "bin", bin, "cursor", cur) @@ -243,37 +257,27 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin s, _, _, err := p.nextPeerInterval(peer, bin) if err != nil { p.metrics.HistWorkerErrCounter.Inc() - p.logger.Debug("histSyncWorker nextPeerInterval failed", "error", err) + p.logger.Error(err, "histSyncWorker nextPeerInterval failed") return } + if s > cur { - loggerV2.Debug("histSyncWorker syncing finished", "bin", bin, "cursor", cur) + p.logger.Debug("histSyncWorker syncing finished", "bin", bin, "cursor", cur) return } - top, ruid, err := p.syncer.SyncInterval(ctx, peer, bin, s, cur) - if err != nil { - loggerV2.Debug("histSyncWorker syncing interval failed", "peer_address", peer, "bin", bin, "cursor", cur, "error", err) - if ruid == 0 { - p.metrics.HistWorkerErrCounter.Inc() - } + top, _, syncErr := p.syncer.SyncInterval(ctx, peer, bin, s, cur) - // since we use bin context cancellation to cancel interval - // sync operations, the context can be expired here, causing us - // to try to send a message with an expired context, which is - // bound to fail. 
- ctxC, cancelC := context.WithTimeout(context.Background(), 10*time.Second) - defer cancelC() - if err := p.syncer.CancelRuid(ctxC, peer, ruid); err != nil { - loggerV2.Debug("histSyncWorker cancel ruid failed", "error", err) - } - return - } err = p.addPeerInterval(peer, bin, s, top) if err != nil { p.metrics.HistWorkerErrCounter.Inc() - p.logger.Error(err, "could not persist interval for peer, quitting...", "peer_address", peer) + p.logger.Error(err, "histSyncWorker could not persist interval for peer, quitting...", "peer_address", peer) return } + if syncErr != nil { + p.logger.Error(err, "histSyncWorker syncing interval failed", "peer_address", peer, "bin", bin, "cursor", cur, "start", s, "topmost", top) + continue + } + loggerV2.Debug("histSyncWorker pulled", "bin", bin, "start", s, "topmost", top, "peer_address", peer) } } @@ -282,7 +286,7 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin loggerV2 := p.logger defer p.wg.Done() - loggerV2.Debug("liveSyncWorker starting", "peer_address", peer, "bin", bin, "cursor", cur) + p.logger.Debug("liveSyncWorker starting", "peer_address", peer, "bin", bin, "cursor", cur) from := cur + 1 sleep := false @@ -307,39 +311,25 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin default: } - top, ruid, err := p.syncer.SyncInterval(ctx, peer, bin, from, pullsync.MaxCursor) - if err != nil { - p.metrics.LiveWorkerErrCounter.Inc() - - if errors.Is(err, context.Canceled) { - loggerV2.Debug("liveSyncWorker sync interval context canceled", "peer_address", peer, "bin", bin, "from", from, "error", err) - sleep = true - p.metrics.LiveWorkerErrCancellationCounter.Inc() - continue - } - - loggerV2.Debug("liveSyncWorker exit on sync error", "peer_address", peer, "bin", bin, "from", from, "error", err) - // since we use bin context cancellation to cancel interval - // sync operations, the context can be expired here, causing us - // to try to send a message with an expired context, which is - // bound to fail. 
- ctxC, cancelC := context.WithTimeout(context.Background(), 10*time.Second) - defer cancelC() - if err := p.syncer.CancelRuid(ctxC, peer, ruid); err != nil { - loggerV2.Debug("histSyncWorker cancel ruid failed", "error", err) - } - return - } + top, _, syncErr := p.syncer.SyncInterval(ctx, peer, bin, from, pullsync.MaxCursor) if top == math.MaxUint64 { p.metrics.MaxUintErrCounter.Inc() return } - err = p.addPeerInterval(peer, bin, from, top) + err := p.addPeerInterval(peer, bin, from, top) if err != nil { p.metrics.LiveWorkerErrCounter.Inc() p.logger.Error(err, "liveSyncWorker exit on add peer interval", "peer_address", peer, "bin", bin, "from", from, "error", err) return } + + if syncErr != nil { + p.metrics.LiveWorkerErrCounter.Inc() + p.logger.Error(err, "liveSyncWorker sync error", "peer_address", peer, "bin", bin, "from", from, "topmost", top) + sleep = true + continue + } + loggerV2.Debug("liveSyncWorker pulled bin", "bin", bin, "from", from, "topmost", top, "peer_address", peer) from = top + 1 diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go index 34a39465afa..8a84d19dd86 100644 --- a/pkg/pullsync/pullsync.go +++ b/pkg/pullsync/pullsync.go @@ -184,15 +184,20 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8 return offer.Topmost, ru.Ruid, nil } + topmost = offer.Topmost + ruid = ru.Ruid + var ( bvLen = len(offer.Hashes) / swarm.HashSize wantChunks = make(map[string]struct{}) ctr = 0 + have bool ) bv, err := bitvector.New(bvLen) if err != nil { - return 0, ru.Ruid, fmt.Errorf("new bitvector: %w", err) + err = fmt.Errorf("new bitvector: %w", err) + return } for i := 0; i < len(offer.Hashes); i += swarm.HashSize { @@ -200,13 +205,15 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8 if a.Equal(swarm.ZeroAddress) { // i'd like to have this around to see we don't see any of these in the logs s.logger.Error(nil, "syncer got a zero address hash on offer") - return 0, ru.Ruid, fmt.Errorf("zero address on offer") + err = fmt.Errorf("zero address on offer") + return } s.metrics.Offered.Inc() s.metrics.DbOps.Inc() - have, err := s.storage.Has(ctx, a) + have, err = s.storage.Has(ctx, a) if err != nil { - return 0, ru.Ruid, fmt.Errorf("storage has: %w", err) + err = fmt.Errorf("storage has: %w", err) + return } if !have { wantChunks[a.String()] = struct{}{} @@ -218,7 +225,8 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8 wantMsg := &pb.Want{BitVector: bv.Bytes()} if err = w.WriteMsgWithContext(ctx, wantMsg); err != nil { - return 0, ru.Ruid, fmt.Errorf("write want: %w", err) + err = fmt.Errorf("write want: %w", err) + return } // if ctr is zero, it means we don't want any chunk in the batch @@ -241,7 +249,8 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8 if _, ok := wantChunks[addr.String()]; !ok { // this is fatal for the entire batch, return the // error and don't write the partial batch. - return 0, ru.Ruid, ErrUnsolicitedChunk + err = ErrUnsolicitedChunk + return } delete(wantChunks, addr.String()) @@ -258,7 +267,8 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8 } else if !soc.Valid(chunk) { // this is fatal for the entire batch, return the // error and don't write the partial batch. 
- return 0, ru.Ruid, swarm.ErrInvalidChunk + err = swarm.ErrInvalidChunk + return } chunksToPut = append(chunksToPut, chunk) } @@ -269,20 +279,17 @@ func (s *Syncer) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8 s.metrics.DbOps.Inc() ctx, cancel := context.WithTimeout(ctx, storagePutTimeout) defer cancel() + if ierr := s.storage.Put(ctx, storage.ModePutSync, chunksToPut...); ierr != nil { if err != nil { ierr = fmt.Errorf(", sync err: %w", err) } - return 0, ru.Ruid, fmt.Errorf("delivery put: %w", ierr) + err = fmt.Errorf("delivery put: %w", ierr) + return } } - // there might have been an error in the for loop above, - // return it if it indeed happened - if err != nil { - return 0, ru.Ruid, err - } - return offer.Topmost, ru.Ruid, nil + return } // handler handles an incoming request to sync an interval From ff7688f0505972a051c94c4a66deb2ef5d5bf38a Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Thu, 20 Oct 2022 13:32:54 +0300 Subject: [PATCH 16/24] revert: falling edge --- pkg/flipflop/doc.go | 9 ++ pkg/flipflop/falling_edge.go | 68 ++++++++++++++++ pkg/flipflop/falling_edge_test.go | 122 ++++++++++++++++++++++++++++ pkg/localstore/localstore.go | 5 ++ pkg/localstore/subscription_pull.go | 16 ++-- 5 files changed, 214 insertions(+), 6 deletions(-) create mode 100644 pkg/flipflop/doc.go create mode 100644 pkg/flipflop/falling_edge.go create mode 100644 pkg/flipflop/falling_edge_test.go diff --git a/pkg/flipflop/doc.go b/pkg/flipflop/doc.go new file mode 100644 index 00000000000..65a6ae781e6 --- /dev/null +++ b/pkg/flipflop/doc.go @@ -0,0 +1,9 @@ +// Copyright 2021 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package flipflop exposes a buffered input functionality +// that mimicks the behavior of falling edge detection +// which is done when doing signal processing on digital +// or analog electric circuitry. +package flipflop diff --git a/pkg/flipflop/falling_edge.go b/pkg/flipflop/falling_edge.go new file mode 100644 index 00000000000..8e0315c0a20 --- /dev/null +++ b/pkg/flipflop/falling_edge.go @@ -0,0 +1,68 @@ +// Copyright 2021 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package flipflop + +import ( + "time" +) + +type detector struct { + t time.Duration + worstCase time.Duration + + buf chan struct{} + out chan struct{} + quit chan struct{} +} + +// NewFallingEdge returns a new falling edge detector. +// bufferTime is the time to buffer, worstCase is buffertime*worstcase time to wait before writing +// to the output anyway. 
+func NewFallingEdge(bufferTime, worstCase time.Duration) (in chan<- struct{}, out <-chan struct{}, clean func()) { + d := &detector{ + t: bufferTime, + worstCase: worstCase, + buf: make(chan struct{}, 1), + out: make(chan struct{}), + quit: make(chan struct{}), + } + + go d.work() + + return d.buf, d.out, func() { close(d.quit) } +} + +func (d *detector) work() { + var waitWrite <-chan time.Time + var worstCase <-chan time.Time + for { + select { + case <-d.quit: + return + case <-d.buf: + // we have an item in the buffer, dont announce yet + waitWrite = time.After(d.t) + if worstCase == nil { + worstCase = time.After(d.worstCase) + } + case <-waitWrite: + select { + case d.out <- struct{}{}: + case <-d.quit: + return + } + worstCase = nil + waitWrite = nil + case <-worstCase: + select { + case d.out <- struct{}{}: + case <-d.quit: + return + } + worstCase = nil + waitWrite = nil + } + } +} diff --git a/pkg/flipflop/falling_edge_test.go b/pkg/flipflop/falling_edge_test.go new file mode 100644 index 00000000000..c7ca04d56ac --- /dev/null +++ b/pkg/flipflop/falling_edge_test.go @@ -0,0 +1,122 @@ +// Copyright 2021 The Swarm Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package flipflop_test + +import ( + "testing" + "time" + + "github.com/ethersphere/bee/pkg/flipflop" +) + +func TestFallingEdge(t *testing.T) { + t.Parallel() + t.Skip("github actions") + + ok := make(chan struct{}) + tt := 50 * time.Millisecond + worst := 5 * tt + in, c, cleanup := flipflop.NewFallingEdge(tt, worst) + defer cleanup() + go func() { + select { + case <-c: + close(ok) + return + case <-time.After(100 * time.Millisecond): + t.Error("timed out") + } + }() + + in <- struct{}{} + + select { + case <-ok: + case <-time.After(1 * time.Second): + t.Fatal("timed out") + } +} + +func TestFallingEdgeBuffer(t *testing.T) { + t.Parallel() + t.Skip("needs parameter tweaking on github actions") + + ok := make(chan struct{}) + tt := 150 * time.Millisecond + worst := 9 * tt + in, c, cleanup := flipflop.NewFallingEdge(tt, worst) + defer cleanup() + sleeps := 5 + wait := 50 * time.Millisecond + + start := time.Now() + online := make(chan struct{}) + go func() { + close(online) + select { + case <-c: + if time.Since(start) <= 450*time.Millisecond { + t.Errorf("wrote too early %v", time.Since(start)) + } + close(ok) + return + case <-time.After(1000 * time.Millisecond): + t.Error("timed out") + } + }() + + // wait for goroutine to be scheduled + <-online + + for i := 0; i < sleeps; i++ { + in <- struct{}{} + time.Sleep(wait) + } + select { + case <-ok: + case <-time.After(1 * time.Second): + t.Fatal("timed out") + } +} + +func TestFallingEdgeWorstCase(t *testing.T) { + t.Parallel() + t.Skip("github actions") + + ok := make(chan struct{}) + tt := 100 * time.Millisecond + worst := 5 * tt + in, c, cleanup := flipflop.NewFallingEdge(tt, worst) + defer cleanup() + sleeps := 9 + wait := 80 * time.Millisecond + + start := time.Now() + + go func() { + select { + case <-c: + if time.Since(start) >= 550*time.Millisecond { + t.Errorf("wrote too early %v", time.Since(start)) + } + + close(ok) + return + case <-time.After(1000 * time.Millisecond): + t.Error("timed out") + } + }() + go func() { + for i := 0; i < sleeps; i++ { + in <- struct{}{} + time.Sleep(wait) + } + }() + select { + case <-ok: + case <-time.After(1 * time.Second): + t.Fatal("timed out") + } +} diff --git a/pkg/localstore/localstore.go b/pkg/localstore/localstore.go index 
bee9a77b81b..30e41603610 100644 --- a/pkg/localstore/localstore.go +++ b/pkg/localstore/localstore.go @@ -60,6 +60,11 @@ var ( // Limit the number of goroutines created by Getters // that call updateGC function. Value 0 sets no limit. maxParallelUpdateGC = 1000 + + // values needed to adjust subscription trigger + // buffer time. + flipFlopBufferDuration = 150 * time.Millisecond + flipFlopWorstCaseDuration = 10 * time.Second ) const ( diff --git a/pkg/localstore/subscription_pull.go b/pkg/localstore/subscription_pull.go index 9138f1884dd..ad157fb1eee 100644 --- a/pkg/localstore/subscription_pull.go +++ b/pkg/localstore/subscription_pull.go @@ -22,6 +22,7 @@ import ( "sync" "time" + "github.com/ethersphere/bee/pkg/flipflop" "github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/swarm" @@ -43,16 +44,18 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) chunkDescriptors := make(chan storage.Descriptor) - trigger := make(chan struct{}, 1) - trigger <- struct{}{} + in, out, clean := flipflop.NewFallingEdge(flipFlopBufferDuration, flipFlopWorstCaseDuration) db.pullTriggersMu.Lock() if _, ok := db.pullTriggers[bin]; !ok { db.pullTriggers[bin] = make([]chan<- struct{}, 0) } - db.pullTriggers[bin] = append(db.pullTriggers[bin], trigger) + db.pullTriggers[bin] = append(db.pullTriggers[bin], in) db.pullTriggersMu.Unlock() + // send signal for the initial iteration + in <- struct{}{} + stopChan := make(chan struct{}) var stopChanOnce sync.Once @@ -62,6 +65,7 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) db.subscriptionsWG.Add(1) go func() { + defer clean() defer db.subscriptionsWG.Done() defer db.metrics.SubscribePullStop.Inc() // close the returned store.Descriptor channel at the end to @@ -79,7 +83,7 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) first := true // first iteration flag for SkipStartFromItem for { select { - case <-trigger: + case <-out: // iterate until: // - last index Item is reached // - subscription stop is called @@ -167,9 +171,9 @@ func (db *DB) SubscribePull(ctx context.Context, bin uint8, since, until uint64) defer db.pullTriggersMu.Unlock() for i, t := range db.pullTriggers[bin] { - if t == trigger { + if t == in { db.pullTriggers[bin] = append(db.pullTriggers[bin][:i], db.pullTriggers[bin][i+1:]...) 
- return + break } } } From a6a7ca9ade7d4677e18fa2cac8f23325c236c662 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Thu, 20 Oct 2022 13:48:22 +0300 Subject: [PATCH 17/24] fix: missing metric --- pkg/puller/puller.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 189db06fe67..56d0b5a5252 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -274,8 +274,8 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin return } if syncErr != nil { - p.logger.Error(err, "histSyncWorker syncing interval failed", "peer_address", peer, "bin", bin, "cursor", cur, "start", s, "topmost", top) - continue + p.metrics.HistWorkerErrCounter.Inc() + p.logger.Error(syncErr, "histSyncWorker syncing interval failed", "peer_address", peer, "bin", bin, "cursor", cur, "start", s, "topmost", top) } loggerV2.Debug("histSyncWorker pulled", "bin", bin, "start", s, "topmost", top, "peer_address", peer) @@ -325,9 +325,8 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin if syncErr != nil { p.metrics.LiveWorkerErrCounter.Inc() - p.logger.Error(err, "liveSyncWorker sync error", "peer_address", peer, "bin", bin, "from", from, "topmost", top) + p.logger.Error(syncErr, "liveSyncWorker sync error", "peer_address", peer, "bin", bin, "from", from, "topmost", top) sleep = true - continue } loggerV2.Debug("liveSyncWorker pulled bin", "bin", bin, "from", from, "topmost", top, "peer_address", peer) From 0f53bfc011534d7ed2c01aab008bfd649e0270d9 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Thu, 20 Oct 2022 19:38:20 +0300 Subject: [PATCH 18/24] fix: context --- pkg/puller/puller.go | 57 +++++++++++++++++++++----------------------- 1 file changed, 27 insertions(+), 30 deletions(-) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 56d0b5a5252..910ff2b3d63 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -47,8 +47,9 @@ type Puller struct { syncPeers []map[string]*syncPeer // index is bin, map key is peer address syncPeersMtx sync.Mutex - quit chan struct{} - wg sync.WaitGroup + cancel func() + + wg sync.WaitGroup bins uint8 // how many bins do we support } @@ -69,7 +70,6 @@ func New(stateStore storage.StateStorer, topology topology.Driver, reserveState metrics: newMetrics(), logger: logger.WithName(loggerName).Register(), syncPeers: make([]map[string]*syncPeer, bins), - quit: make(chan struct{}), bins: bins, } @@ -77,34 +77,32 @@ func New(stateStore storage.StateStorer, topology topology.Driver, reserveState for i := uint8(0); i < bins; i++ { p.syncPeers[i] = make(map[string]*syncPeer) } + + ctx, cancel := context.WithCancel(context.Background()) + p.cancel = cancel + p.wg.Add(1) - go p.manage(warmupTime) + go p.manage(ctx, warmupTime) return p } -func (p *Puller) manage(warmupTime time.Duration) { +func (p *Puller) manage(ctx context.Context, warmupTime time.Duration) { defer p.wg.Done() select { case <-time.After(warmupTime): - case <-p.quit: + case <-ctx.Done(): return } c, unsubscribe := p.topology.SubscribeTopologyChange() defer unsubscribe() - ctx, cancel := context.WithCancel(context.Background()) - go func() { - <-p.quit - cancel() - }() - p.logger.Info("puller: warmup period complete, worker starting.") for { select { - case <-p.quit: + case <-ctx.Done(): return case <-c: @@ -163,7 +161,7 @@ func (p *Puller) disconnectPeer(addr swarm.Address, po uint8) { // recalcPeers starts or stops 
syncing process for peers per bin depending on the current sync radius. // Must be called under lock. -func (p *Puller) recalcPeers(ctx context.Context, syncRadius uint8) (dontSync bool) { +func (p *Puller) recalcPeers(ctx context.Context, syncRadius uint8) { loggerV2 := p.logger for bin, peers := range p.syncPeers { @@ -181,8 +179,6 @@ func (p *Puller) recalcPeers(ctx context.Context, syncRadius uint8) (dontSync bo } } } - - return false } func (p *Puller) syncPeer(ctx context.Context, peer *syncPeer, syncRadius uint8) error { @@ -265,7 +261,12 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin p.logger.Debug("histSyncWorker syncing finished", "bin", bin, "cursor", cur) return } - top, _, syncErr := p.syncer.SyncInterval(ctx, peer, bin, s, cur) + top, _, err := p.syncer.SyncInterval(ctx, peer, bin, s, cur) + if err != nil { + p.metrics.HistWorkerErrCounter.Inc() + p.logger.Error(err, "histSyncWorker syncing interval failed", "peer_address", peer, "bin", bin, "cursor", cur, "start", s, "topmost", top) + sleep = true + } err = p.addPeerInterval(peer, bin, s, top) if err != nil { @@ -273,10 +274,6 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin p.logger.Error(err, "histSyncWorker could not persist interval for peer, quitting...", "peer_address", peer) return } - if syncErr != nil { - p.metrics.HistWorkerErrCounter.Inc() - p.logger.Error(syncErr, "histSyncWorker syncing interval failed", "peer_address", peer, "bin", bin, "cursor", cur, "start", s, "topmost", top) - } loggerV2.Debug("histSyncWorker pulled", "bin", bin, "start", s, "topmost", top, "peer_address", peer) } @@ -311,24 +308,24 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin default: } - top, _, syncErr := p.syncer.SyncInterval(ctx, peer, bin, from, pullsync.MaxCursor) + top, _, err := p.syncer.SyncInterval(ctx, peer, bin, from, pullsync.MaxCursor) + if err != nil { + p.metrics.LiveWorkerErrCounter.Inc() + p.logger.Error(err, "liveSyncWorker sync error", "peer_address", peer, "bin", bin, "from", from, "topmost", top) + sleep = true + } + if top == math.MaxUint64 { p.metrics.MaxUintErrCounter.Inc() return } - err := p.addPeerInterval(peer, bin, from, top) + err = p.addPeerInterval(peer, bin, from, top) if err != nil { p.metrics.LiveWorkerErrCounter.Inc() p.logger.Error(err, "liveSyncWorker exit on add peer interval", "peer_address", peer, "bin", bin, "from", from, "error", err) return } - if syncErr != nil { - p.metrics.LiveWorkerErrCounter.Inc() - p.logger.Error(syncErr, "liveSyncWorker sync error", "peer_address", peer, "bin", bin, "from", from, "topmost", top) - sleep = true - } - loggerV2.Debug("liveSyncWorker pulled bin", "bin", bin, "from", from, "topmost", top, "peer_address", peer) from = top + 1 @@ -337,7 +334,7 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin func (p *Puller) Close() error { p.logger.Info("puller shutting down") - close(p.quit) + p.cancel() cc := make(chan struct{}) go func() { defer close(cc) From c3fb373482f978fbf3e6ec4413a3e80ba334bb22 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Thu, 20 Oct 2022 20:43:03 +0300 Subject: [PATCH 19/24] chore: tests --- pkg/node/node.go | 2 +- pkg/puller/puller.go | 14 +++++----- pkg/puller/puller_test.go | 49 +++++++++++++++++++++++++++++++---- pkg/pullsync/mock/pullsync.go | 12 +++++++++ 4 files changed, 65 insertions(+), 12 deletions(-) diff --git a/pkg/node/node.go b/pkg/node/node.go index 
4d07b12965a..b35183fdcf0 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -931,7 +931,7 @@ func NewBee(interrupt chan struct{}, sysInterrupt chan os.Signal, addr string, p var pullerService *puller.Puller if o.FullNodeMode && !o.BootnodeMode { - pullerService = puller.New(stateStore, kad, batchStore, pullSyncProtocol, logger, puller.Options{}, warmupTime) + pullerService = puller.New(stateStore, kad, batchStore, pullSyncProtocol, logger, puller.Options{}, warmupTime, puller.DefaultSyncSleepDur) b.pullerCloser = pullerService } diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 910ff2b3d63..171b70bd6a2 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -29,7 +29,7 @@ const loggerName = "puller" var errCursorsLength = errors.New("cursors length mismatch") -const syncSleepDur = time.Minute * 5 +const DefaultSyncSleepDur = time.Minute * 5 type Options struct { Bins uint8 @@ -51,10 +51,12 @@ type Puller struct { wg sync.WaitGroup + syncSleepDur time.Duration + bins uint8 // how many bins do we support } -func New(stateStore storage.StateStorer, topology topology.Driver, reserveState postage.ReserveStateGetter, pullSync pullsync.Interface, logger log.Logger, o Options, warmupTime time.Duration) *Puller { +func New(stateStore storage.StateStorer, topology topology.Driver, reserveState postage.ReserveStateGetter, pullSync pullsync.Interface, logger log.Logger, o Options, warmupTime time.Duration, syncSleepDur time.Duration) *Puller { var ( bins uint8 = swarm.MaxBins ) @@ -70,8 +72,8 @@ func New(stateStore storage.StateStorer, topology topology.Driver, reserveState metrics: newMetrics(), logger: logger.WithName(loggerName).Register(), syncPeers: make([]map[string]*syncPeer, bins), - - bins: bins, + syncSleepDur: syncSleepDur, + bins: bins, } for i := uint8(0); i < bins; i++ { @@ -238,7 +240,7 @@ func (p *Puller) histSyncWorker(ctx context.Context, peer swarm.Address, bin uin case <-ctx.Done(): loggerV2.Debug("histSyncWorker context cancelled", "peer_address", peer, "bin", bin, "cursor", cur) return - case <-time.After(syncSleepDur): + case <-time.After(p.syncSleepDur): } sleep = false } @@ -296,7 +298,7 @@ func (p *Puller) liveSyncWorker(ctx context.Context, peer swarm.Address, bin uin case <-ctx.Done(): loggerV2.Debug("liveSyncWorker context cancelled", "peer_address", peer, "bin", bin, "cursor", cur) return - case <-time.After(syncSleepDur): + case <-time.After(p.syncSleepDur): } sleep = false } diff --git a/pkg/puller/puller_test.go b/pkg/puller/puller_test.go index 8a73de06fd8..4d506dc570e 100644 --- a/pkg/puller/puller_test.go +++ b/pkg/puller/puller_test.go @@ -447,6 +447,44 @@ func TestDepthChange(t *testing.T) { } } +// TestContinueSyncing adds a single peer with PO 0 to hist and live sync only a peer +// to test that when SyncInterval returns an error, the syncing does not terminate. 
+func TestContinueSyncing(t *testing.T) { + t.Parallel() + + var ( + addr = test.RandomAddress() + ) + + puller, _, kad, pullsync := newPuller(opts{ + kad: []mockk.Option{ + mockk.WithEachPeerRevCalls(mockk.AddrTuple{Addr: addr, PO: 0}), + mockk.WithDepth(0), + }, + pullSync: []mockps.Option{ + mockps.WithCursors([]uint64{1}), + mockps.WithSyncError(errors.New("sync error"))}, + bins: 1, + syncRadius: 0, + syncSleepDur: time.Millisecond * 10, + }) + defer puller.Close() + defer pullsync.Close() + + time.Sleep(100 * time.Millisecond) + kad.Trigger() + time.Sleep(100 * time.Millisecond) + + calls := pullsync.LiveSyncCalls(addr) + + // expected calls should ideally be exactly 10, + // but we allow some time for the goroutines to run + // by reducing the minimum expected calls to 5 + if len(calls) < 5 || len(calls) > 10 { + t.Fatalf("unexpected amount of calls, got %d", len(calls)) + } +} + func checkIntervals(t *testing.T, s storage.StateStorer, addr swarm.Address, expInterval string, bin uint8) { t.Helper() key := puller.PeerIntervalKey(addr, bin) @@ -618,10 +656,11 @@ func waitLiveSyncCalledTimes(t *testing.T, ps *mockps.PullSyncMock, addr swarm.A } type opts struct { - pullSync []mockps.Option - kad []mockk.Option - bins uint8 - syncRadius uint8 + pullSync []mockps.Option + kad []mockk.Option + bins uint8 + syncRadius uint8 + syncSleepDur time.Duration } func newPuller(ops opts) (*puller.Puller, storage.StateStorer, *mockk.Mock, *mockps.PullSyncMock) { @@ -635,7 +674,7 @@ func newPuller(ops opts) (*puller.Puller, storage.StateStorer, *mockk.Mock, *moc o := puller.Options{ Bins: ops.bins, } - return puller.New(s, kad, rs, ps, logger, o, 0), s, kad, ps + return puller.New(s, kad, rs, ps, logger, o, 0, ops.syncSleepDur), s, kad, ps } type c struct { diff --git a/pkg/pullsync/mock/pullsync.go b/pkg/pullsync/mock/pullsync.go index 414edb9156b..2ddc56331a6 100644 --- a/pkg/pullsync/mock/pullsync.go +++ b/pkg/pullsync/mock/pullsync.go @@ -16,6 +16,12 @@ import ( var _ pullsync.Interface = (*PullSyncMock)(nil) +func WithSyncError(err error) Option { + return optionFunc(func(p *PullSyncMock) { + p.syncErr = err + }) +} + func WithCursors(v []uint64) Option { return optionFunc(func(p *PullSyncMock) { p.cursors = v @@ -80,6 +86,7 @@ func NewReply(bin uint8, from, top uint64, block bool) SyncReply { type PullSyncMock struct { mtx sync.Mutex syncCalls []SyncCall + syncErr error cursors []uint64 getCursorsPeers []swarm.Address autoReply bool @@ -107,6 +114,7 @@ func NewPullSync(opts ...Option) *PullSyncMock { } func (p *PullSyncMock) SyncInterval(ctx context.Context, peer swarm.Address, bin uint8, from, to uint64) (topmost uint64, ruid uint32, err error) { + isLive := to == math.MaxUint64 call := SyncCall{ @@ -120,6 +128,10 @@ func (p *PullSyncMock) SyncInterval(ctx context.Context, peer swarm.Address, bin p.syncCalls = append(p.syncCalls, call) p.mtx.Unlock() + if p.syncErr != nil { + return 0, 0, p.syncErr + } + if isLive && p.lateReply { p.lateCond.L.Lock() for !p.lateChange { From 239a691fada285417c1a8a2285ba06065f1be4cb Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Fri, 21 Oct 2022 01:42:36 +0300 Subject: [PATCH 20/24] fix: remove unncessary disconnect --- pkg/puller/puller.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 171b70bd6a2..c59c0afbc43 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -129,11 +129,7 @@ func (p *Puller) manage(ctx context.Context, warmupTime time.Duration) { } // 
remove from disconnected list as the peer is still connected delete(peersDisconnected, addr.ByteString()) - } else { - // outside of neighborhood, prune peer - p.disconnectPeer(addr, po) } - return false, false, nil }, topology.Filter{Reachable: true}) From 1061c6c82c7a7bf4bb417752eeb799b66ebe3733 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Fri, 21 Oct 2022 01:47:58 +0300 Subject: [PATCH 21/24] fix: timer --- pkg/puller/puller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index c59c0afbc43..8930cd54fe1 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -29,7 +29,7 @@ const loggerName = "puller" var errCursorsLength = errors.New("cursors length mismatch") -const DefaultSyncSleepDur = time.Minute * 5 +const DefaultSyncSleepDur = time.Second * 5 type Options struct { Bins uint8 From 6abf7a476de600b7b2cab28a2e14b30b5b42f5e1 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Sun, 23 Oct 2022 22:16:23 +0300 Subject: [PATCH 22/24] chore: retry sleep --- pkg/puller/puller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 8930cd54fe1..1fe57f3b844 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -29,7 +29,7 @@ const loggerName = "puller" var errCursorsLength = errors.New("cursors length mismatch") -const DefaultSyncSleepDur = time.Second * 5 +const DefaultSyncSleepDur = time.Minute type Options struct { Bins uint8 From a7fe7a994673150dc1d35005ba8f3cc6af07066c Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Mon, 24 Oct 2022 20:15:20 +0300 Subject: [PATCH 23/24] chore: peer gone test --- pkg/puller/puller_test.go | 47 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/pkg/puller/puller_test.go b/pkg/puller/puller_test.go index 4d506dc570e..90a49fb782c 100644 --- a/pkg/puller/puller_test.go +++ b/pkg/puller/puller_test.go @@ -485,6 +485,53 @@ func TestContinueSyncing(t *testing.T) { } } +func TestPeerGone(t *testing.T) { + t.Parallel() + + var ( + addr = test.RandomAddress() + ) + + p, _, kad, pullsync := newPuller(opts{ + kad: []mockk.Option{ + mockk.WithEachPeerRevCalls(mockk.AddrTuple{Addr: addr, PO: 0}), + mockk.WithDepth(0), + }, + pullSync: []mockps.Option{ + mockps.WithCursors([]uint64{1}), + }, + bins: 1, + syncRadius: 0, + syncSleepDur: time.Millisecond * 10, + }) + defer p.Close() + defer pullsync.Close() + + time.Sleep(100 * time.Millisecond) + kad.Trigger() + time.Sleep(100 * time.Millisecond) + + beforeCalls := pullsync.LiveSyncCalls(addr) + + if len(beforeCalls) != 1 { + t.Fatalf("unexpected amount of calls, got %d, want 1", len(beforeCalls)) + } + + kad.ResetPeers() + kad.Trigger() + time.Sleep(100 * time.Millisecond) + + afterCalls := pullsync.LiveSyncCalls(addr) + + if len(beforeCalls) != len(afterCalls) { + t.Fatalf("unexpected new calls to sync interval, expected 0, got %d", len(afterCalls)-len(beforeCalls)) + } + + if puller.IsSyncing(p, addr) { + t.Fatalf("peer is syncing but shouldnt") + } +} + func checkIntervals(t *testing.T, s storage.StateStorer, addr swarm.Address, expInterval string, bin uint8) { t.Helper() key := puller.PeerIntervalKey(addr, bin) From ae7a09092d85d98054e32e027bf86071271aa388 Mon Sep 17 00:00:00 2001 From: istae <14264581+istae@users.noreply.github.com> Date: Tue, 25 Oct 2022 11:53:48 +0300 Subject: [PATCH 24/24] chore: puller sync sleep dur options --- 
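This final patch makes the worker back-off configurable through Options.SyncSleepDur instead of a separate constructor argument. For context, the loop shape that this duration controls is sketched below as a minimal, self-contained Go program; syncOnce and retrySleep are illustrative stand-ins rather than puller APIs, and the real workers log the error and continue with interval bookkeeping rather than retrying immediately, so treat this as the general pattern only.

package main

import (
	"context"
	"errors"
	"log"
	"time"
)

// retrySleep stands in for Options.SyncSleepDur (DefaultSyncSleepDur in the patch).
const retrySleep = 50 * time.Millisecond

// syncOnce stands in for a single SyncInterval round; it fails a few
// times before succeeding so the retry path is exercised.
func syncOnce(attempt int) error {
	if attempt < 3 {
		return errors.New("transient sync error")
	}
	return nil
}

// worker shows the sleep-and-retry shape: a sync error no longer exits
// the loop, it only schedules a back-off before the next iteration,
// while context cancellation remains the only way to stop the worker.
func worker(ctx context.Context) {
	sleep := false
	for attempt := 0; ; attempt++ {
		if sleep {
			select {
			case <-ctx.Done():
				return
			case <-time.After(retrySleep):
			}
			sleep = false
		}

		select {
		case <-ctx.Done():
			return
		default:
		}

		if err := syncOnce(attempt); err != nil {
			log.Printf("sync failed, retrying: %v", err)
			sleep = true
			continue
		}
		log.Printf("sync succeeded on attempt %d", attempt)
		return
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	worker(ctx)
}

With the option in place, callers construct the service as puller.New(..., puller.Options{SyncSleepDur: puller.DefaultSyncSleepDur}, warmupTime), as the node.go hunk below shows, and tests can shorten the back-off via the same field.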
pkg/node/node.go | 2 +- pkg/puller/puller.go | 7 ++++--- pkg/puller/puller_test.go | 5 +++-- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/pkg/node/node.go b/pkg/node/node.go index b35183fdcf0..2ea4dc6dc80 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -931,7 +931,7 @@ func NewBee(interrupt chan struct{}, sysInterrupt chan os.Signal, addr string, p var pullerService *puller.Puller if o.FullNodeMode && !o.BootnodeMode { - pullerService = puller.New(stateStore, kad, batchStore, pullSyncProtocol, logger, puller.Options{}, warmupTime, puller.DefaultSyncSleepDur) + pullerService = puller.New(stateStore, kad, batchStore, pullSyncProtocol, logger, puller.Options{SyncSleepDur: puller.DefaultSyncSleepDur}, warmupTime) b.pullerCloser = pullerService } diff --git a/pkg/puller/puller.go b/pkg/puller/puller.go index 1fe57f3b844..8409e36a548 100644 --- a/pkg/puller/puller.go +++ b/pkg/puller/puller.go @@ -32,7 +32,8 @@ var errCursorsLength = errors.New("cursors length mismatch") const DefaultSyncSleepDur = time.Minute type Options struct { - Bins uint8 + Bins uint8 + SyncSleepDur time.Duration } type Puller struct { @@ -56,7 +57,7 @@ type Puller struct { bins uint8 // how many bins do we support } -func New(stateStore storage.StateStorer, topology topology.Driver, reserveState postage.ReserveStateGetter, pullSync pullsync.Interface, logger log.Logger, o Options, warmupTime time.Duration, syncSleepDur time.Duration) *Puller { +func New(stateStore storage.StateStorer, topology topology.Driver, reserveState postage.ReserveStateGetter, pullSync pullsync.Interface, logger log.Logger, o Options, warmupTime time.Duration) *Puller { var ( bins uint8 = swarm.MaxBins ) @@ -72,7 +73,7 @@ func New(stateStore storage.StateStorer, topology topology.Driver, reserveState metrics: newMetrics(), logger: logger.WithName(loggerName).Register(), syncPeers: make([]map[string]*syncPeer, bins), - syncSleepDur: syncSleepDur, + syncSleepDur: o.SyncSleepDur, bins: bins, } diff --git a/pkg/puller/puller_test.go b/pkg/puller/puller_test.go index 90a49fb782c..989d22f04b8 100644 --- a/pkg/puller/puller_test.go +++ b/pkg/puller/puller_test.go @@ -719,9 +719,10 @@ func newPuller(ops opts) (*puller.Puller, storage.StateStorer, *mockk.Mock, *moc rs := &reserveStateGetter{rs: postage.ReserveState{StorageRadius: ops.syncRadius}} o := puller.Options{ - Bins: ops.bins, + Bins: ops.bins, + SyncSleepDur: ops.syncSleepDur, } - return puller.New(s, kad, rs, ps, logger, o, 0, ops.syncSleepDur), s, kad, ps + return puller.New(s, kad, rs, ps, logger, o, 0), s, kad, ps } type c struct {