Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm: use shared black hole filters for autonat #2561

Open
wants to merge 32 commits into
base: sukun/autonat-v2-2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
489b50a
autonatv2: add autonat initialised by host
sukunrt Aug 8, 2023
76b2a6c
add autonatv2 server
sukunrt Aug 8, 2023
de0b9ae
add client
sukunrt Aug 9, 2023
befc633
fix proto conflict
sukunrt Aug 10, 2023
5522bc8
network: add CanDial method to Dialer
sukunrt Aug 10, 2023
9b77869
add server
sukunrt Aug 10, 2023
bb84943
fix api
sukunrt Aug 10, 2023
ac47224
remove redundant test
sukunrt Aug 10, 2023
eafbb6d
add server rate limit
sukunrt Aug 12, 2023
f5f75b4
remove redundant msg.Reset
sukunrt Aug 14, 2023
8799dca
interim commit: stream timeout
sukunrt Aug 14, 2023
a03fca8
explain transition from v1 to v2
sukunrt Aug 14, 2023
2f8e97f
add comments and resolve nits
sukunrt Aug 14, 2023
12f9d2a
store map of peers supporting DialProtocol
sukunrt Aug 14, 2023
c462c09
send only a single dial status
sukunrt Aug 16, 2023
edbf707
improve errors and logging
sukunrt Aug 17, 2023
299bedf
add default server error code
sukunrt Aug 17, 2023
9c479e3
fix comment for reliable delivery
sukunrt Aug 18, 2023
7c9ee0f
improve map iteration order for peers
sukunrt Aug 18, 2023
6b21ff4
rename attempt to dial back
sukunrt Aug 18, 2023
45ed090
change result type to raw struct
sukunrt Aug 20, 2023
eef8f8e
rename proto package to pb
sukunrt Aug 21, 2023
0b1f4ec
add a Request struct
sukunrt Aug 21, 2023
70ed4a0
limit concurrent client requests
sukunrt Aug 21, 2023
367cbe1
fix comment
sukunrt Aug 21, 2023
d4187ef
differentiate between dial back error and dial error
sukunrt Aug 23, 2023
83babed
use the host black hole detector to check for dialability
sukunrt Aug 27, 2023
cfd4932
swarm: remove unnecessary blackHoleResult type
sukunrt Sep 7, 2023
8bb34cf
swarm: fix comments for black hole detection
sukunrt Sep 7, 2023
c226865
swarm: move metrics tracking to black hole detector
sukunrt Sep 7, 2023
fc19d4d
swarm: add read only mode to black hole detector
sukunrt Sep 7, 2023
6e5f729
swarm: use shared black hole filters for autonat
sukunrt Sep 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
102 changes: 68 additions & 34 deletions config/config.go
Expand Up @@ -128,6 +128,13 @@ type Config struct {
DialRanker network.DialRanker

SwarmOpts []swarm.Option

DisableAutoNATv2 bool

UDPBlackHoleFilter *swarm.BlackHoleFilter
CustomUDPBlackHoleFilter bool
IPv6BlackHoleFilter *swarm.BlackHoleFilter
CustomIPv6BlackHoleFilter bool
}

func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swarm, error) {
Expand Down Expand Up @@ -162,7 +169,10 @@ func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swa
return nil, err
}

opts := cfg.SwarmOpts
opts := append(cfg.SwarmOpts,
swarm.WithUDPBlackHoleFilter(cfg.UDPBlackHoleFilter),
swarm.WithIPv6BlackHoleFilter(cfg.IPv6BlackHoleFilter),
)
if cfg.Reporter != nil {
opts = append(opts, swarm.WithMetrics(cfg.Reporter))
}
Expand Down Expand Up @@ -190,6 +200,50 @@ func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swa
return swarm.NewSwarm(pid, cfg.Peerstore, eventBus, opts...)
}

func (cfg *Config) makeAutoNATHost() (*blankhost.BlankHost, error) {
autonatPrivKey, _, err := crypto.GenerateEd25519Key(rand.Reader)
if err != nil {
return nil, err
}
ps, err := pstoremem.NewPeerstore()
if err != nil {
return nil, err
}

// Pull out the pieces of the config that we _actually_ care about.
// Specifically, don't set up things like autorelay, listeners,
// identify, etc.
autoNatCfg := Config{
Transports: cfg.Transports,
Muxers: cfg.Muxers,
SecurityTransports: cfg.SecurityTransports,
Insecure: cfg.Insecure,
PSK: cfg.PSK,
ConnectionGater: cfg.ConnectionGater,
Reporter: cfg.Reporter,
PeerKey: autonatPrivKey,
Peerstore: ps,
DialRanker: swarm.NoDelayDialRanker,
UDPBlackHoleFilter: cfg.UDPBlackHoleFilter,
IPv6BlackHoleFilter: cfg.IPv6BlackHoleFilter,
SwarmOpts: []swarm.Option{
// Don't update black hole state for failed autonat dials
swarm.WithReadOnlyBlackHoleDetector(),
},
}

dialer, err := autoNatCfg.makeSwarm(eventbus.NewBus(), false)
if err != nil {
return nil, err
}
dialerHost := blankhost.NewBlankHost(dialer)
if err := autoNatCfg.addTransports(dialerHost); err != nil {
dialerHost.Close()
return nil, err
}
return dialerHost, nil
}

