Skip to content

Commit

Permalink
autonatv2: implement specs
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Apr 23, 2024
1 parent 0385ec9 commit e080d99
Show file tree
Hide file tree
Showing 18 changed files with 2,726 additions and 5 deletions.
83 changes: 83 additions & 0 deletions config/config.go
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/libp2p/go-libp2p/p2p/host/autonat"
"github.com/libp2p/go-libp2p/p2p/host/autorelay"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
blankhost "github.com/libp2p/go-libp2p/p2p/host/blank"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
Expand Down Expand Up @@ -129,6 +130,8 @@ type Config struct {
DialRanker network.DialRanker

SwarmOpts []swarm.Option

DisableAutoNATv2 bool
}

func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swarm, error) {
Expand Down Expand Up @@ -191,6 +194,76 @@ func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swa
return swarm.NewSwarm(pid, cfg.Peerstore, eventBus, opts...)
}

func (cfg *Config) makeAutoNATHost() (host.Host, error) {
autonatPrivKey, _, err := crypto.GenerateEd25519Key(rand.Reader)
if err != nil {
return nil, err
}
ps, err := pstoremem.NewPeerstore()
if err != nil {
return nil, err
}

autoNatCfg := Config{
Transports: cfg.Transports,
Muxers: cfg.Muxers,
SecurityTransports: cfg.SecurityTransports,
Insecure: cfg.Insecure,
PSK: cfg.PSK,
ConnectionGater: cfg.ConnectionGater,
Reporter: cfg.Reporter,
PeerKey: autonatPrivKey,
Peerstore: ps,
DialRanker: swarm.NoDelayDialRanker,
SwarmOpts: []swarm.Option{
// Disable black hole detection on autonat dialers
// It is better to attempt a dial and fail for AutoNAT use cases
swarm.WithUDPBlackHoleConfig(false, 0, 0),
swarm.WithIPv6BlackHoleConfig(false, 0, 0),
},
}
fxopts, err := autoNatCfg.addTransports()
if err != nil {
return nil, err
}
var dialerHost host.Host
fxopts = append(fxopts,
fx.Provide(eventbus.NewBus),
fx.Provide(func(lifecycle fx.Lifecycle, b event.Bus) (*swarm.Swarm, error) {
lifecycle.Append(fx.Hook{
OnStop: func(context.Context) error {
return ps.Close()
}})
sw, err := autoNatCfg.makeSwarm(b, false)
return sw, err
}),
fx.Provide(func(sw *swarm.Swarm) *blankhost.BlankHost {
return blankhost.NewBlankHost(sw)
}),
fx.Provide(func(bh *blankhost.BlankHost) host.Host {
return bh
}),
fx.Provide(func() crypto.PrivKey { return autonatPrivKey }),
fx.Provide(func(bh host.Host) peer.ID { return bh.ID() }),
fx.Invoke(func(bh *blankhost.BlankHost) {
dialerHost = bh
}),
)
app := fx.New(fxopts...)
if err := app.Err(); err != nil {
return nil, err
}
err = app.Start(context.Background())
if err != nil {
return nil, err
}
go func() {
<-dialerHost.Network().(*swarm.Swarm).Done()
app.Stop(context.Background())
}()
return dialerHost, nil
}

