Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

webrtcprivate: add transport #2576

Open
wants to merge 22 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
3a54181
webrtcprivate: add transport
sukunrt Sep 20, 2023
8f84284
webrtcprivate: fix deadline, limit inflight connection requests
sukunrt Sep 21, 2023
4f3c12e
webrtcprivate: integrate connection gater
sukunrt Sep 22, 2023
5ab60e2
webrtcprivate: factor establishConnection out in listener
sukunrt Sep 22, 2023
10624d1
webrtcprivate: setup addresses on connection properly
sukunrt Sep 22, 2023
0b20ffe
webrtcprivate: set negotiated=true for the init data channel
sukunrt Sep 22, 2023
072b38a
webrtcprivate: don't test for address equality
sukunrt Sep 23, 2023
8bf790f
webrtcprivate: setup incoming data channel handlers early
sukunrt Sep 23, 2023
4622ab2
webrtcprivate: fix comments
sukunrt Sep 24, 2023
1448286
webrtcprivate: add rcmgr reservation on listen side
sukunrt Sep 24, 2023
6e464e5
webrtcprivate: setup conn.ConnState correctly
sukunrt Sep 24, 2023
8baee51
webrtcprivate: reuse protobuf msg structs
sukunrt Sep 24, 2023
255fd3e
webrtcprivate: use context for closing listener
sukunrt Sep 24, 2023
f41c97c
webrtcprivate: fix Multiple Dialers test to use listeners for dialing
sukunrt Sep 24, 2023
a2e3df3
swarm: integrate webrtc dialing
sukunrt Sep 25, 2023
212159b
libp2p: provide option for enabling webrtcprivate
sukunrt Sep 25, 2023
7e077d3
libp2p: use webrtc.iceserver for configuration
sukunrt Sep 26, 2023
13081dd
holepunch: trigger holepunching for peers with webrtc addresses
sukunrt Oct 2, 2023
05f5d33
webrtcprivate: integrate transport integration tests
sukunrt Oct 3, 2023
4a77bae
webrtcprivate: fix bug with gater intercept secured
sukunrt Oct 10, 2023
6249772
webrtcprivate: fix rcmgr bugs on listener
sukunrt Oct 12, 2023
bc90626
webrtcprivate: advertise /webrtc addresses
sukunrt Oct 16, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 11 additions & 0 deletions config/config.go
Expand Up @@ -34,6 +34,8 @@ import (
relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
"github.com/libp2p/go-libp2p/p2p/transport/quicreuse"
libp2pwebrtcprivate "github.com/libp2p/go-libp2p/p2p/transport/webrtcprivate"
"github.com/pion/webrtc/v3"
"github.com/prometheus/client_golang/prometheus"

ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -65,6 +67,8 @@ type Security struct {
Constructor interface{}
}

type ICEServer = webrtc.ICEServer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to redeclare this? Is it a requirement for Fx?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope. Just so that we redefine it a different way later if we want. But I think it's fine to remove this and just take the pion specific object. We can make a breaking change if this needs to be changed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It’s a breaking change anyway if we change the underlying type. That should be fine.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you remove it?


// Config describes a set of settings for a libp2p node
//
// This is *not* a stable interface. Use the options defined in the root
Expand Down Expand Up @@ -128,6 +132,9 @@ type Config struct {
DialRanker network.DialRanker

SwarmOpts []swarm.Option

WebRTCPrivate bool
WebRTCStunServers []ICEServer
}

func (cfg *Config) makeSwarm(eventBus event.Bus, enableMetrics bool) (*swarm.Swarm, error) {
Expand Down Expand Up @@ -208,6 +215,7 @@ func (cfg *Config) addTransports(h host.Host) error {
fx.Provide(func() pnet.PSK { return cfg.PSK }),
fx.Provide(func() network.ResourceManager { return cfg.ResourceManager }),
fx.Provide(func() *madns.Resolver { return cfg.MultiaddrResolver }),
fx.Provide(func() []ICEServer { return cfg.WebRTCStunServers }),
}
fxopts = append(fxopts, cfg.Transports...)
if cfg.Insecure {
Expand Down Expand Up @@ -284,6 +292,9 @@ func (cfg *Config) addTransports(h host.Host) error {
if cfg.Relay {
fxopts = append(fxopts, fx.Invoke(circuitv2.AddTransport))
}
if cfg.WebRTCPrivate {
fxopts = append(fxopts, fx.Invoke(libp2pwebrtcprivate.AddTransport))
}
app := fx.New(fxopts...)
if err := app.Err(); err != nil {
h.Close()
Expand Down
7 changes: 7 additions & 0 deletions core/network/context.go
Expand Up @@ -29,6 +29,13 @@ func WithForceDirectDial(ctx context.Context, reason string) context.Context {
return context.WithValue(ctx, forceDirectDial, reason)
}

// WithoutForceDirectDial constructs a new context with the ForceDirectDial option dropped.
// This is useful in case establishing a direct connection first requires establishing a
// relayed connection e.g. dialing /webrtc addresses.
func WithoutForceDirectDial(ctx context.Context) context.Context {
return context.WithValue(ctx, forceDirectDial, nil)
}

// EXPERIMENTAL
// GetForceDirectDial returns true if the force direct dial option is set in the context.
func GetForceDirectDial(ctx context.Context) (forceDirect bool, reason string) {
Expand Down
8 changes: 8 additions & 0 deletions options.go
Expand Up @@ -598,3 +598,11 @@ func SwarmOpts(opts ...swarm.Option) Option {
return nil
}
}

func EnableWebRTCPrivate(stunServers []config.ICEServer) Option {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this a separate option? We already have libp2p.Transport to configure transports. I had hoped that we don't need to introduce a separate knob here.

return func(cfg *Config) error {
cfg.WebRTCPrivate = true
cfg.WebRTCStunServers = stunServers
return nil
}
}
2 changes: 1 addition & 1 deletion p2p/host/autorelay/relay_finder.go
Expand Up @@ -726,7 +726,7 @@ func (rf *relayFinder) relayAddrs(addrs []ma.Multiaddr) []ma.Multiaddr {

// only keep private addrs from the original addr set
for _, addr := range addrs {
if manet.IsPrivateAddr(addr) {
if !manet.IsPublicAddr(addr) {
raddrs = append(raddrs, addr)
}
}
Expand Down
30 changes: 30 additions & 0 deletions p2p/host/basic/basic_host.go
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
libp2pwebrtcprivate "github.com/libp2p/go-libp2p/p2p/transport/webrtcprivate"
libp2pwebtransport "github.com/libp2p/go-libp2p/p2p/transport/webtransport"
"github.com/prometheus/client_golang/prometheus"

Expand Down Expand Up @@ -801,9 +802,38 @@ func (h *BasicHost) Addrs() []ma.Multiaddr {
addrs[i] = addrWithCerthash
}
}

// Append webrtc addresses to circuit-v2 addresses
hasWebRTCPrivate := false
for _, addr := range addrs {
if addr.Equal(libp2pwebrtcprivate.WebRTCAddr) {
hasWebRTCPrivate = true
break
}
}
if hasWebRTCPrivate {
for _, addr := range addrs {
if _, err := addr.ValueForProtocol(ma.P_CIRCUIT); err == nil {
if isBrowserDialableAddr(addr) {
addrs = append(addrs, addr.Encapsulate(libp2pwebrtcprivate.WebRTCAddr))
}
}
}
}
return addrs
}

var browserProtocols = []int{ma.P_WEBTRANSPORT, ma.P_WEBRTC_DIRECT, ma.P_WSS}

func isBrowserDialableAddr(addr ma.Multiaddr) bool {
for _, p := range browserProtocols {
if _, err := addr.ValueForProtocol(p); err == nil {
return true
}
}
return false
}

// NormalizeMultiaddr returns a multiaddr suitable for equality checks.
// If the multiaddr is a webtransport component, it removes the certhashes.
func (h *BasicHost) NormalizeMultiaddr(addr ma.Multiaddr) ma.Multiaddr {
Expand Down
6 changes: 5 additions & 1 deletion p2p/net/swarm/dial_ranker.go
Expand Up @@ -43,7 +43,10 @@ func NoDelayDialRanker(addrs []ma.Multiaddr) []network.AddrDelay {
// no additional latency in the vast majority of cases.
//
// Private and public address groups are dialed in parallel.
//
// Dialing relay addresses is delayed by 500 ms, if we have any non-relay alternatives.
// We treat webrtc addresses the same as relay addresses as we need a relay connection to establish a
// webrtc connection. So any available direct addresses are preferred over webrtc addresses.
//
// Within each group (private, public, relay addresses) we apply the following ranking logic:
//
Expand Down Expand Up @@ -72,7 +75,8 @@ func NoDelayDialRanker(addrs []ma.Multiaddr) []network.AddrDelay {
//
// We dial lowest ports first as they are more likely to be the listen port.
func DefaultDialRanker(addrs []ma.Multiaddr) []network.AddrDelay {
relay, addrs := filterAddrs(addrs, isRelayAddr)
// includes /webrtc addresses too
relay, addrs := filterAddrs(addrs, func(a ma.Multiaddr) bool { return isProtocolAddr(a, ma.P_CIRCUIT) })
pvt, addrs := filterAddrs(addrs, manet.IsPrivateAddr)
public, addrs := filterAddrs(addrs, func(a ma.Multiaddr) bool { return isProtocolAddr(a, ma.P_IP4) || isProtocolAddr(a, ma.P_IP6) })

Expand Down
5 changes: 0 additions & 5 deletions p2p/net/swarm/swarm_dial.go
Expand Up @@ -597,11 +597,6 @@ func isFdConsumingAddr(addr ma.Multiaddr) bool {
return err1 == nil || err2 == nil
}

func isRelayAddr(addr ma.Multiaddr) bool {
_, err := addr.ValueForProtocol(ma.P_CIRCUIT)
return err == nil
}

// filterLowPriorityAddresses removes addresses inplace for which we have a better alternative
// 1. If a /quic-v1 address is present, filter out /quic and /webtransport address on the same 2-tuple:
// QUIC v1 is preferred over the deprecated QUIC draft-29, and given the choice, we prefer using
Expand Down
5 changes: 4 additions & 1 deletion p2p/net/swarm/swarm_transport.go
Expand Up @@ -27,7 +27,10 @@ func (s *Swarm) TransportForDialing(a ma.Multiaddr) transport.Transport {
}
return nil
}
if isRelayAddr(a) {
if isProtocolAddr(a, ma.P_WEBRTC) {
return s.transports.m[ma.P_WEBRTC]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this require extra logic here? Is it because of the certhash? Why doesn't it apply to WebTransport then?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The peer address is a circuitv2 address. So we need to check it before the circuitv2 check but the check is the same as the circuit v2 check.

The address is of the form
/ip4/1.2.3.4/udp/1/quic-v1/p2p/<relay-id>/p2p-circuit/webrtc
So we need to ensure that we don't use the quic-v1 or the p2p-circuit transport for dialing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't isRelayAddr already do that? Or phrased differently, the same problem would have occurred for a non-WebRTC address, why do we need special logic for WebRTC now?

Copy link
Member Author

@sukunrt sukunrt Oct 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes but we don't want the circuit-v2 transport to handle dialing to the webrtc address.
The previous code was

if isRelayAddr => Use circuitv2 transport. 

But now we need

if isWebRTCAddr => Use webrtcprivate transport.
else if isRelayAddr => Use circuitv2 transport. 

This is because the webrtcprivate address has a p2p-circuit component but we want to use webrtcprivate transport. I could have written this as:

if isRelayAddr(addr):
	if isWebRTCAddr(addr):
		use webrtcprivate transport
	else
		use circuitv2 transport

The flow I've implemented is:
peerAddr: <relay-addr>/p2p-circuit/webrtc

  1. webrtcprivate.Transport.Dial is called with peerAddr
  2. webrtcprivate.Transport.Dial adds the relay address inferred from the webrtcprivate address inferred from the peerstore. Then makes a host.Connect call to establish a relay connection with the peer.
  3. Once relay connection is established it opens a signaling stream and establishes a webrtc connection.

I've chosen this because it is similar to circuitv2 transport which first establishes a direct connection with the relay node and then a circuitv2 connection to the peer over the relayed connection. This also has the nice property that Transport.Dial can connect to the peer over the address no matter the state of the host when Transport.Dial is called. In this case, it means if no circuit-v2 connection exists, it'll make a circuit-v2 connection first.

}
if isProtocolAddr(a, ma.P_CIRCUIT) {
return s.transports.m[ma.P_CIRCUIT]
}
for _, t := range s.transports.m {
Expand Down
55 changes: 55 additions & 0 deletions p2p/protocol/holepunch/holepunch_test.go
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/proto"
relayv2 "github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
"github.com/libp2p/go-libp2p/p2p/protocol/holepunch"
Expand Down Expand Up @@ -511,3 +512,57 @@ func mkHostWithHolePunchSvc(t *testing.T, opts ...holepunch.Option) (host.Host,
require.NoError(t, err)
return h, hps
}

func TestWebRTCDirectConnect(t *testing.T) {
relay1, err := libp2p.New()
require.NoError(t, err)

_, err = relayv2.New(relay1)
require.NoError(t, err)

relay1info := peer.AddrInfo{
ID: relay1.ID(),
Addrs: relay1.Addrs(),
}

h1, err := libp2p.New(
libp2p.NoListenAddrs,
libp2p.EnableRelay(),
libp2p.EnableWebRTCPrivate(nil),
libp2p.EnableHolePunching(),
)
require.NoError(t, err)

h2, err := libp2p.New(
libp2p.NoListenAddrs,
libp2p.EnableRelay(),
libp2p.EnableWebRTCPrivate(nil),
)
require.NoError(t, err)

err = h2.Connect(context.Background(), relay1info)
require.NoError(t, err)

_, err = client.Reserve(context.Background(), h2, relay1info)
require.NoError(t, err)

webrtcAddr := ma.StringCast(relay1info.Addrs[0].String() + "/p2p/" + relay1info.ID.String() + "/p2p-circuit/webrtc")
relayAddrs := ma.StringCast(relay1info.Addrs[0].String() + "/p2p/" + relay1info.ID.String() + "/p2p-circuit/")
h1.Peerstore().AddAddrs(h2.ID(), []ma.Multiaddr{webrtcAddr, relayAddrs}, peerstore.TempAddrTTL)

err = h1.Connect(context.Background(), peer.AddrInfo{ID: h2.ID()})
require.NoError(t, err)
require.Eventually(
t,
func() bool {
for _, c := range h1.Network().ConnsToPeer(h2.ID()) {
if !c.Stat().Transient {
return true
}
}
return false
},
5*time.Second,
100*time.Millisecond,
)
}
18 changes: 18 additions & 0 deletions p2p/protocol/holepunch/holepuncher.go
Expand Up @@ -108,6 +108,9 @@ func (hp *holePuncher) directConnect(rp peer.ID) error {
// short-circuit hole punching if a direct dial works.
// attempt a direct connection ONLY if we have a public address for the remote peer
for _, a := range hp.host.Peerstore().Addrs(rp) {
// Here we consider /webrtc addresses as relay addresses and skip them as they're
// also holepunched. We will dial the /webrtc addresses along with other addresses
// obtained in DCUtR
if manet.IsPublicAddr(a) && !isRelayAddress(a) {
forceDirectConnCtx := network.WithForceDirectDial(hp.ctx, "hole-punching")
dialCtx, cancel := context.WithTimeout(forceDirectConnCtx, dialTimeout)
Expand Down Expand Up @@ -136,6 +139,7 @@ func (hp *holePuncher) directConnect(rp peer.ID) error {
if err != nil {
log.Debugw("hole punching failed", "peer", rp, "error", err)
hp.tracer.ProtocolError(rp, err)
hp.maybeDialWebRTC(rp)
return err
}
synTime := rtt / 2
Expand Down Expand Up @@ -171,6 +175,20 @@ func (hp *holePuncher) directConnect(rp peer.ID) error {
return fmt.Errorf("all retries for hole punch with peer %s failed", rp)
}

func (hp *holePuncher) maybeDialWebRTC(p peer.ID) {
addrs := hp.host.Peerstore().Addrs(p)
for _, a := range addrs {
if _, err := a.ValueForProtocol(ma.P_WEBRTC); err == nil {
ctx := network.WithForceDirectDial(hp.ctx, "webrtc holepunch")
err := hp.host.Connect(ctx, peer.AddrInfo{ID: p}) // address is already in peerstore
if err != nil {
log.Debugf("holepunch attempt to %s over /webrtc failed: %s", p, err)
}
return
}
}
}

// initiateHolePunch opens a new hole punching coordination stream,
// exchanges the addresses and measures the RTT.
func (hp *holePuncher) initiateHolePunch(rp peer.ID) ([]ma.Multiaddr, []ma.Multiaddr, time.Duration, error) {
Expand Down
41 changes: 41 additions & 0 deletions p2p/protocol/holepunch/svc.go
Expand Up @@ -84,6 +84,8 @@ func NewService(h host.Host, ids identify.IDService, opts ...Option) (*Service,
return nil, err
}
}
s.host.Network().Notify(s)

s.tracer.Start()

s.refCount.Add(1)
Expand Down Expand Up @@ -283,3 +285,42 @@ func (s *Service) DirectConnect(p peer.ID) error {
s.holePuncherMx.Unlock()
return holePuncher.DirectConnect(p)
}

var _ network.Notifiee = &Service{}

func (s *Service) Connected(_ network.Network, conn network.Conn) {
// Dial /webrtc address if it's a relay connection to a browser node
if conn.Stat().Direction == network.DirOutbound && conn.Stat().Transient {
s.refCount.Add(1)
go func() {
defer s.refCount.Done()
select {
// waiting for Identify here will allow us to access the peer's public and observed addresses
// that we can dial to for a hole punch.
case <-s.ids.IdentifyWait(conn):
case <-s.ctx.Done():
return
}
p := conn.RemotePeer()
// Peer supports DCUtR, let it trigger holepunch
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the peer supports DCUtR, but there are only WebRTC addresses?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That case is handled here: https://github.com/libp2p/go-libp2p/blob/webrtcprivate/transport/p2p/protocol/holepunch/holepuncher.go#L142

In this case the peer(DCUtR initiator) is best suited to handle the call so I've kept it on that side.

if protos, err := s.host.Peerstore().SupportsProtocols(p, Protocol); err == nil && len(protos) > 0 {
return
}
// No DCUtR support, connect with peer over /webrtc
for _, addr := range s.host.Peerstore().Addrs(p) {
if _, err := addr.ValueForProtocol(ma.P_WEBRTC); err == nil {
ctx := network.WithForceDirectDial(s.ctx, "webrtc holepunch")
err := s.host.Connect(ctx, peer.AddrInfo{ID: p}) // address is already in peerstore
if err != nil {
log.Debugf("holepunch attempt to %s over /webrtc failed: %s", p, err)
}
return
}
}
}()
}
}

func (*Service) Disconnected(_ network.Network, v network.Conn) {}
func (*Service) Listen(n network.Network, a ma.Multiaddr) {}
func (*Service) ListenClose(n network.Network, a ma.Multiaddr) {}
41 changes: 41 additions & 0 deletions p2p/test/basichost/basic_host_test.go
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/p2p/host/autorelay"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -158,3 +159,43 @@ func TestNewStreamTransientConnection(t *testing.T) {
<-done
<-done
}

func TestWebRTCPrivateAddressAdvertisement(t *testing.T) {
r, err := libp2p.New(
// We need a public address for the relay
libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr {
return append(addrs, ma.StringCast("/ip4/1.2.3.4/udp/1/quic-v1/webtransport"))
}),
libp2p.EnableRelayService(),
libp2p.ForceReachabilityPublic(),
)
require.NoError(t, err)

relay1info := peer.AddrInfo{
ID: r.ID(),
Addrs: r.Addrs(),
}

h, err := libp2p.New(
libp2p.ListenAddrStrings("/ip4/127.0.0.1/udp/0/quic-v1"),
libp2p.EnableRelay(),
libp2p.EnableWebRTCPrivate(nil),
libp2p.EnableAutoRelayWithStaticRelays(
[]peer.AddrInfo{relay1info},
autorelay.WithBootDelay(0),
),
libp2p.ForceReachabilityPrivate(),
)
require.NoError(t, err)

require.Eventually(t, func() bool {
for _, a := range h.Addrs() {
_, rerr := a.ValueForProtocol(ma.P_CIRCUIT)
_, werr := a.ValueForProtocol(ma.P_WEBRTC)
if rerr == nil && werr == nil {
return true
}
}
return false
}, 5*time.Second, 50*time.Millisecond)
}