Skip to content
This repository has been archived by the owner on May 26, 2022. It is now read-only.

Commit

Permalink
Support for Hole punching (#233)
Browse files Browse the repository at this point in the history
* support for forced direct connections.
  • Loading branch information
aarshkshah1992 committed Feb 18, 2021
1 parent e6f85ac commit f82c425
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 23 deletions.
6 changes: 3 additions & 3 deletions dial_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,13 @@ func (ad *activeDial) start(ctx context.Context) {
ad.cancel()
}

func (ds *DialSync) getActiveDial(p peer.ID) *activeDial {
func (ds *DialSync) getActiveDial(ctx context.Context, p peer.ID) *activeDial {
ds.dialsLk.Lock()
defer ds.dialsLk.Unlock()

actd, ok := ds.dials[p]
if !ok {
adctx, cancel := context.WithCancel(context.Background())
adctx, cancel := context.WithCancel(ctx)
actd = &activeDial{
id: p,
cancel: cancel,
Expand All @@ -123,7 +123,7 @@ func (ds *DialSync) getActiveDial(p peer.ID) *activeDial {
// DialLock initiates a dial to the given peer if there are none in progress
// then waits for the dial to that peer to complete.
func (ds *DialSync) DialLock(ctx context.Context, p peer.ID) (*Conn, error) {
return ds.getActiveDial(p).wait(ctx)
return ds.getActiveDial(ctx, p).wait(ctx)
}

// CancelDial cancels all in-progress dials to the given peer.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/jbenet/goprocess v0.1.4
github.com/libp2p/go-addr-util v0.0.2
github.com/libp2p/go-conn-security-multistream v0.2.1
github.com/libp2p/go-libp2p-core v0.8.2
github.com/libp2p/go-libp2p-core v0.8.3
github.com/libp2p/go-libp2p-loggables v0.1.0
github.com/libp2p/go-libp2p-peerstore v0.2.6
github.com/libp2p/go-libp2p-quic-transport v0.10.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ github.com/libp2p/go-libp2p-core v0.8.0/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJB
github.com/libp2p/go-libp2p-core v0.8.1/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
github.com/libp2p/go-libp2p-core v0.8.2 h1:/eaSZACWftJZYm07S0nRxdI84v1hSmgnCXrGOvJdpNQ=
github.com/libp2p/go-libp2p-core v0.8.2/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
github.com/libp2p/go-libp2p-core v0.8.3 h1:BZTReEF6o8g/n4DwxTyeFannOeae35Xy0TD+mES3CNE=
github.com/libp2p/go-libp2p-core v0.8.3/go.mod h1:FfewUH/YpvWbEB+ZY9AQRQ4TAD8sJBt/G1rVvhz5XT8=
github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8=
github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90=
github.com/libp2p/go-libp2p-mplex v0.2.1/go.mod h1:SC99Rxs8Vuzrf/6WhmH41kNn13TiYdAWNYHrwImKLnE=
Expand Down
22 changes: 17 additions & 5 deletions swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ func (s *Swarm) NewStream(ctx context.Context, p peer.ID) (network.Stream, error
// a non-closed connection.
dials := 0
for {
// will prefer direct connections over relayed connections for opening streams
c := s.bestConnToPeer(p)
if c == nil {
if nodial, _ := network.GetNoDial(ctx); nodial {
Expand Down Expand Up @@ -392,9 +393,10 @@ func (s *Swarm) ConnsToPeer(p peer.ID) []network.Conn {

// bestConnToPeer returns the best connection to peer.
func (s *Swarm) bestConnToPeer(p peer.ID) *Conn {
// Selects the best connection we have to the peer.
// TODO: Prefer some transports over others. Currently, we just select
// the newest non-closed connection with the most streams.

// TODO: Prefer some transports over others.
// For now, prefers direct connections over Relayed connections.
// For tie-breaking, select the newest non-closed connection with the most streams.
s.conns.RLock()
defer s.conns.RUnlock()

Expand All @@ -409,15 +411,25 @@ func (s *Swarm) bestConnToPeer(p peer.ID) *Conn {
cLen := len(c.streams.m)
c.streams.Unlock()

if cLen >= bestLen {
// We will never prefer a Relayed connection over a direct connection.
if isDirectConn(best) && !isDirectConn(c) {
continue
}

// 1. Always prefer a direct connection over a relayed connection.
// 2. If both conns are direct or relayed, pick the one with as many or more streams.
if (!isDirectConn(best) && isDirectConn(c)) || (cLen >= bestLen) {
best = c
bestLen = cLen
}

}
return best
}

func isDirectConn(c *Conn) bool {
return c != nil && !c.conn.Transport().Proxy()
}

// Connectedness returns our "connectedness" state with the given peer.
//
// To check if we have an open connection, use `s.Connectedness(p) ==
Expand Down
54 changes: 40 additions & 14 deletions swarm_dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,14 @@ func (s *Swarm) dialPeer(ctx context.Context, p peer.ID) (*Conn, error) {

defer log.EventBegin(ctx, "swarmDialAttemptSync", p).Done()

// check if we already have an open connection first
conn := s.bestConnToPeer(p)
if conn != nil {
forceDirect, _ := network.GetForceDirectDial(ctx)
if forceDirect {
if isDirectConn(conn) {
return conn, nil
}
} else if conn != nil {
// check if we already have an open connection first
return conn, nil
}

Expand Down Expand Up @@ -287,8 +292,13 @@ func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) {
// Short circuit.
// By the time we take the dial lock, we may already *have* a connection
// to the peer.
forceDirect, _ := network.GetForceDirectDial(ctx)
c := s.bestConnToPeer(p)
if c != nil {
if forceDirect {
if isDirectConn(c) {
return c, nil
}
} else if c != nil {
return c, nil
}

Expand All @@ -301,12 +311,17 @@ func (s *Swarm) doDial(ctx context.Context, p peer.ID) (*Conn, error) {
conn, err := s.dial(ctx, p)
if err != nil {
conn = s.bestConnToPeer(p)
if conn != nil {
if forceDirect {
if isDirectConn(conn) {
log.Debugf("ignoring dial error because we already have a direct connection: %s", err)
return conn, nil
}
} else if conn != nil {
// Hm? What error?
// Could have canceled the dial because we received a
// connection or some other random reason.
// Just ignore the error and return the connection.
log.Debugf("ignoring dial error because we have a connection: %s", err)
log.Debugf("ignoring dial error because we already have a connection: %s", err)
return conn, nil
}

Expand All @@ -321,6 +336,11 @@ func (s *Swarm) canDial(addr ma.Multiaddr) bool {
return t != nil && t.CanDial(addr)
}

func (s *Swarm) nonProxyAddr(addr ma.Multiaddr) bool {
t := s.TransportForDialing(addr)
return !t.Proxy()
}

// ranks addresses in descending order of preference for dialing
// Private UDP > Public UDP > Private TCP > Public TCP > UDP Relay server > TCP Relay server
func (s *Swarm) rankAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {
Expand Down Expand Up @@ -362,6 +382,7 @@ func (s *Swarm) rankAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {

// dial is the actual swarm's dial logic, gated by Dial.
func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
forceDirect, _ := network.GetForceDirectDial(ctx)
var logdial = lgbl.Dial("swarm", s.LocalPeer(), p, nil, nil)
if p == s.local {
log.Event(ctx, "swarmDialDoDialSelf", logdial)
Expand All @@ -383,20 +404,25 @@ func (s *Swarm) dial(ctx context.Context, p peer.ID) (*Conn, error) {
return nil, &DialError{Peer: p, Cause: ErrNoAddresses}
}
goodAddrs := s.filterKnownUndialables(p, peerAddrs)
if forceDirect {
goodAddrs = addrutil.FilterAddrs(goodAddrs, s.nonProxyAddr)
}
if len(goodAddrs) == 0 {
return nil, &DialError{Peer: p, Cause: ErrNoGoodAddresses}
}

/////// Check backoff andnRank addresses
var nonBackoff bool
for _, a := range goodAddrs {
// skip addresses in back-off
if !s.backf.Backoff(p, a) {
nonBackoff = true
if !forceDirect {
/////// Check backoff andnRank addresses
var nonBackoff bool
for _, a := range goodAddrs {
// skip addresses in back-off
if !s.backf.Backoff(p, a) {
nonBackoff = true
}
}
if !nonBackoff {
return nil, ErrDialBackoff
}
}
if !nonBackoff {
return nil, ErrDialBackoff
}

connC, dialErr := s.dialAddrs(ctx, p, s.rankAddrs(goodAddrs))
Expand Down

0 comments on commit f82c425

Please sign in to comment.