func (cfg *Config) addTransports(h host.Host) error {
swrm, ok := h.Network().(transport.TransportNetwork)
if !ok {
Expand Down Expand Up @@ -305,6 +359,15 @@ func (cfg *Config) NewNode() (host.Host, error) {
rcmgr.MustRegisterWith(cfg.PrometheusRegisterer)
}

var autonatv2Dialer *blankhost.BlankHost
if !cfg.DisableAutoNATv2 {
ah, err := cfg.makeAutoNATHost()
if err != nil {
return nil, err
}
autonatv2Dialer = ah
}

h, err := bhost.NewHost(swrm, &bhost.HostOpts{
EventBus: eventBus,
ConnManager: cfg.ConnManager,
Expand All @@ -319,6 +382,8 @@ func (cfg *Config) NewNode() (host.Host, error) {
RelayServiceOpts: cfg.RelayServiceOpts,
EnableMetrics: !cfg.DisableMetrics,
PrometheusRegisterer: cfg.PrometheusRegisterer,
EnableAutoNATv2: !cfg.DisableAutoNATv2,
AutoNATv2Dialer: autonatv2Dialer,
})
if err != nil {
swrm.Close()
Expand Down Expand Up @@ -396,46 +461,15 @@ func (cfg *Config) NewNode() (host.Host, error) {
autonat.WithPeerThrottling(cfg.AutoNATConfig.ThrottlePeerLimit))
}
if cfg.AutoNATConfig.EnableService {
autonatPrivKey, _, err := crypto.GenerateEd25519Key(rand.Reader)
ah, err := cfg.makeAutoNATHost()
if err != nil {
return nil, err
}
ps, err := pstoremem.NewPeerstore()
if err != nil {
return nil, err
}

// Pull out the pieces of the config that we _actually_ care about.
// Specifically, don't set up things like autorelay, listeners,
// identify, etc.
autoNatCfg := Config{
Transports: cfg.Transports,
Muxers: cfg.Muxers,
SecurityTransports: cfg.SecurityTransports,
Insecure: cfg.Insecure,
PSK: cfg.PSK,
ConnectionGater: cfg.ConnectionGater,
Reporter: cfg.Reporter,
PeerKey: autonatPrivKey,
Peerstore: ps,
DialRanker: swarm.NoDelayDialRanker,
}

dialer, err := autoNatCfg.makeSwarm(eventbus.NewBus(), false)
if err != nil {
h.Close()
return nil, err
}
dialerHost := blankhost.NewBlankHost(dialer)
if err := autoNatCfg.addTransports(dialerHost); err != nil {
dialerHost.Close()
h.Close()
return nil, err
}
// NOTE: We're dropping the blank host here but that's fine. It
// doesn't really _do_ anything and doesn't even need to be
// closed (as long as we close the underlying network).
autonatOpts = append(autonatOpts, autonat.EnableService(dialerHost.Network()))
autonatOpts = append(autonatOpts, autonat.EnableService(ah.Network()))
}
if cfg.AutoNATConfig.ForceReachability != nil {
autonatOpts = append(autonatOpts, autonat.WithReachability(*cfg.AutoNATConfig.ForceReachability))
Expand Down
3 changes: 3 additions & 0 deletions core/network/network.go
Expand Up @@ -185,6 +185,9 @@ type Dialer interface {
// Notify/StopNotify register and unregister a notifiee for signals
Notify(Notifiee)
StopNotify(Notifiee)

// CanDial returns whether the dialer can dial peer p at addr
CanDial(p peer.ID, addr ma.Multiaddr) bool
}

// AddrDelay provides an address along with the delay after which the address
Expand Down
25 changes: 25 additions & 0 deletions defaults.go
Expand Up @@ -10,6 +10,7 @@ import (
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/libp2p/go-libp2p/p2p/net/swarm"
"github.com/libp2p/go-libp2p/p2p/security/noise"
tls "github.com/libp2p/go-libp2p/p2p/security/tls"
quic "github.com/libp2p/go-libp2p/p2p/transport/quic"
Expand Down Expand Up @@ -133,6 +134,18 @@ var DefaultPrometheusRegisterer = func(cfg *Config) error {
return cfg.Apply(PrometheusRegisterer(prometheus.DefaultRegisterer))
}

var defaultUDPBlackHoleDetector = func(cfg *Config) error {
// A black hole is a binary property. On a network if UDP dials are blocked, all dials will
// fail. So a low success rate of 5 out 100 dials is good enough.
return cfg.Apply(UDPBlackHoleFilter(&swarm.BlackHoleFilter{N: 100, MinSuccesses: 5, Name: "UDP"}))
}

var defaultIPv6BlackHoleDetector = func(cfg *Config) error {
// A black hole is a binary property. On a network if there is no IPv6 connectivity, all
// dials will fail. So a low success rate of 5 out 100 dials is good enough.
return cfg.Apply(IPv6BlackHoleFilter(&swarm.BlackHoleFilter{N: 100, MinSuccesses: 5, Name: "IPv6"}))
}

// Complete list of default options and when to fallback on them.
//
// Please *DON'T* specify default options any other way. Putting this all here
Expand Down Expand Up @@ -189,6 +202,18 @@ var defaults = []struct {
fallback: func(cfg *Config) bool { return !cfg.DisableMetrics && cfg.PrometheusRegisterer == nil },
opt: DefaultPrometheusRegisterer,
},
{
fallback: func(cfg *Config) bool {
return !cfg.CustomUDPBlackHoleFilter && cfg.UDPBlackHoleFilter == nil
},
opt: defaultUDPBlackHoleDetector,
},
{
fallback: func(cfg *Config) bool {
return !cfg.CustomIPv6BlackHoleFilter && cfg.IPv6BlackHoleFilter == nil
},
opt: defaultIPv6BlackHoleDetector,
},
}

// Defaults configures libp2p to use the default options. Can be combined with
Expand Down
26 changes: 26 additions & 0 deletions options.go
Expand Up @@ -597,3 +597,29 @@ func SwarmOpts(opts ...swarm.Option) Option {
return nil
}
}

// DisableAutoNATv2 disables autonat
func DisableAutoNATv2() Option {
return func(cfg *Config) error {
cfg.DisableAutoNATv2 = true
return nil
}
}

// UDPBlackHoleFilter configures libp2p to use f as the black hole filter for UDP addrs
func UDPBlackHoleFilter(f *swarm.BlackHoleFilter) Option {
return func(cfg *Config) error {
cfg.UDPBlackHoleFilter = f
cfg.CustomUDPBlackHoleFilter = true
return nil
}
}

// IPv6BlackHoleFilter configures libp2p to use f as the black hole filter for IPv6 addrs
func IPv6BlackHoleFilter(f *swarm.BlackHoleFilter) Option {
return func(cfg *Config) error {
cfg.IPv6BlackHoleFilter = f
cfg.CustomIPv6BlackHoleFilter = true
return nil
}
}
16 changes: 16 additions & 0 deletions p2p/host/basic/basic_host.go
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-libp2p/p2p/host/pstoremanager"
"github.com/libp2p/go-libp2p/p2p/host/relaysvc"
"github.com/libp2p/go-libp2p/p2p/protocol/autonatv2"
relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
Expand Down Expand Up @@ -101,6 +102,8 @@ type BasicHost struct {
caBook peerstore.CertifiedAddrBook

autoNat autonat.AutoNAT

autonatv2 *autonatv2.AutoNAT
}

var _ host.Host = (*BasicHost)(nil)
Expand Down Expand Up @@ -160,6 +163,9 @@ type HostOpts struct {
EnableMetrics bool
// PrometheusRegisterer is the PrometheusRegisterer used for metrics
PrometheusRegisterer prometheus.Registerer

EnableAutoNATv2 bool
AutoNATv2Dialer host.Host
}

// NewHost constructs a new *BasicHost and activates it by attaching its stream and connection handlers to the given inet.Network.
Expand Down Expand Up @@ -301,6 +307,13 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
h.pings = ping.NewPingService(h)
}

if opts.EnableAutoNATv2 {
h.autonatv2, err = autonatv2.New(h, opts.AutoNATv2Dialer)
if err != nil {
return nil, fmt.Errorf("failed to create autonatv2: %w", err)
}
}

n.SetStreamHandler(h.newStreamHandler)

// register to be notified when the network's listen addrs change,
Expand Down Expand Up @@ -1025,6 +1038,9 @@ func (h *BasicHost) Close() error {
if h.hps != nil {
h.hps.Close()
}
if h.autonatv2 != nil {
h.autonatv2.Close()
}

_ = h.emitters.evtLocalProtocolsUpdated.Close()
_ = h.emitters.evtLocalAddrsUpdated.Close()
Expand Down
7 changes: 4 additions & 3 deletions p2p/host/blank/blank.go
Expand Up @@ -63,9 +63,10 @@ func NewBlankHost(n network.Network, options ...Option) *BlankHost {
}

bh := &BlankHost{
n: n,
cmgr: cfg.cmgr,
mux: mstream.NewMultistreamMuxer[protocol.ID](),
n: n,
cmgr: cfg.cmgr,
mux: mstream.NewMultistreamMuxer[protocol.ID](),
eventbus: cfg.eventBus,
}
if bh.eventbus == nil {
bh.eventbus = eventbus.NewBus(eventbus.WithMetricsTracer(eventbus.NewMetricsTracer()))
Expand Down
4 changes: 4 additions & 0 deletions p2p/net/mock/mock_peernet.go
Expand Up @@ -434,3 +434,7 @@ func (pn *peernet) notifyAll(notification func(f network.Notifiee)) {
func (pn *peernet) ResourceManager() network.ResourceManager {
return &network.NullResourceManager{}
}

func (pn *peernet) CanDial(p peer.ID, addr ma.Multiaddr) bool {
return true
}