func (cfg *Config) addTransports() ([]fx.Option, error) {
fxopts := []fx.Option{
fx.WithLogger(func() fxevent.Logger { return getFXLogger() }),
Expand Down Expand Up @@ -289,6 +362,14 @@ func (cfg *Config) addTransports() ([]fx.Option, error) {
}

func (cfg *Config) newBasicHost(swrm *swarm.Swarm, eventBus event.Bus) (*bhost.BasicHost, error) {
var autonatv2Dialer host.Host
if !cfg.DisableAutoNATv2 {
ah, err := cfg.makeAutoNATHost()
if err != nil {
return nil, err
}
autonatv2Dialer = ah
}
h, err := bhost.NewHost(swrm, &bhost.HostOpts{
EventBus: eventBus,
ConnManager: cfg.ConnManager,
Expand All @@ -303,6 +384,8 @@ func (cfg *Config) newBasicHost(swrm *swarm.Swarm, eventBus event.Bus) (*bhost.B
RelayServiceOpts: cfg.RelayServiceOpts,
EnableMetrics: !cfg.DisableMetrics,
PrometheusRegisterer: cfg.PrometheusRegisterer,
EnableAutoNATv2: !cfg.DisableAutoNATv2,
AutoNATv2Dialer: autonatv2Dialer,
})
if err != nil {
return nil, err
Expand Down
17 changes: 17 additions & 0 deletions core/network/network.go
Expand Up @@ -152,6 +152,20 @@ type Network interface {
ResourceManager() ResourceManager
}

// Dialability indicates how dialable a (peer, addr) combination is.
type Dialability int

const (
// DialabilityUnknown indicates that the dialer cannot dial the (peer, addr) combination
DialabilityUnknown Dialability = iota

// DialabilityDialable indicates that the dialer can dial the (peer, addr) combination
DialabilityDialable

// DialabilityUndialable indicates that the dialer cannot dial the (peer, addr) combinaton
DialabilityUndialable
)

// Dialer represents a service that can dial out to peers
// (this is usually just a Network, but other services may not need the whole
// stack, and thus it becomes easier to mock)
Expand Down Expand Up @@ -185,6 +199,9 @@ type Dialer interface {
// Notify/StopNotify register and unregister a notifiee for signals
Notify(Notifiee)
StopNotify(Notifiee)

// CanDial returns whether the dialer can dial peer p at addr
CanDial(p peer.ID, addr ma.Multiaddr) Dialability
}

// AddrDelay provides an address along with the delay after which the address
Expand Down
8 changes: 8 additions & 0 deletions options.go
Expand Up @@ -598,3 +598,11 @@ func SwarmOpts(opts ...swarm.Option) Option {
return nil
}
}

// DisableAutoNATv2 disables autonat
func DisableAutoNATv2() Option {
return func(cfg *Config) error {
cfg.DisableAutoNATv2 = true
return nil
}
}
16 changes: 16 additions & 0 deletions p2p/host/basic/basic_host.go
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-libp2p/p2p/host/pstoremanager"
"github.com/libp2p/go-libp2p/p2p/host/relaysvc"
"github.com/libp2p/go-libp2p/p2p/protocol/autonatv2"
relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
Expand Down Expand Up @@ -102,6 +103,8 @@ type BasicHost struct {
caBook peerstore.CertifiedAddrBook

autoNat autonat.AutoNAT

autonatv2 *autonatv2.AutoNAT
}

var _ host.Host = (*BasicHost)(nil)
Expand Down Expand Up @@ -161,6 +164,9 @@ type HostOpts struct {
EnableMetrics bool
// PrometheusRegisterer is the PrometheusRegisterer used for metrics
PrometheusRegisterer prometheus.Registerer

EnableAutoNATv2 bool
AutoNATv2Dialer host.Host
}

// NewHost constructs a new *BasicHost and activates it by attaching its stream and connection handlers to the given inet.Network.
Expand Down Expand Up @@ -301,6 +307,13 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
h.pings = ping.NewPingService(h)
}

if opts.EnableAutoNATv2 {
h.autonatv2, err = autonatv2.New(h, opts.AutoNATv2Dialer)
if err != nil {
return nil, fmt.Errorf("failed to create autonatv2: %w", err)
}
}

n.SetStreamHandler(h.newStreamHandler)

// register to be notified when the network's listen addrs change,
Expand Down Expand Up @@ -1029,6 +1042,9 @@ func (h *BasicHost) Close() error {
if h.hps != nil {
h.hps.Close()
}
if h.autonatv2 != nil {
h.autonatv2.Close()
}

_ = h.emitters.evtLocalProtocolsUpdated.Close()
_ = h.emitters.evtLocalAddrsUpdated.Close()
Expand Down
7 changes: 4 additions & 3 deletions p2p/host/blank/blank.go
Expand Up @@ -63,9 +63,10 @@ func NewBlankHost(n network.Network, options ...Option) *BlankHost {
}

bh := &BlankHost{
n: n,
cmgr: cfg.cmgr,
mux: mstream.NewMultistreamMuxer[protocol.ID](),
n: n,
cmgr: cfg.cmgr,
mux: mstream.NewMultistreamMuxer[protocol.ID](),
eventbus: cfg.eventBus,
}
if bh.eventbus == nil {
bh.eventbus = eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer()))
Expand Down
4 changes: 4 additions & 0 deletions p2p/net/mock/mock_peernet.go
Expand Up @@ -434,3 +434,7 @@ func (pn *peernet) notifyAll(notification func(f network.Notifiee)) {
func (pn *peernet) ResourceManager() network.ResourceManager {
return &network.NullResourceManager{}
}

func (pn *peernet) CanDial(p peer.ID, addr ma.Multiaddr) network.Dialability {
return network.DialabilityUnknown
}
28 changes: 28 additions & 0 deletions p2p/net/swarm/black_hole_detector.go
Expand Up @@ -144,6 +144,13 @@ func (b *blackHoleFilter) updateState() {
}
}

func (b *blackHoleFilter) State() blackHoleState {
b.mu.Lock()
defer b.mu.Unlock()

return b.state
}

func (b *blackHoleFilter) trackMetrics() {
if b.metricsTracer == nil {
return
Expand Down Expand Up @@ -244,6 +251,27 @@ func (d *blackHoleDetector) RecordResult(addr ma.Multiaddr, success bool) {
}
}

func (d *blackHoleDetector) State(addr ma.Multiaddr) blackHoleState {
if !manet.IsPublicAddr(addr) {
return blackHoleStateAllowed
}

if d.udp != nil && isProtocolAddr(addr, ma.P_UDP) {
udpState := d.udp.State()
if udpState != blackHoleStateAllowed {
return udpState
}
}

if d.ipv6 != nil && isProtocolAddr(addr, ma.P_IP6) {
ipv6State := d.ipv6.State()
if ipv6State != blackHoleStateAllowed {
return ipv6State
}
}
return blackHoleStateAllowed
}

// blackHoleConfig is the config used for black hole detection
type blackHoleConfig struct {
// Enabled enables black hole detection
Expand Down
12 changes: 12 additions & 0 deletions p2p/net/swarm/black_hole_detector_test.go
Expand Up @@ -17,6 +17,9 @@ func TestBlackHoleFilterReset(t *testing.T) {
if bhf.HandleRequest() != blackHoleResultProbing {
t.Fatalf("expected calls up to n to be probes")
}
if bhf.State() != blackHoleStateProbing {
t.Fatalf("expected state to be probing got %s", bhf.State())
}
bhf.RecordResult(false)
}

Expand All @@ -26,6 +29,9 @@ func TestBlackHoleFilterReset(t *testing.T) {
if (i%n == 0 && result != blackHoleResultProbing) || (i%n != 0 && result != blackHoleResultBlocked) {
t.Fatalf("expected every nth dial to be a probe")
}
if bhf.State() != blackHoleStateBlocked {
t.Fatalf("expected state to be blocked, got %s", bhf.State())
}
}

bhf.RecordResult(true)
Expand All @@ -34,12 +40,18 @@ func TestBlackHoleFilterReset(t *testing.T) {
if bhf.HandleRequest() != blackHoleResultProbing {
t.Fatalf("expected black hole detector state to reset after success")
}
if bhf.State() != blackHoleStateProbing {
t.Fatalf("expected state to be probing got %s", bhf.State())
}
bhf.RecordResult(false)
}

// next call should be blocked
if bhf.HandleRequest() != blackHoleResultBlocked {
t.Fatalf("expected dial to be blocked")
if bhf.State() != blackHoleStateBlocked {
t.Fatalf("expected state to be blocked, got %s", bhf.State())
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions p2p/net/swarm/swarm.go
Expand Up @@ -112,7 +112,7 @@ func WithDialRanker(d network.DialRanker) Option {
}
}

// WithUDPBlackHoleConfig configures swarm to use c as the config for UDP black hole detection
// WithUDPBlackHoleConfig configures swarm to use the provided config for UDP black hole detection
// n is the size of the sliding window used to evaluate black hole state
// min is the minimum number of successes out of n required to not block requests
func WithUDPBlackHoleConfig(enabled bool, n, min int) Option {
Expand All @@ -122,7 +122,7 @@ func WithUDPBlackHoleConfig(enabled bool, n, min int) Option {
}
}

// WithIPv6BlackHoleConfig configures swarm to use c as the config for IPv6 black hole detection
// WithIPv6BlackHoleConfig configures swarm to use the provided config for IPv6 black hole detection
// n is the size of the sliding window used to evaluate black hole state
// min is the minimum number of successes out of n required to not block requests
func WithIPv6BlackHoleConfig(enabled bool, n, min int) Option {
Expand Down
20 changes: 20 additions & 0 deletions p2p/net/swarm/swarm_dial.go
Expand Up @@ -416,6 +416,26 @@ func (s *Swarm) dialNextAddr(ctx context.Context, p peer.ID, addr ma.Multiaddr,
return nil
}

func (s *Swarm) CanDial(p peer.ID, addr ma.Multiaddr) network.Dialability {
dialable, _ := s.filterKnownUndialables(p, []ma.Multiaddr{addr})
if len(dialable) == 0 {
return network.DialabilityUndialable
}

bhState := s.bhd.State(addr)
switch bhState {
case blackHoleStateAllowed:
return network.DialabilityDialable
case blackHoleStateProbing:
return network.DialabilityUnknown
case blackHoleStateBlocked:
return network.DialabilityUndialable
default:
log.Errorf("SWARM BUG: unhandled black hole state for dilability check %s", bhState)
return network.DialabilityUnknown
}
}

func (s *Swarm) nonProxyAddr(addr ma.Multiaddr) bool {
t := s.TransportForDialing(addr)
return !t.Proxy()
Expand Down

0 comments on commit e080d99

Please sign in to comment.