Skip to content

Commit

Permalink
magiselect
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorropo committed Mar 17, 2024
1 parent 164adb4 commit 239e7ce
Show file tree
Hide file tree
Showing 17 changed files with 418 additions and 43 deletions.
15 changes: 15 additions & 0 deletions core/sec/security.go
Expand Up @@ -9,6 +9,9 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/transport/magiselect"
"github.com/multiformats/go-multiaddr"
mafmt "github.com/multiformats/go-multiaddr-fmt"
)

// SecureConn is an authenticated, encrypted connection.
Expand All @@ -31,6 +34,18 @@ type SecureTransport interface {
ID() protocol.ID
}

// StraightableSecureTransport can be implemented by security transports which support being ran straight on the stream.
// This allows them to skip the multistream security handshake.
type StraightableSecureTransport interface {
SecureTransport

// Suffix indicate the trailing component which allows to skip the multistream select exchange.
Suffix() multiaddr.Multiaddr
SuffixProtocol() int
SuffixMatcher() mafmt.Pattern
magiselect.Matcher
}

type ErrPeerIDMismatch struct {
Expected peer.ID
Actual peer.ID
Expand Down
24 changes: 21 additions & 3 deletions core/transport/transport.go
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"

ma "github.com/multiformats/go-multiaddr"
mafmt "github.com/multiformats/go-multiaddr-fmt"
manet "github.com/multiformats/go-multiaddr/net"
)

Expand Down Expand Up @@ -96,6 +97,14 @@ type Listener interface {
Multiaddr() ma.Multiaddr
}

// ListenerFromUpgrader is a workaround to let the swarm append suffixes, it is optionally implemented by Listeners.
// FIXME: I want 8962b2ae336d94627f2f4361f96799ee3a5bd9e4 but it was reverted in 1c8eaabfd385346a7c41b988e2dbc2e20ddfa460 and I'm not in the mood to figure out this kind of mess.
type ListenerFromUpgrader interface {
Listener

Upgrader() Upgrader
}

// ErrListenerClosed is returned by Listener.Accept when the listener is gracefully closed.
var ErrListenerClosed = errors.New("listener closed")

