Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't hole punch if either peer is behind a Symmetric NAT #1046

Closed
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
78ce16b
emit NAT device type
aarshkshah1992 Jan 27, 2021
0b45239
unit tests
aarshkshah1992 Jan 27, 2021
3566bfd
emit event for NAT Device Type
aarshkshah1992 Jan 28, 2021
29d4987
updated deps
aarshkshah1992 Jan 28, 2021
5484ccb
refactor tcp-udp NAT events
aarshkshah1992 Feb 1, 2021
1537769
update go ver
aarshkshah1992 Feb 1, 2021
c2295d5
go mod tidy
aarshkshah1992 Feb 1, 2021
2296117
update deps
aarshkshah1992 Feb 1, 2021
d337499
fix multiple events
aarshkshah1992 Feb 1, 2021
1956fea
Implement support for simultaneous open
vyzo Aug 26, 2019
cfef141
updated deps
aarshkshah1992 Jan 12, 2021
10adf33
simultaneous connect
aarshkshah1992 Jan 15, 2021
ef26854
added tests
aarshkshah1992 Jan 15, 2021
ae61f65
added tests
aarshkshah1992 Jan 15, 2021
0b0b410
more testing
aarshkshah1992 Jan 15, 2021
90cd9bb
host changes
aarshkshah1992 Jan 16, 2021
2603920
test scripts
aarshkshah1992 Jan 18, 2021
e868d73
default listen
aarshkshah1992 Jan 18, 2021
431a9a6
addrs
aarshkshah1992 Jan 18, 2021
fa9994c
relay server
aarshkshah1992 Jan 18, 2021
001aa52
rela server
aarshkshah1992 Jan 18, 2021
14ef1ec
relay server
aarshkshah1992 Jan 18, 2021
450d431
relay server
aarshkshah1992 Jan 18, 2021
36ca4a7
addrs
aarshkshah1992 Jan 18, 2021
1bdb86c
testing
aarshkshah1992 Jan 18, 2021
1d6d07f
server code
aarshkshah1992 Jan 18, 2021
5ecac86
server code
aarshkshah1992 Jan 18, 2021
d9d7e6a
more changes
aarshkshah1992 Jan 19, 2021
dcf822c
upated swarm
aarshkshah1992 Jan 19, 2021
2ee5344
clean-up
aarshkshah1992 Jan 21, 2021
46ef4d8
cleaned up PR
aarshkshah1992 Jan 21, 2021
749086f
rebased to have NAT type event
aarshkshah1992 Feb 2, 2021
37141ec
better logging
aarshkshah1992 Feb 2, 2021
5bf6a8f
remove log
aarshkshah1992 Feb 2, 2021
837edb0
log connection as well
aarshkshah1992 Feb 2, 2021
35b7b8a
remove chatty logs
aarshkshah1992 Feb 2, 2021
9482582
fix go-multiaddr
aarshkshah1992 Feb 3, 2021
eddb65c
fix logging
aarshkshah1992 Feb 3, 2021
6ec9163
event driven hole punching
aarshkshah1992 Feb 3, 2021
d0d527c
always dial public addrs
aarshkshah1992 Feb 3, 2021
651ae86
address review
aarshkshah1992 Feb 18, 2021
9b483aa
rebased PR
aarshkshah1992 Feb 19, 2021
3e9dc38
fix typo
aarshkshah1992 Feb 19, 2021
ea96e13
fixed errs
aarshkshah1992 Feb 19, 2021
792ff75
fixed errs
aarshkshah1992 Feb 19, 2021
6efab25
save NAT type to peerstore
aarshkshah1992 Feb 19, 2021
00bd3a7
added a test
aarshkshah1992 Feb 19, 2021
48350b4
removed double init
aarshkshah1992 Feb 19, 2021
9267948
finished merging
aarshkshah1992 Feb 23, 2021
b553c16
unit tests
aarshkshah1992 Feb 24, 2021
f981b25
fixed races
aarshkshah1992 Feb 24, 2021
a3fee65
update deps due to license
ZBoIsHere Aug 2, 2021
9b67b75
Merge pull request #1146 from ZBoIsHere/feat/evt-based-hole-punching-…
Stebalien Aug 2, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ func NewHost(ctx context.Context, n network.Network, opts *HostOpts) (*BasicHost
h.mux = opts.MultistreamMuxer
}

