Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

autonatv2: implement autonatv2 spec #2469

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
83 changes: 83 additions & 0 deletions config/config.go
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/libp2p/go-libp2p/p2p/host/autonat"
"github.com/libp2p/go-libp2p/p2p/host/autorelay"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
blankhost "github.com/libp2p/go-libp2p/p2p/host/blank"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
Expand Down Expand Up @@ -131,6 +132,8 @@ type Config struct {
SwarmOpts []swarm.Option

DisableIdentifyAddressDiscovery bool

DisableAutoNATv2 bool
}

func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swarm, error) {
Expand Down Expand Up @@ -193,6 +196,76 @@ func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swa
return swarm.NewSwarm(pid, cfg.Peerstore, eventBus, opts...)
}

func (cfg *Config) makeAutoNATHost() (host.Host, error) {
autonatPrivKey, _, err := crypto.GenerateEd25519Key(rand.Reader)
if err != nil {
return nil, err
}
ps, err := pstoremem.NewPeerstore()
if err != nil {
return nil, err
}

autoNatCfg := Config{
Transports: cfg.Transports,
Muxers: cfg.Muxers,
SecurityTransports: cfg.SecurityTransports,
Insecure: cfg.Insecure,
PSK: cfg.PSK,
ConnectionGater: cfg.ConnectionGater,
Reporter: cfg.Reporter,
PeerKey: autonatPrivKey,
Peerstore: ps,
DialRanker: swarm.NoDelayDialRanker,
SwarmOpts: []swarm.Option{
// Disable black hole detection on autonat dialers
// It is better to attempt a dial and fail for AutoNAT use cases
swarm.WithUDPBlackHoleConfig(false, 0, 0),
swarm.WithIPv6BlackHoleConfig(false, 0, 0),
},
}
fxopts, err := autoNatCfg.addTransports()
if err != nil {
return nil, err
}
var dialerHost host.Host
fxopts = append(fxopts,
fx.Provide(eventbus.NewBus),
fx.Provide(func(lifecycle fx.Lifecycle, b event.Bus) (*swarm.Swarm, error) {
lifecycle.Append(fx.Hook{
OnStop: func(context.Context) error {
return ps.Close()
}})
sw, err := autoNatCfg.makeSwarm(b, false)
return sw, err
}),
fx.Provide(func(sw *swarm.Swarm) *blankhost.BlankHost {
return blankhost.NewBlankHost(sw)
}),
fx.Provide(func(bh *blankhost.BlankHost) host.Host {
return bh
}),
fx.Provide(func() crypto.PrivKey { return autonatPrivKey }),
fx.Provide(func(bh host.Host) peer.ID { return bh.ID() }),
fx.Invoke(func(bh *blankhost.BlankHost) {
dialerHost = bh
}),
)
app := fx.New(fxopts...)
if err := app.Err(); err != nil {
return nil, err
}
err = app.Start(context.Background())
if err != nil {
return nil, err
}
go func() {
<-dialerHost.Network().(*swarm.Swarm).Done()
app.Stop(context.Background())
}()
return dialerHost, nil
}

