This repository has been archived by the owner on Apr 21, 2022. It is now read-only.

Close connections where streams haven't been opened since some time #77

Closed
wants to merge 3 commits into from
81 changes: 68 additions & 13 deletions connmgr.go
@@ -19,6 +19,8 @@ var SilencePeriod = 10 * time.Second

var log = logging.Logger("connmgr")

var maxStreamOpenDuration = 10 * time.Minute
Contributor: name... this is poorly named.

Collaborator (author): fixed.


// BasicConnMgr is a ConnManager that trims connections whenever the count exceeds the
// high watermark. New connections are given a grace period before they're subject
// to trimming. Trims are automatically run on demand, only if the time from the
@@ -84,7 +86,7 @@ func (s *segment) tagInfoFor(p peer.ID) *peerInfo {
temp: true,
tags: make(map[string]int),
decaying: make(map[*decayingTag]*connmgr.DecayingValue),
conns: make(map[network.Conn]time.Time),
conns: make(map[network.Conn]*connInfo),
}
s.peers[p] = pi
return pi
@@ -193,6 +195,11 @@ func (cm *BasicConnMgr) IsProtected(id peer.ID, tag string) (protected bool) {
return protected
}

type connInfo struct {
startTime time.Time
lastStreamOpen time.Time
}

// peerInfo stores metadata for a given peer.
type peerInfo struct {
id peer.ID
@@ -202,7 +209,7 @@ type peerInfo struct {
value int // cached sum of all tag values
temp bool // this is a temporary entry holding early tags, and awaiting connections

conns map[network.Conn]time.Time // start time of each connection
conns map[network.Conn]*connInfo // start time and last stream open time of each connection.

firstSeen time.Time // timestamp when we began tracking this peer.
}
@@ -359,9 +366,31 @@ func (cm *BasicConnMgr) getConnsToClose() []network.Conn {

target := ncandidates - cm.cfg.lowWater

// slightly overallocate because we may have more than one conns per peer
selected := make([]network.Conn, 0, target+10)
// overallocate because we may have more than one conns per peer
selected := make([]network.Conn, 0, 2*target)
seen := make(map[network.Conn]struct{})

// first select connections that haven't seen a stream since some time.
for _, inf := range candidates {
if target <= 0 {
break
}

// lock this to protect from concurrent modifications from connect/disconnect events
s := cm.segments.get(inf.id)
s.Lock()

for c, info := range inf.conns {
if !info.lastStreamOpen.IsZero() && time.Since(info.lastStreamOpen) > maxStreamOpenDuration {
selected = append(selected, c)
target--
seen[c] = struct{}{}
Contributor: we should only do this if there are no active streams.

Contributor: And because it might be expensive to count on demand, we can keep a running count using the StreamOpened/StreamClosed events.
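A rough sketch, not part of this PR, of what that running count could look like, reusing the segment locking and the conns map from this diff. The streamCount field is hypothetical and only illustrates the idea:

type connInfo struct {
    startTime      time.Time
    lastStreamOpen time.Time
    streamCount    int // hypothetical: running count of open streams on this connection
}

func (nn *cmNotifee) OpenedStream(_ network.Network, stream network.Stream) {
    cm := nn.cm()
    p := stream.Conn().RemotePeer()

    s := cm.segments.get(p)
    s.Lock()
    defer s.Unlock()

    if pi, ok := s.peers[p]; ok {
        if ci, ok := pi.conns[stream.Conn()]; ok {
            ci.streamCount++ // bump the running count on every stream open
            ci.lastStreamOpen = time.Now()
        }
    }
}

func (nn *cmNotifee) ClosedStream(_ network.Network, stream network.Stream) {
    cm := nn.cm()
    p := stream.Conn().RemotePeer()

    s := cm.segments.get(p)
    s.Lock()
    defer s.Unlock()

    if pi, ok := s.peers[p]; ok {
        if ci, ok := pi.conns[stream.Conn()]; ok {
            ci.streamCount-- // drops back to zero once every stream on the conn has closed
        }
    }
}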

Collaborator (author): @vyzo

The current heuristic is: "Clean up connections that haven't seen a stream in 10 minutes (but could have open streams older than that)."

Are you saying that we should instead clean up connections that don't have any open streams at all, despite being older than 10 minutes?

Any reason to prefer the latter?

Contributor: That heuristic is incorrect; it will break applications with long-lived streams.
Imagine a pubsub client that simply connects, opens the pubsub streams, and does nothing else afterwards.
If we reap the connection because there are no new streams, we just broke the app.

So yeah, we should only close connections if they have been idle for 10 minutes without any streams present.
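For illustration, a sketch of the trim check this implies inside getConnsToClose, reusing target, selected, and seen from the diff above and assuming the hypothetical streamCount from the earlier sketch; a connection with any live stream is never reaped, however old its last stream activity is:

for c, info := range inf.conns {
    if target <= 0 {
        break
    }
    // If the connection has never seen a stream, measure idleness from when it was opened.
    idleSince := info.lastStreamOpen
    if idleSince.IsZero() {
        idleSince = info.startTime
    }
    // Only reap connections that currently have no streams and have been idle past the threshold.
    if info.streamCount == 0 && time.Since(idleSince) > maxStreamOpenDuration {
        selected = append(selected, c)
        seen[c] = struct{}{}
        target--
    }
}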

Collaborator (author): Makes sense.

}
}
s.Unlock()
}

// now select remaining connections if we still haven't hit our target
for _, inf := range candidates {
if target <= 0 {
break
@@ -377,10 +406,12 @@ func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
delete(s.peers, inf.id)
} else {
for c := range inf.conns {
selected = append(selected, c)
if _, ok := seen[c]; !ok {
selected = append(selected, c)
target--
}
}
}
target -= len(inf.conns)
s.Unlock()
}

@@ -412,8 +443,8 @@ func (cm *BasicConnMgr) GetTagInfo(p peer.ID) *connmgr.TagInfo {
for t, v := range pi.decaying {
out.Tags[t.name] = v.Value
}
for c, t := range pi.conns {
out.Conns[c.RemoteMultiaddr().String()] = t
for c, connInfo := range pi.conns {
out.Conns[c.RemoteMultiaddr().String()] = connInfo.startTime
}

return out
@@ -528,7 +559,7 @@ func (nn *cmNotifee) Connected(n network.Network, c network.Conn) {
firstSeen: time.Now(),
tags: make(map[string]int),
decaying: make(map[*decayingTag]*connmgr.DecayingValue),
conns: make(map[network.Conn]time.Time),
conns: make(map[network.Conn]*connInfo),
}
s.peers[id] = pinfo
} else if pinfo.temp {
@@ -545,7 +576,7 @@ func (nn *cmNotifee) Connected(n network.Network, c network.Conn) {
return
}

pinfo.conns[c] = time.Now()
pinfo.conns[c] = &connInfo{startTime: time.Now()}
atomic.AddInt32(&cm.connCount, 1)
}

@@ -578,14 +609,38 @@ func (nn *cmNotifee) Disconnected(n network.Network, c network.Conn) {
atomic.AddInt32(&cm.connCount, -1)
}

// OpenedStream is called by notifiers to inform us that a new libp2p stream has been opened on a connection.
// The notifee updates the BasicConnMgr to record the time we last saw a stream on the connection.
// We then use this information when deciding which connections to trim.
func (nn *cmNotifee) OpenedStream(_ network.Network, stream network.Stream) {
cm := nn.cm()

p := stream.Conn().RemotePeer()
s := cm.segments.get(p)
s.Lock()
defer s.Unlock()

cinf, ok := s.peers[p]
if !ok {
log.Error("received stream open notification for peer we are not tracking: ", p)
return
}

c := stream.Conn()
_, ok = cinf.conns[c]
if !ok {
log.Error("received stream open notification for conn we are not tracking: ", p)
return
}

cinf.conns[c].lastStreamOpen = time.Now()
}

// Listen is no-op in this implementation.
func (nn *cmNotifee) Listen(n network.Network, addr ma.Multiaddr) {}

// ListenClose is no-op in this implementation.
func (nn *cmNotifee) ListenClose(n network.Network, addr ma.Multiaddr) {}

// OpenedStream is no-op in this implementation.
func (nn *cmNotifee) OpenedStream(network.Network, network.Stream) {}

// ClosedStream is no-op in this implementation.
func (nn *cmNotifee) ClosedStream(network.Network, network.Stream) {}
52 changes: 52 additions & 0 deletions connmgr_test.go
@@ -48,6 +48,19 @@ func randConn(t testing.TB, discNotify func(network.Network, network.Conn)) netw
return &tconn{peer: pid, disconnectNotify: discNotify}
}

type tStream struct {
network.Stream
conn network.Conn
}

func (s *tStream) Conn() network.Conn {
return s.conn
}

func randStream(t testing.TB, c network.Conn) network.Stream {
return &tStream{conn: c}
}

// Make sure multiple trim calls block.
func TestTrimBlocks(t *testing.T) {
cm := NewConnManager(200, 300, 0)
@@ -124,6 +137,45 @@ func TestTrimJoin(t *testing.T) {
wg.Wait()
}

func TestCloseStreamNotOpen(t *testing.T) {
copy := maxStreamOpenDuration
maxStreamOpenDuration = 100 * time.Millisecond
defer func() {
maxStreamOpenDuration = copy
}()

cm := NewConnManager(5, 8, 0)
not := cm.Notifee()

var conns []network.Conn
for i := 0; i < 8; i++ {
rc := randConn(t, nil)
conns = append(conns, rc)
not.Connected(nil, rc)
}

for i, c := range conns {
if i%3 == 0 {
not.OpenedStream(nil, randStream(t, c))
}
}

time.Sleep(1 * time.Second)
cm.TrimOpenConns(context.Background())

for i, c := range conns {
if i%3 == 0 {
if !c.(*tconn).closed {
t.Fatal("these should be closed")
}
} else {
if c.(*tconn).closed {
t.Fatal("these should NOT be closed")
}
}
}
}

func TestConnTrimming(t *testing.T) {
cm := NewConnManager(200, 300, 0)
not := cm.Notifee()