New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adds exponential backoff to re-spawning new streams for supposedly dead peers #483
Changes from 11 commits
c3a6760
5e8ec29
7c58f7a
42e310b
90cdd55
b3f58bc
a9f4edf
a77d435
6e4b2f8
2761b98
6401d8b
4c94e5f
e260291
c74ae78
7f815f0
6ebc292
8b64966
c00510e
e9d42fa
eede9ba
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
cover.out | ||
prof.out | ||
go-floodsub.test | ||
|
||
.idea/ |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
package pubsub | ||
|
||
import ( | ||
"context" | ||
"math/rand" | ||
"sync" | ||
"time" | ||
|
||
"github.com/libp2p/go-libp2p-core/peer" | ||
) | ||
|
||
// Tuning knobs for the dead-peer reconnection backoff.
const (
	// MinBackoffDelay is the delay applied on the second attempt (the first goes immediately).
	MinBackoffDelay = 100 * time.Millisecond
	// MaxBackoffDelay caps the exponential growth of the per-peer delay.
	MaxBackoffDelay = 10 * time.Second
	// TimeToLive is how long an untouched backoff record stays valid before it resets / is cleaned up.
	TimeToLive = 10 * time.Minute
	// BackoffCleanupInterval is how often expired records are purged.
	BackoffCleanupInterval = 1 * time.Minute
	// BackoffMultiplier is the exponential growth factor per attempt.
	BackoffMultiplier = 2
	// MaxBackoffJitterCoff bounds the random jitter (in milliseconds) added per attempt.
	MaxBackoffJitterCoff = 100
)
|
||
// backoffHistory holds the backoff state for a single peer: the delay to
// apply on the next attempt and when the peer was last tried.
type backoffHistory struct {
	duration  time.Duration // delay for the next attempt; grows exponentially in updateAndGet
	lastTried time.Time     // timestamp of the most recent attempt; drives TTL expiry
}
|
||
// backoff tracks per-peer reconnection backoff records. All access to info
// goes through mu; expired records are reaped by cleanupLoop.
type backoff struct {
	mu   sync.Mutex                  // guards info
	info map[peer.ID]*backoffHistory // per-peer backoff state
	ct   int                         // size threshold that kicks off the cleaner
	ci   time.Duration               // cleanup intervals
	// NOTE(review): ct is stored but not referenced by any method visible in
	// this diff — confirm the size-threshold trigger is implemented elsewhere.
}
|
||
func newBackoff(ctx context.Context, sizeThreshold int, cleanupInterval time.Duration) *backoff { | ||
b := &backoff{ | ||
mu: sync.Mutex{}, | ||
ct: sizeThreshold, | ||
ci: cleanupInterval, | ||
info: make(map[peer.ID]*backoffHistory), | ||
} | ||
|
||
rand.Seed(time.Now().UnixNano()) // used for jitter | ||
go b.cleanupLoop(ctx) | ||
|
||
return b | ||
} | ||
|
||
func (b *backoff) updateAndGet(id peer.ID) time.Duration { | ||
b.mu.Lock() | ||
defer b.mu.Unlock() | ||
|
||
h, ok := b.info[id] | ||
if !ok || time.Since(h.lastTried) > TimeToLive { | ||
// first request goes immediately. | ||
h = &backoffHistory{ | ||
duration: time.Duration(0), | ||
} | ||
} else if h.duration < MinBackoffDelay { | ||
h.duration = MinBackoffDelay | ||
} else if h.duration < MaxBackoffDelay { | ||
jitter := rand.Intn(MaxBackoffJitterCoff) | ||
h.duration = (BackoffMultiplier * h.duration) + time.Duration(jitter) * time.Millisecond | ||
if h.duration > MaxBackoffDelay || h.duration < 0 { | ||
h.duration = MaxBackoffDelay | ||
} | ||
} | ||
|
||
h.lastTried = time.Now() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. let's get the time after checking the max attempts, will avoid the gettimeofday call in that case. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please read my reply to the below comment as this part has got changed. |
||
b.info[id] = h | ||
|
||
return h.duration | ||
} | ||
|
||
func (b *backoff) cleanup() { | ||
b.mu.Lock() | ||
defer b.mu.Unlock() | ||
|
||
for id, h := range b.info { | ||
if time.Since(h.lastTried) > TimeToLive { | ||
delete(b.info, id) | ||
} | ||
} | ||
} | ||
|
||
func (b *backoff) cleanupLoop(ctx context.Context) { | ||
vyzo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return // pubsub shutting down | ||
case <-time.Tick(b.ci): | ||
b.cleanup() | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
package pubsub | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"math" | ||
"testing" | ||
"time" | ||
|
||
"github.com/libp2p/go-libp2p-core/peer" | ||
) | ||
|
||
// TestBackoff_Update exercises updateAndGet: the first call per peer is
// immediate (zero delay), subsequent calls grow at most exponentially
// (bounded by MaxBackoffDelay), and a record older than TimeToLive resets
// to an immediate retry.
func TestBackoff_Update(t *testing.T) {
	id1 := peer.ID("peer-1")
	id2 := peer.ID("peer-2")

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	size := 10
	cleanupInterval := 5 * time.Second

	b := newBackoff(ctx, size, cleanupInterval)

	// a fresh tracker must start with no history.
	if len(b.info) > 0 {
		t.Fatal("non-empty info map for backoff")
	}

	// the very first attempt for each peer goes through with zero delay.
	if d := b.updateAndGet(id1); d != time.Duration(0) {
		t.Fatalf("invalid initialization: %v", d)
	}
	if d := b.updateAndGet(id2); d != time.Duration(0) {
		t.Fatalf("invalid initialization: %v", d)
	}

	for i := 0; i < 10; i++ {
		got := b.updateAndGet(id1)

		// analytic upper bound: multiplier^i applied to the minimum delay
		// plus the maximum possible jitter, clamped at MaxBackoffDelay.
		expected := time.Duration(math.Pow(BackoffMultiplier, float64(i)) *
			float64(MinBackoffDelay+MaxBackoffJitterCoff*time.Millisecond))
		if expected > MaxBackoffDelay {
			expected = MaxBackoffDelay
		}

		if expected < got { // considering jitter, expected backoff must always be greater than or equal to actual.
			t.Fatalf("invalid backoff result, expected: %v, got: %v", expected, got)
		}
	}

	// id2 already has one (immediate) attempt recorded, so its second
	// attempt must back off by exactly the minimum delay.
	got := b.updateAndGet(id2)
	if got != MinBackoffDelay {
		t.Fatalf("invalid backoff result, expected: %v, got: %v", MinBackoffDelay, got)
	}

	// sets last tried of id2 to long ago that it resets back upon next try.
	b.info[id2].lastTried = time.Now().Add(-TimeToLive)
	got = b.updateAndGet(id2)
	if got != time.Duration(0) {
		t.Fatalf("invalid ttl expiration, expected: %v, got: %v", time.Duration(0), got)
	}

	// both peers should still have records (no cleanup has run here).
	if len(b.info) != 2 {
		t.Fatalf("info map size mismatch, expected: %d, got: %d", 2, len(b.info))
	}
}
|
||
func TestBackoff_Clean(t *testing.T){ | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
||
size := 10 | ||
cleanupInterval := 2 * time.Second | ||
b := newBackoff(ctx, size, cleanupInterval) | ||
|
||
for i := 0; i < size; i++{ | ||
id := peer.ID(fmt.Sprintf("peer-%d", i)) | ||
b.updateAndGet(id) | ||
b.info[id].lastTried = time.Now().Add(-TimeToLive) // enforces expiry | ||
} | ||
|
||
if len(b.info) != size { | ||
t.Fatalf("info map size mismatch, expected: %d, got: %d", size, len(b.info)) | ||
} | ||
|
||
// waits for a cleanup loop to kick-in | ||
time.Sleep(2 * cleanupInterval) | ||
|
||
// next update should trigger cleanup | ||
got := b.updateAndGet(peer.ID("some-new-peer")) | ||
if got != time.Duration(0) { | ||
t.Fatalf("invalid backoff result, expected: %v, got: %v", time.Duration(0), got) | ||
} | ||
|
||
// except "some-new-peer" every other records must be cleaned up | ||
if len(b.info) != 1 { | ||
t.Fatalf("info map size mismatch, expected: %d, got: %d", 1, len(b.info)) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ import ( | |
"bufio" | ||
"context" | ||
"io" | ||
"time" | ||
|
||
"github.com/libp2p/go-libp2p-core/network" | ||
"github.com/libp2p/go-libp2p-core/peer" | ||
|
@@ -121,6 +122,16 @@ func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan | |
} | ||
} | ||
|
||
func (p *PubSub) handleNewPeerWithBackoff(ctx context.Context, pid peer.ID, outgoing <-chan *RPC) { | ||
delay := p.deadPeerBackoff.updateAndGet(pid) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we need to add a failure more if we have backed off too much and simply give up; say we try up to 10 times and then How does that sound? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe 10 is even too much, 3-4 attempts should be enough. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
select { | ||
case <-time.After(delay): | ||
p.handleNewPeer(ctx, pid, outgoing) | ||
case <-ctx.Done(): | ||
return | ||
} | ||
} | ||
|
||
func (p *PubSub) handlePeerEOF(ctx context.Context, s network.Stream) { | ||
pid := s.Conn().RemotePeer() | ||
r := protoio.NewDelimitedReader(s, p.maxMessageSize) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's write this if/else sequence with a switch, will be nicer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
7f815f0