func (cfg *Config) addTransports() ([]fx.Option, error) {
fxopts := []fx.Option{
fx.WithLogger(func() fxevent.Logger { return getFXLogger() }),
Expand Down Expand Up @@ -291,6 +364,14 @@ func (cfg *Config) addTransports() ([]fx.Option, error) {
}

func (cfg *Config) newBasicHost(swrm *swarm.Swarm, eventBus event.Bus) (*bhost.BasicHost, error) {
var autonatv2Dialer host.Host
if !cfg.DisableAutoNATv2 {
ah, err := cfg.makeAutoNATHost()
if err != nil {
return nil, err
}
autonatv2Dialer = ah
}
h, err := bhost.NewHost(swrm, &bhost.HostOpts{
EventBus: eventBus,
ConnManager: cfg.ConnManager,
Expand All @@ -306,6 +387,8 @@ func (cfg *Config) newBasicHost(swrm *swarm.Swarm, eventBus event.Bus) (*bhost.B
EnableMetrics: !cfg.DisableMetrics,
PrometheusRegisterer: cfg.PrometheusRegisterer,
DisableIdentifyAddressDiscovery: cfg.DisableIdentifyAddressDiscovery,
EnableAutoNATv2: !cfg.DisableAutoNATv2,
AutoNATv2Dialer: autonatv2Dialer,
})
if err != nil {
return nil, err
Expand Down
17 changes: 17 additions & 0 deletions core/network/network.go
Expand Up @@ -161,6 +161,20 @@ type Network interface {
ResourceManager() ResourceManager
}

// Dialability indicates how dialable a (peer, addr) combination is.
type Dialability int

const (
// DialabilityUnknown indicates that the dialer cannot dial the (peer, addr) combination
DialabilityUnknown Dialability = iota

// DialabilityDialable indicates that the dialer can dial the (peer, addr) combination
DialabilityDialable

// DialabilityUndialable indicates that the dialer cannot dial the (peer, addr) combinaton
DialabilityUndialable
)

// Dialer represents a service that can dial out to peers
// (this is usually just a Network, but other services may not need the whole
// stack, and thus it becomes easier to mock)
Expand Down Expand Up @@ -194,6 +208,9 @@ type Dialer interface {
// Notify/StopNotify register and unregister a notifiee for signals
Notify(Notifiee)
StopNotify(Notifiee)

// CanDial returns whether the dialer can dial peer p at addr
CanDial(p peer.ID, addr ma.Multiaddr) Dialability
}

// AddrDelay provides an address along with the delay after which the address
Expand Down
8 changes: 8 additions & 0 deletions options.go
Expand Up @@ -609,3 +609,11 @@ func DisableIdentifyAddressDiscovery() Option {
return nil
}
}

// DisableAutoNATv2 disables autonat
func DisableAutoNATv2() Option {
return func(cfg *Config) error {
cfg.DisableAutoNATv2 = true
return nil
}
}
15 changes: 15 additions & 0 deletions p2p/host/basic/basic_host.go
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-libp2p/p2p/host/pstoremanager"
"github.com/libp2p/go-libp2p/p2p/host/relaysvc"
"github.com/libp2p/go-libp2p/p2p/protocol/autonatv2"
relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
Expand Down Expand Up @@ -105,6 +106,8 @@ type BasicHost struct {
caBook peerstore.CertifiedAddrBook

autoNat autonat.AutoNAT

autonatv2 *autonatv2.AutoNAT
}

var _ host.Host = (*BasicHost)(nil)
Expand Down Expand Up @@ -167,6 +170,8 @@ type HostOpts struct {

// DisableIdentifyAddressDiscovery disables address discovery using peer provided observed addresses in identify
DisableIdentifyAddressDiscovery bool
EnableAutoNATv2 bool
AutoNATv2Dialer host.Host
}

// NewHost constructs a new *BasicHost and activates it by attaching its stream and connection handlers to the given inet.Network.
Expand Down Expand Up @@ -310,6 +315,13 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
h.pings = ping.NewPingService(h)
}

if opts.EnableAutoNATv2 {
h.autonatv2, err = autonatv2.New(h, opts.AutoNATv2Dialer)
if err != nil {
return nil, fmt.Errorf("failed to create autonatv2: %w", err)
}
}

n.SetStreamHandler(h.newStreamHandler)

// register to be notified when the network's listen addrs change,
Expand Down Expand Up @@ -1100,6 +1112,9 @@ func (h *BasicHost) Close() error {
if h.hps != nil {
h.hps.Close()
}
if h.autonatv2 != nil {
h.autonatv2.Close()
}

_ = h.emitters.evtLocalProtocolsUpdated.Close()
_ = h.emitters.evtLocalAddrsUpdated.Close()
Expand Down
4 changes: 4 additions & 0 deletions p2p/net/mock/mock_peernet.go
Expand Up @@ -434,3 +434,7 @@ func (pn *peernet) notifyAll(notification func(f network.Notifiee)) {
func (pn *peernet) ResourceManager() network.ResourceManager {
return &network.NullResourceManager{}
}

func (pn *peernet) CanDial(p peer.ID, addr ma.Multiaddr) network.Dialability {
return network.DialabilityUnknown
}
28 changes: 28 additions & 0 deletions p2p/net/swarm/black_hole_detector.go
Expand Up @@ -144,6 +144,13 @@ func (b *blackHoleFilter) updateState() {
}
}

func (b *blackHoleFilter) State() blackHoleState {
b.mu.Lock()
defer b.mu.Unlock()

return b.state
}

func (b *blackHoleFilter) trackMetrics() {
if b.metricsTracer == nil {
return
Expand Down Expand Up @@ -244,6 +251,27 @@ func (d *blackHoleDetector) RecordResult(addr ma.Multiaddr, success bool) {
}
}

func (d *blackHoleDetector) State(addr ma.Multiaddr) blackHoleState {
if !manet.IsPublicAddr(addr) {
return blackHoleStateAllowed
}

if d.udp != nil && isProtocolAddr(addr, ma.P_UDP) {
udpState := d.udp.State()
if udpState != blackHoleStateAllowed {
return udpState
}
}

if d.ipv6 != nil && isProtocolAddr(addr, ma.P_IP6) {
ipv6State := d.ipv6.State()
if ipv6State != blackHoleStateAllowed {
return ipv6State
}
}
return blackHoleStateAllowed
}

// blackHoleConfig is the config used for black hole detection
type blackHoleConfig struct {
// Enabled enables black hole detection
Expand Down
12 changes: 12 additions & 0 deletions p2p/net/swarm/black_hole_detector_test.go
Expand Up @@ -17,6 +17,9 @@ func TestBlackHoleFilterReset(t *testing.T) {
if bhf.HandleRequest() != blackHoleResultProbing {
t.Fatalf("expected calls up to n to be probes")
}
if bhf.State() != blackHoleStateProbing {
t.Fatalf("expected state to be probing got %s", bhf.State())
}
bhf.RecordResult(false)
}

Expand All @@ -26,6 +29,9 @@ func TestBlackHoleFilterReset(t *testing.T) {
if (i%n == 0 && result != blackHoleResultProbing) || (i%n != 0 && result != blackHoleResultBlocked) {
t.Fatalf("expected every nth dial to be a probe")
}
if bhf.State() != blackHoleStateBlocked {
t.Fatalf("expected state to be blocked, got %s", bhf.State())
}
}

bhf.RecordResult(true)
Expand All @@ -34,12 +40,18 @@ func TestBlackHoleFilterReset(t *testing.T) {
if bhf.HandleRequest() != blackHoleResultProbing {
t.Fatalf("expected black hole detector state to reset after success")
}
if bhf.State() != blackHoleStateProbing {
t.Fatalf("expected state to be probing got %s", bhf.State())
}
bhf.RecordResult(false)
}

// next call should be blocked
if bhf.HandleRequest() != blackHoleResultBlocked {
t.Fatalf("expected dial to be blocked")
if bhf.State() != blackHoleStateBlocked {
t.Fatalf("expected state to be blocked, got %s", bhf.State())
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions p2p/net/swarm/swarm.go
Expand Up @@ -112,7 +112,7 @@ func WithDialRanker(d network.DialRanker) Option {
}
}

// WithUDPBlackHoleConfig configures swarm to use c as the config for UDP black hole detection
// WithUDPBlackHoleConfig configures swarm to use the provided config for UDP black hole detection
// n is the size of the sliding window used to evaluate black hole state
// min is the minimum number of successes out of n required to not block requests
func WithUDPBlackHoleConfig(enabled bool, n, min int) Option {
Expand All @@ -122,7 +122,7 @@ func WithUDPBlackHoleConfig(enabled bool, n, min int) Option {
}
}

// WithIPv6BlackHoleConfig configures swarm to use c as the config for IPv6 black hole detection
// WithIPv6BlackHoleConfig configures swarm to use the provided config for IPv6 black hole detection
// n is the size of the sliding window used to evaluate black hole state
// min is the minimum number of successes out of n required to not block requests
func WithIPv6BlackHoleConfig(enabled bool, n, min int) Option {
Expand Down
20 changes: 20 additions & 0 deletions p2p/net/swarm/swarm_dial.go
Expand Up @@ -416,6 +416,26 @@ func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr,
return nil
}

func (s *Swarm) CanDial(p peer.ID, addr ma.Multiaddr) network.Dialability {
dialable, _ := s.filterKnownUndialables(p, []ma.Multiaddr{addr})
if len(dialable) == 0 {
return network.DialabilityUndialable
}

bhState := s.bhd.State(addr)
switch bhState {
case blackHoleStateAllowed:
return network.DialabilityDialable
case blackHoleStateProbing:
return network.DialabilityUnknown
case blackHoleStateBlocked:
return network.DialabilityUndialable
default:
log.Errorf("SWARM BUG: unhandled black hole state for dilability check %s", bhState)
return network.DialabilityUnknown
}
}

func (s *Swarm) nonProxyAddr(addr ma.Multiaddr) bool {
t := s.TransportForDialing(addr)
return !t.Proxy()
Expand Down