h.Peerstore().Put(h.ID(), identify.UDPNATDeviceTypeKey, network.NATDeviceTypeUnknown)
h.Peerstore().Put(h.ID(), identify.TCPNATDeviceTypeKey, network.NATDeviceTypeUnknown)
// we can't set this as a default above because it depends on the *BasicHost.
if h.disableSignedPeerRecord {
h.ids, err = identify.NewIDService(h, identify.UserAgent(opts.UserAgent), identify.DisableSignedPeerRecord())
Expand All @@ -229,6 +231,13 @@ func NewHost(ctx context.Context, n network.Network, opts *HostOpts) (*BasicHost
return nil, fmt.Errorf("failed to create Identify service: %s", err)
}

if opts.EnableHolePunching {
h.hps, err = holepunch.NewHolePunchService(h, h.ids)
if err != nil {
return nil, fmt.Errorf("failed to create hole punch service: %w", err)
}
}

if uint64(opts.NegotiationTimeout) != 0 {
h.negtimeout = opts.NegotiationTimeout
}
Expand Down
75 changes: 75 additions & 0 deletions p2p/protocol/holepunch/coordination.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

logging "github.com/ipfs/go-log"
"github.com/jpillora/backoff"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -49,11 +50,55 @@ func NewHolePunchService(h host.Host, ids *identify.IDService) (*HolePunchServic
ctx, cancel := context.WithCancel(context.Background())
hs := &HolePunchService{ctx: ctx, ctxCancel: cancel, host: h, ids: ids}

sub, err := h.EventBus().Subscribe(new(event.EvtNATDeviceTypeChanged))
if err != nil {
return nil, err
}

h.SetStreamHandler(protocol, hs.handleNewStream)
h.Network().Notify((*netNotifiee)(hs))

hs.refCount.Add(1)
go hs.loop(sub)

return hs, nil
}

func (hs *HolePunchService) loop(sub event.Subscription) {
defer hs.refCount.Done()
defer sub.Close()

for {
select {
// Our local NAT device types are intialized in the peerstore when the Host is created
// and updated in the peerstore by the Observed Address Manager.
case _, ok := <-sub.Out():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we look at the actual event? why are we ignoring it?

Copy link
Contributor

@vyzo vyzo Feb 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, where do we set the support meta-variables into the peerstore?
Shouldn't we be doing it here? Or is it handled elsewhere?

I think we need a comment if this is correct as is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vyzo

  1. Our local vars are set in the peerstore in basic_host.go during initialisation of the Host & updated in the obs addr manager.

  2. Remote vars are set in the peerstore by Identify.

Both changes are included in this PR. I've added a comment.

if !ok {
return
}

if hs.peerSupportsHolePunching(hs.host.ID(), hs.host.Addrs()) {
hs.host.SetStreamHandler(protocol, hs.handleNewStream)
} else {
hs.host.RemoveStreamHandler(protocol)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wait, this is a BUG -- we lose the ability to dial back connect to a public node that has dialed us.

}

case <-hs.ctx.Done():
return
}
}
}

func hasProtoAddr(protocCode int, addrs []ma.Multiaddr) bool {
for _, a := range addrs {
if _, err := a.ValueForProtocol(protocCode); err == nil {
return true
}
}

return false
}

// Close closes the Hole Punch Service.
func (hs *HolePunchService) Close() error {
hs.closeSync.Do(func() {
Expand Down Expand Up @@ -84,6 +129,12 @@ func (hs *HolePunchService) holePunch(relayConn network.Conn) {
}
}

// return if either peer does NOT support hole punching
if !hs.peerSupportsHolePunching(rp, hs.host.Peerstore().Addrs(rp)) ||
!hs.peerSupportsHolePunching(hs.host.ID(), hs.host.Addrs()) {
return
}

// hole punch
hpCtx := network.WithUseTransient(hs.ctx, "hole-punch")
sCtx := network.WithNoDial(hpCtx, "hole-punch")
Expand Down Expand Up @@ -150,6 +201,30 @@ func (hs *HolePunchService) holePunch(relayConn network.Conn) {
}
}

// We can hole punch with a peer ONLY if it is NOT behind a symmetric NAT for all the transport protocol it supports.
func (hs *HolePunchService) peerSupportsHolePunching(p peer.ID, addrs []ma.Multiaddr) bool {
udpSupported := hasProtoAddr(ma.P_UDP, addrs)
tcpSupported := hasProtoAddr(ma.P_TCP, addrs)
udpNAT, _ := hs.host.Peerstore().Get(p, identify.UDPNATDeviceTypeKey)
tcpNAT, _ := hs.host.Peerstore().Get(p, identify.TCPNATDeviceTypeKey)
udpNatType := udpNAT.(network.NATDeviceType)
tcpNATType := tcpNAT.(network.NATDeviceType)

if udpSupported {
if udpNatType == network.NATDeviceTypeCone || udpNatType == network.NATDeviceTypeUnknown {
return true
}
}

if tcpSupported {
if tcpNATType == network.NATDeviceTypeCone || tcpNATType == network.NATDeviceTypeUnknown {
return true
}
}

return false
}

func (hs *HolePunchService) handleNewStream(s network.Stream) {
log.Infof("got hole punch request from peer %s", s.Conn().RemotePeer().Pretty())
_ = s.SetDeadline(time.Now().Add(holePunchTimeout))
Expand Down
38 changes: 38 additions & 0 deletions p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ const ID = "/ipfs/id/1.0.0"
// 0.4.17 which asserted an exact version match.
const LibP2PVersion = "ipfs/0.1.0"

const (
// UDPNATDeviceTypeKey is the key with which we will persist a peer's UDP NAT Device Type to the peerstore.
UDPNATDeviceTypeKey = "UDPNATDeviceType"
// TCPNATDeviceTypeKey is the key with which we will persist a peer's TCP NAT Device Type to the peerstore.
TCPNATDeviceTypeKey = "TCPNATDeviceType"
)

// ClientVersion is the default user agent.
//
// Deprecated: Set this with the UserAgent option.
Expand Down Expand Up @@ -542,10 +549,37 @@ func (ids *IDService) createBaseIdentifyResponse(
av := ids.UserAgent
mes.ProtocolVersion = &pv
mes.AgentVersion = &av
udpNAT, tcpNAT := ids.observedAddrs.getNATDeviceTypes()
udpNATPb := toPbNATDeviceType(udpNAT)
tcpNATPb := toPbNATDeviceType(tcpNAT)
mes.UdpNATDeviceType = &udpNATPb
mes.TcpNATDeviceType = &tcpNATPb

return mes
}

func toPbNATDeviceType(t network.NATDeviceType) pb.Identify_NATDeviceType {
switch t {
case network.NATDeviceTypeCone:
return pb.Identify_CONE
case network.NATDeviceTypeSymmetric:
return pb.Identify_SYMMETRIC
default:
return pb.Identify_UNKNOWN
}
}

func fromPbNATDeviceType(typ pb.Identify_NATDeviceType) network.NATDeviceType {
switch typ {
case pb.Identify_CONE:
return network.NATDeviceTypeCone
case pb.Identify_SYMMETRIC:
return network.NATDeviceTypeSymmetric
default:
return network.NATDeviceTypeUnknown
}
}

func (ids *IDService) getSignedRecord(snapshot *identifySnapshot) []byte {
if ids.disableSignedPeerRecord || snapshot.record == nil {
return nil
Expand Down Expand Up @@ -634,9 +668,13 @@ func (ids *IDService) consumeMessage(mes *pb.Identify, c network.Conn) {
// get protocol versions
pv := mes.GetProtocolVersion()
av := mes.GetAgentVersion()
udpNATType := mes.GetUdpNATDeviceType()
tcpNATType := mes.GetTcpNATDeviceType()

ids.Host.Peerstore().Put(p, "ProtocolVersion", pv)
ids.Host.Peerstore().Put(p, "AgentVersion", av)
ids.Host.Peerstore().Put(p, UDPNATDeviceTypeKey, fromPbNATDeviceType(udpNATType))
ids.Host.Peerstore().Put(p, TCPNATDeviceTypeKey, fromPbNATDeviceType(tcpNATType))

// get the key from the other side. we may not have it (no-auth transport)
ids.consumeReceivedPubKey(c, mes.PublicKey)
Expand Down
15 changes: 15 additions & 0 deletions p2p/protocol/identify/id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,21 @@ func subtestIDService(t *testing.T) {
testHasCertifiedAddrs(t, h1, h2p, []ma.Multiaddr{})
testHasCertifiedAddrs(t, h2, h1p, []ma.Multiaddr{})

// test we have the TCP & UDP NAT Device Types on both sides
udp, err := h1.Peerstore().Get(h2p, identify.UDPNATDeviceTypeKey)
require.NoError(t, err)
require.Equal(t, network.NATDeviceTypeUnknown, udp.(network.NATDeviceType))
tcp, err := h1.Peerstore().Get(h2p, identify.TCPNATDeviceTypeKey)
require.NoError(t, err)
require.Equal(t, network.NATDeviceTypeUnknown, tcp.(network.NATDeviceType))

udp, err = h2.Peerstore().Get(h1p, identify.UDPNATDeviceTypeKey)
require.NoError(t, err)
require.Equal(t, network.NATDeviceTypeUnknown, udp.(network.NATDeviceType))
tcp, err = h2.Peerstore().Get(h1p, identify.TCPNATDeviceTypeKey)
require.NoError(t, err)
require.Equal(t, network.NATDeviceTypeUnknown, tcp.(network.NATDeviceType))

// test that we received the "identify completed" event.
select {
case <-sub.Out():
Expand Down
23 changes: 22 additions & 1 deletion p2p/protocol/identify/obsaddr.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ func NewObservedAddrManager(ctx context.Context, host host.Host) (*ObservedAddrM
return oas, nil
}

func (oas *ObservedAddrManager) getNATDeviceTypes() (udp, tcp network.NATDeviceType) {
oas.mu.RLock()
defer oas.mu.RUnlock()

return oas.currentUDPNATDeviceType, oas.currentTCPNATDeviceType
}

// AddrsFor return all activated observed addresses associated with the given
// (resolved) listen address.
func (oas *ObservedAddrManager) AddrsFor(addr ma.Multiaddr) (addrs []ma.Multiaddr) {
Expand Down Expand Up @@ -200,6 +207,21 @@ func (oas *ObservedAddrManager) filter(observedAddrs []*observedAddr) []ma.Multi
}
}

// For certain use cases such as hole punching, it's better to advertise even unactivated observed addresses rather than none at all
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vyzo This needs your attention. I discussed this with @Stebalien and he was okay with it.

// because we don't want to wait for a hole-punch till we make enough connections with other peers to discover our activated addresses.
// If we have activated addresses, we will use them, otherwise, let's use whatever observed addresses we do have.
if len(pmap) == 0 {
for i := range observedAddrs {
a := observedAddrs[i]
if now.Sub(a.lastSeen) <= oas.ttl {
// group addresses by their IPX/Transport Protocol(TCP or UDP) pattern.
pat := a.groupKey()
pmap[pat] = append(pmap[pat], a)

}
}
}

addrs := make([]ma.Multiaddr, 0, len(observedAddrs))
for pat := range pmap {
s := pmap[pat]
Expand Down Expand Up @@ -269,7 +291,6 @@ func (oas *ObservedAddrManager) worker(ctx context.Context) {

case obs := <-oas.wch:
oas.maybeRecordObservation(obs.conn, obs.observed)

case <-ticker.C:
oas.gc()
case <-oas.refreshTimer.C:
Expand Down
22 changes: 11 additions & 11 deletions p2p/protocol/identify/obsaddr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package identify_test

import (
"context"
"fmt"
"sync"
"testing"
"time"

detectrace "github.com/ipfs/go-detect-race"
"github.com/libp2p/go-eventbus"
"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
Expand All @@ -16,6 +16,7 @@ import (
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
identify "github.com/libp2p/go-libp2p/p2p/protocol/identify"

detectrace "github.com/ipfs/go-detect-race"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -171,25 +172,22 @@ func TestObsAddrSet(t *testing.T) {
harness.observe(a2, pa4)
harness.observe(a3, pa4)

// these are all different so we should not yet get them.
if !addrsMatch(harness.oas.Addrs(), nil) {
t.Error("addrs should _still_ be empty (once)")
}

// same observer, so should not yet get them.
// get the address as none has been activated yet.
harness.observe(a1, pa4)
harness.observe(a2, pa4)
harness.observe(a3, pa4)
if !addrsMatch(harness.oas.Addrs(), nil) {
t.Error("addrs should _still_ be empty (same obs)")

fmt.Println(harness.oas.Addrs())
if !addrsMatch(harness.oas.Addrs(), []ma.Multiaddr{a1, a2}) {
t.Error("should get expected addresses as there are no activated addresses")
}

// different observer, but same observer group.
harness.observe(a1, pa5)
harness.observe(a2, pa5)
harness.observe(a3, pa5)
if !addrsMatch(harness.oas.Addrs(), nil) {
t.Error("addrs should _still_ be empty (same obs group)")
if !addrsMatch(harness.oas.Addrs(), []ma.Multiaddr{a1, a2}) {
t.Error("should get expected addresses as there are no activated addresses")
}

harness.observe(a1, pb1)
Expand Down Expand Up @@ -383,6 +381,7 @@ func TestEmitNATDeviceTypeSymmetric(t *testing.T) {
defer cancel()
harness := newHarness(ctx, t)
require.Empty(t, harness.oas.Addrs())

emitter, err := harness.host.EventBus().Emitter(new(event.EvtLocalReachabilityChanged), eventbus.Stateful)
require.NoError(t, err)
require.NoError(t, emitter.Emit(event.EvtLocalReachabilityChanged{Reachability: network.ReachabilityPrivate}))
Expand Down Expand Up @@ -429,6 +428,7 @@ func TestEmitNATDeviceTypeCone(t *testing.T) {
defer cancel()
harness := newHarness(ctx, t)
require.Empty(t, harness.oas.Addrs())

emitter, err := harness.host.EventBus().Emitter(new(event.EvtLocalReachabilityChanged), eventbus.Stateful)
require.NoError(t, err)
require.NoError(t, emitter.Emit(event.EvtLocalReachabilityChanged{Reachability: network.ReachabilityPrivate}))
Expand Down