Expand All @@ -121,9 +130,18 @@ type TransportNetwork interface {
// to a full transport connection (secure and multiplexed).
type Upgrader interface {
// UpgradeListener upgrades the passed multiaddr-net listener into a full libp2p-transport listener.
UpgradeListener(Transport, manet.Listener) Listener
// Upgrade upgrades the multiaddr/net connection into a full libp2p-transport connection.
Upgrade(ctx context.Context, t Transport, maconn manet.Conn, dir network.Direction, p peer.ID, scope network.ConnManagementScope) (CapableConn, error)
UpgradeListener(Transport, manet.Listener) ListenerFromUpgrader

// UpgradeOutbound/Inbound upgrades the multiaddr/net connection into a full libp2p-transport connection.
// suffix can be nil if no suffix is present.
UpgradeOutbound(ctx context.Context, t Transport, maconn manet.Conn, suffix ma.Multiaddr, p peer.ID, scope network.ConnManagementScope) (CapableConn, error)
UpgradeInbound(ctx context.Context, t Transport, maconn manet.Conn, p peer.ID, scope network.ConnManagementScope) (CapableConn, error)

// Suffixes let the Upgrader indicate optional maddr suffixes which can be used to skip parts of the negociation.
// A nil maddr indicate that no suffix must be applied (multistream-select will be used).
Suffixes() []ma.Multiaddr
SuffixesProtocols() []int
SuffixMatcher() mafmt.Pattern
}

// DialUpdater provides updates on in progress dials.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -36,7 +36,7 @@ require (
github.com/multiformats/go-base32 v0.1.0
github.com/multiformats/go-multiaddr v0.12.2
github.com/multiformats/go-multiaddr-dns v0.3.1
github.com/multiformats/go-multiaddr-fmt v0.1.0
github.com/multiformats/go-multiaddr-fmt v0.1.1-0.20240317044316-238036a861f0
github.com/multiformats/go-multibase v0.2.0
github.com/multiformats/go-multicodec v0.9.0
github.com/multiformats/go-multihash v0.2.3
Expand Down
5 changes: 2 additions & 3 deletions go.sum
Expand Up @@ -245,14 +245,13 @@ github.com/multiformats/go-base32 v0.1.0 h1:pVx9xoSPqEIQG8o+UbAe7DNi51oej1NtK+aG
github.com/multiformats/go-base32 v0.1.0/go.mod h1:Kj3tFY6zNr+ABYMqeUNeGvkIC/UYgtWibDcT0rExnbI=
github.com/multiformats/go-base36 v0.2.0 h1:lFsAbNOGeKtuKozrtBsAkSVhv1p9D0/qedU9rQyccr0=
github.com/multiformats/go-base36 v0.2.0/go.mod h1:qvnKE++v+2MWCfePClUEjE78Z7P2a1UV0xHgWc0hkp4=
github.com/multiformats/go-multiaddr v0.1.1/go.mod h1:aMKBKNEYmzmDmxfX88/vz+J5IU55txyt0p4aiWVohjo=
github.com/multiformats/go-multiaddr v0.2.0/go.mod h1:0nO36NvPpyV4QzvTLi/lafl2y95ncPj0vFwVF6k6wJ4=
github.com/multiformats/go-multiaddr v0.12.2 h1:9G9sTY/wCYajKa9lyfWPmpZAwe6oV+Wb1zcmMS1HG24=
github.com/multiformats/go-multiaddr v0.12.2/go.mod h1:GKyaTYjZRdcUhyOetrxTk9z0cW+jA/YrnqTOvKgi44M=
github.com/multiformats/go-multiaddr-dns v0.3.1 h1:QgQgR+LQVt3NPTjbrLLpsaT2ufAA2y0Mkk+QRVJbW3A=
github.com/multiformats/go-multiaddr-dns v0.3.1/go.mod h1:G/245BRQ6FJGmryJCrOuTdB37AMA5AMOVuO6NY3JwTk=
github.com/multiformats/go-multiaddr-fmt v0.1.0 h1:WLEFClPycPkp4fnIzoFoV9FVd49/eQsuaL3/CWe167E=
github.com/multiformats/go-multiaddr-fmt v0.1.0/go.mod h1:hGtDIW4PU4BqJ50gW2quDuPVjyWNZxToGUh/HwTZYJo=
github.com/multiformats/go-multiaddr-fmt v0.1.1-0.20240317044316-238036a861f0 h1:KCd9ZVLosRwAYaZfRk/BeT9dBkTBIJpG/GwMI0sE3hk=
github.com/multiformats/go-multiaddr-fmt v0.1.1-0.20240317044316-238036a861f0/go.mod h1:fU4KrUT9EeccvWYMheDXrJlEgjr38O1V7sEyshNz3BI=
github.com/multiformats/go-multibase v0.2.0 h1:isdYCVLvksgWlMW9OZRYJEa9pZETFivncJHmHnnd87g=
github.com/multiformats/go-multibase v0.2.0/go.mod h1:bFBZX4lKCA/2lyOFSAoKH5SS6oPyjtnzK/XTFDPkNuk=
github.com/multiformats/go-multicodec v0.9.0 h1:pb/dlPnzee/Sxv/j4PmkDRxCOi3hXTz3IbPKOXWJkmg=
Expand Down
18 changes: 17 additions & 1 deletion p2p/net/swarm/swarm_addr.go
Expand Up @@ -3,6 +3,7 @@ package swarm
import (
"time"

"github.com/libp2p/go-libp2p/core/transport"
manet "github.com/multiformats/go-multiaddr/net"

ma "github.com/multiformats/go-multiaddr"
Expand All @@ -18,7 +19,22 @@ func (s *Swarm) ListenAddresses() []ma.Multiaddr {
func (s *Swarm) listenAddressesNoLock() []ma.Multiaddr {
addrs := make([]ma.Multiaddr, 0, len(s.listeners.m)+10) // A bit extra so we may avoid an extra allocation in the for loop below.
for l := range s.listeners.m {
addrs = append(addrs, l.Multiaddr())
addr := l.Multiaddr()

// FIXME: this is a hack because we don't return multimple addresses from .Multiaddr, see the docs of transport.ListenerFromUpgrader.
if lu, ok := l.(transport.ListenerFromUpgrader); ok {
u := lu.Upgrader()
for _, suffix := range u.Suffixes() {
if suffix == nil {
// implicit multistream-select
addrs = append(addrs, addr)
} else {
addrs = append(addrs, addr.Encapsulate(suffix))
}
}
} else {
addrs = append(addrs, addr)
}
}
return addrs
}
Expand Down
7 changes: 6 additions & 1 deletion p2p/net/upgrader/listener.go
Expand Up @@ -119,7 +119,7 @@ func (l *listener) handleIncoming() {
ctx, cancel := context.WithTimeout(l.ctx, l.upgrader.acceptTimeout)
defer cancel()

conn, err := l.upgrader.Upgrade(ctx, l.transport, maconn, network.DirInbound, "", connScope)
conn, err := l.upgrader.UpgradeInbound(ctx, l.transport, maconn, "", connScope)
if err != nil {
// Don't bother bubbling this up. We just failed
// to completely negotiate the connection.
Expand Down Expand Up @@ -179,4 +179,9 @@ func (l *listener) String() string {
return fmt.Sprintf("<stream.Listener %s>", l.Multiaddr())
}

func (l *listener) Upgrader() transport.Upgrader {
return l.upgrader
}

var _ transport.Listener = (*listener)(nil)
var _ transport.ListenerFromUpgrader = (*listener)(nil)
113 changes: 92 additions & 21 deletions p2p/net/upgrader/upgrader.go
Expand Up @@ -15,7 +15,10 @@ import (
"github.com/libp2p/go-libp2p/core/sec"
"github.com/libp2p/go-libp2p/core/transport"
"github.com/libp2p/go-libp2p/p2p/net/pnet"
"github.com/libp2p/go-libp2p/p2p/transport/magiselect"

ma "github.com/multiformats/go-multiaddr"
mafmt "github.com/multiformats/go-multiaddr-fmt"
manet "github.com/multiformats/go-multiaddr/net"
mss "github.com/multiformats/go-multistream"
)
Expand Down Expand Up @@ -61,6 +64,9 @@ type upgrader struct {
securityMuxer *mss.MultistreamMuxer[protocol.ID]
securityIDs []protocol.ID

straightSecurity []sec.StraightableSecureTransport
straightMatcher mafmt.Pattern

// AcceptTimeout is the maximum duration an Accept is allowed to take.
// This includes the time between accepting the raw network connection,
// protocol selection as well as the handshake, if applicable.
Expand Down Expand Up @@ -96,15 +102,25 @@ func New(security []sec.SecureTransport, muxers []StreamMuxer, psk ipnet.PSK, rc
u.muxerIDs = append(u.muxerIDs, m.ID)
}
u.securityIDs = make([]protocol.ID, 0, len(security))
u.straightSecurity = make([]sec.StraightableSecureTransport, 0, len(security))
suffixes := []mafmt.Pattern{mafmt.Nothing}
for _, s := range security {
u.securityMuxer.AddHandler(s.ID(), nil)
u.securityIDs = append(u.securityIDs, s.ID())
if straight, ok := s.(sec.StraightableSecureTransport); ok {
u.straightSecurity = append(u.straightSecurity, straight)
if straight.Suffix() == nil {
return nil, fmt.Errorf("StraightSecureTransport %q returned an empty suffix", straight.ID())
}
suffixes = append(suffixes, straight.SuffixMatcher())
}
}
u.straightMatcher = mafmt.Or(suffixes...)
return u, nil
}

// UpgradeListener upgrades the passed multiaddr-net listener into a full libp2p-transport listener.
func (u *upgrader) UpgradeListener(t transport.Transport, list manet.Listener) transport.Listener {
func (u *upgrader) UpgradeListener(t transport.Transport, list manet.Listener) transport.ListenerFromUpgrader {
ctx, cancel := context.WithCancel(context.Background())
l := &listener{
Listener: list,
Expand All @@ -120,17 +136,20 @@ func (u *upgrader) UpgradeListener(t transport.Transport, list manet.Listener) t
return l
}

// Upgrade upgrades the multiaddr/net connection into a full libp2p-transport connection.
func (u *upgrader) Upgrade(ctx context.Context, t transport.Transport, maconn manet.Conn, dir network.Direction, p peer.ID, connScope network.ConnManagementScope) (transport.CapableConn, error) {
c, err := u.upgrade(ctx, t, maconn, dir, p, connScope)
if err != nil {
connScope.Done()
return nil, err
}
return c, nil
func (u *upgrader) UpgradeOutbound(ctx context.Context, t transport.Transport, maconn manet.Conn, suffix ma.Multiaddr, p peer.ID, scope network.ConnManagementScope) (transport.CapableConn, error) {
return u.upgrade(ctx, t, maconn, network.DirOutbound, suffix, p, scope)
}
func (u *upgrader) UpgradeInbound(ctx context.Context, t transport.Transport, maconn manet.Conn, p peer.ID, scope network.ConnManagementScope) (transport.CapableConn, error) {
return u.upgrade(ctx, t, maconn, network.DirInbound, nil, p, scope)
}

func (u *upgrader) upgrade(ctx context.Context, t transport.Transport, maconn manet.Conn, dir network.Direction, p peer.ID, connScope network.ConnManagementScope) (transport.CapableConn, error) {
// suffix is only used for Outbound connections
func (u *upgrader) upgrade(ctx context.Context, t transport.Transport, maconn manet.Conn, dir network.Direction, suffix ma.Multiaddr, p peer.ID, connScope network.ConnManagementScope) (_ transport.CapableConn, err error) {
defer func() {
if err != nil {
connScope.Done()
}
}()
if dir == network.DirOutbound && p == "" {
return nil, ErrNilPeer
}
Expand All @@ -153,7 +172,7 @@ func (u *upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma
}

isServer := dir == network.DirInbound
sconn, security, err := u.setupSecurity(ctx, conn, p, isServer)
sconn, security, err := u.setupSecurity(ctx, conn, suffix, p, isServer)
if err != nil {
conn.Close()
return nil, fmt.Errorf("failed to negotiate security protocol: %w", err)
Expand Down Expand Up @@ -200,8 +219,8 @@ func (u *upgrader) upgrade(ctx context.Context, t transport.Transport, maconn ma
return tc, nil
}

func (u *upgrader) setupSecurity(ctx context.Context, conn net.Conn, p peer.ID, isServer bool) (sec.SecureConn, protocol.ID, error) {
st, err := u.negotiateSecurity(ctx, conn, isServer)
func (u *upgrader) setupSecurity(ctx context.Context, conn net.Conn, suffix ma.Multiaddr, p peer.ID, isServer bool) (sec.SecureConn, protocol.ID, error) {
st, err := u.negotiateSecurity(ctx, conn, suffix, isServer)
if err != nil {
return nil, "", err
}
Expand Down Expand Up @@ -306,22 +325,51 @@ func (u *upgrader) getSecurityByID(id protocol.ID) sec.SecureTransport {
return nil
}

func (u *upgrader) negotiateSecurity(ctx context.Context, insecure net.Conn, server bool) (sec.SecureTransport, error) {
var ErrNoMagiselectMatch = errors.New("no magiselect match")

func (u *upgrader) negotiateSecurity(ctx context.Context, insecure net.Conn, suffix ma.Multiaddr, server bool) (sec.SecureTransport, error) {
if suffix != nil {
for _, straight := range u.straightSecurity {
if straight.SuffixMatcher().Matches(suffix) {
return straight, nil
}
}
return nil, fmt.Errorf("suffix was provided but does not match anything %q %q", insecure, suffix) // buggy transport
}

type result struct {
proto protocol.ID
st sec.SecureTransport
err error
}

done := make(chan result, 1)
go func() {
if server {
var r result
r.proto, _, r.err = u.securityMuxer.Negotiate(insecure)
done <- r
return
}
var r result
r.proto, r.err = mss.SelectOneOf(u.securityIDs, insecure)
r.proto, r.st, r.err = func() (protocol.ID, sec.SecureTransport, error) {
if server {
s, insecure, err := magiselect.ReadSampleFromConn(insecure)
if err != nil {
return "", nil, err
}

if magiselect.IsMultistreamSelect(s) {
proto, _, err := u.securityMuxer.Negotiate(insecure)
return proto, nil, err
}

for _, ss := range u.straightSecurity {
if ss.Match(s) {
return "", ss, nil
}
}

return "", nil, ErrNoMagiselectMatch
}

proto, err := mss.SelectOneOf(u.securityIDs, insecure)
return proto, nil, err
}()
done <- r
}()

Expand All @@ -330,6 +378,9 @@ func (u *upgrader) negotiateSecurity(ctx context.Context, insecure net.Conn, ser
if r.err != nil {
return nil, r.err
}
if r.st != nil {
return r.st, nil
}
if s := u.getSecurityByID(r.proto); s != nil {
return s, nil
}
Expand All @@ -341,3 +392,23 @@ func (u *upgrader) negotiateSecurity(ctx context.Context, insecure net.Conn, ser
return nil, ctx.Err()
}
}

func (u *upgrader) Suffixes() []ma.Multiaddr {
r := make([]ma.Multiaddr, len(u.straightSecurity)+1) // +1 for nil indicating multistream-select
for i, s := range u.straightSecurity {
r[i] = s.Suffix()
}
return r
}

func (u *upgrader) SuffixesProtocols() []int {
r := make([]int, len(u.straightSecurity))
for i, s := range u.straightSecurity {
r[i] = s.SuffixProtocol()
}
return r
}

func (u *upgrader) SuffixMatcher() mafmt.Pattern {
return u.straightMatcher
}
2 changes: 1 addition & 1 deletion p2p/net/upgrader/upgrader_test.go
Expand Up @@ -129,7 +129,7 @@ func dial(t *testing.T, upgrader transport.Upgrader, raddr ma.Multiaddr, p peer.
if err != nil {
return nil, err
}
return upgrader.Upgrade(context.Background(), nil, macon, network.DirOutbound, p, scope)
return upgrader.UpgradeOutbound(context.Background(), nil, macon, nil, p, scope)
}

func TestOutboundConnectionGating(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion p2p/protocol/circuitv2/client/transport.go
Expand Up @@ -70,7 +70,8 @@ func (c *Client) dialAndUpgrade(ctx context.Context, a ma.Multiaddr, p peer.ID,
return nil, err
}
conn.tagHop()
cc, err := c.upgrader.Upgrade(ctx, c, conn, network.DirOutbound, p, connScope)
// FIXME: remove our leading maddr and compute suffix
cc, err := c.upgrader.UpgradeOutbound(ctx, c, conn, nil, p, connScope)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 239e7ce

Please sign in to comment.