Skip to content

Commit

Permalink
review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed May 7, 2024
1 parent 25ccbf2 commit e9b2300
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 20 deletions.
34 changes: 18 additions & 16 deletions p2p/net/swarm/connectedness_event_emitter.go
Expand Up @@ -66,13 +66,14 @@ func (c *connectednessEventEmitter) RemoveConn(p peer.ID) {
}

c.removeConnsMx.Lock()
// This queue is not unbounded since we block in the AddConn method
// So we are adding connections to the swarm only at a rate
// the subscriber for our peer connectedness changed events can consume them.
// If a lot of open connections are closed at once, increasing the disconnected
// event notification rate, the rate of adding connections to the swarm would
// proportionately reduce, which would eventually reduce the length of this slice.
// This queue is roughly bounded by the total number of added connections we
// have. If consumers of connectedness events are slow, we apply
// backpressure to AddConn operations.
//
// We purposefully don't block/backpressure here to avoid deadlocks, since it's
// reasonable for a consumer of the event to want to remove a connection.
c.removeConns = append(c.removeConns, p)

c.removeConnsMx.Unlock()

select {
Expand Down Expand Up @@ -111,6 +112,12 @@ func (c *connectednessEventEmitter) runEmitter() {
}
}

// notifyPeer sends the peer connectedness event using the emitter.
// Use forceNotConnectedEvent = true to send a NotConnected event even if
// no Connected event was sent for this peer.
// In case a peer is disconnected before we sent the Connected event, we still
// send the Disconnected event because a connection to the peer can be observed
// in such cases.
func (c *connectednessEventEmitter) notifyPeer(p peer.ID, forceNotConnectedEvent bool) {
oldState := c.lastEvent[p]
c.lastEvent[p] = c.connectedness(p)
Expand All @@ -128,16 +135,11 @@ func (c *connectednessEventEmitter) notifyPeer(p peer.ID, forceNotConnectedEvent
func (c *connectednessEventEmitter) sendConnRemovedNotifications() {
c.removeConnsMx.Lock()
defer c.removeConnsMx.Unlock()
for {
if len(c.removeConns) == 0 {
return
}
p := c.removeConns[0]
c.removeConns[0] = ""
c.removeConns = c.removeConns[1:]

c.removeConnsMx.Unlock()
c.removeConnsMx.Lock()
removeConns := c.removeConns
c.removeConns = nil
c.removeConnsMx.Unlock()
for _, p := range removeConns {
c.notifyPeer(p, false)
c.removeConnsMx.Lock()
}
}
2 changes: 1 addition & 1 deletion p2p/net/swarm/dial_worker.go
Expand Up @@ -340,7 +340,7 @@ loop:
ad.expectedTCPUpgradeTime = time.Time{}
if res.Conn != nil {
// we got a connection, add it to the swarm
conn, err := w.s.addConn(ad.ctx, res.Conn, network.DirOutbound)
conn, err := w.s.addConn(res.Conn, network.DirOutbound)
if err != nil {
// oops no, we failed to add it to the swarm
res.Conn.Close()
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/swarm/swarm.go
Expand Up @@ -338,7 +338,7 @@ func (s *Swarm) close() {
wg.Wait()
}

func (s *Swarm) addConn(ctx context.Context, tc transport.CapableConn, dir network.Direction) (*Conn, error) {
func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn, error) {
var (
p = tc.RemotePeer()
addr = tc.RemoteMultiaddr()
Expand Down
2 changes: 0 additions & 2 deletions p2p/net/swarm/swarm_event_test.go
Expand Up @@ -2,7 +2,6 @@ package swarm_test

import (
"context"
"fmt"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -308,5 +307,4 @@ func TestConnectednessEventDeadlockWithDial(t *testing.T) {

close(done)
subWG.Wait()
fmt.Println("swarm closed")
}

0 comments on commit e9b2300

Please sign in to comment.