Skip to content

Commit

Permalink
feat: added REUSEPORT support for websocket listeners libp2p#1435
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem committed Apr 26, 2023
1 parent e4f1e0c commit e7b4454
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 7 deletions.
2 changes: 1 addition & 1 deletion p2p/transport/websocket/addrs_test.go
Expand Up @@ -74,7 +74,7 @@ func TestConvertWebsocketMultiaddrToNetAddr(t *testing.T) {
}

func TestListeningOnDNSAddr(t *testing.T) {
ln, err := newListener(ma.StringCast("/dns/localhost/tcp/0/ws"), nil)
ln, err := newListener(ma.StringCast("/dns/localhost/tcp/0/ws"), nil, false)
require.NoError(t, err)
addr := ln.Multiaddr()
first, rest := ma.SplitFirst(addr)
Expand Down
21 changes: 16 additions & 5 deletions p2p/transport/websocket/listener.go
Expand Up @@ -10,6 +10,7 @@ import (
"net/url"

"github.com/libp2p/go-libp2p/core/transport"
"github.com/libp2p/go-reuseport"

ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"
Expand Down Expand Up @@ -44,7 +45,8 @@ func (pwma *parsedWebsocketMultiaddr) toMultiaddr() ma.Multiaddr {

// newListener creates a new listener from a raw net.Listener.
// tlsConf may be nil (for unencrypted websockets).
func newListener(a ma.Multiaddr, tlsConf *tls.Config) (*listener, error) {
func newListener(a ma.Multiaddr, tlsConf *tls.Config, reuseportAvailable bool) (*listener, error) {
var nl net.Listener
parsed, err := parseWebsocketMultiaddr(a)
if err != nil {
return nil, err
Expand All @@ -58,11 +60,20 @@ func newListener(a ma.Multiaddr, tlsConf *tls.Config) (*listener, error) {
if err != nil {
return nil, err
}
nl, err := net.Listen(lnet, lnaddr)
if err != nil {
return nil, err
if reuseportAvailable {
nl, err = reuseport.Listen(lnet, lnaddr)
if err != nil {
nl, err = net.Listen(lnet, lnaddr)
if err != nil {
return nil, err
}
}
} else {
nl, err = net.Listen(lnet, lnaddr)
if err != nil {
return nil, err
}
}

laddr, err := manet.FromNetAddr(nl.Addr())
if err != nil {
return nil, err
Expand Down
17 changes: 16 additions & 1 deletion p2p/transport/websocket/websocket.go
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/transport"
"github.com/libp2p/go-reuseport"

ma "github.com/multiformats/go-multiaddr"
mafmt "github.com/multiformats/go-multiaddr-fmt"
Expand Down Expand Up @@ -55,6 +56,13 @@ func init() {

type Option func(*WebsocketTransport) error

func EnableReuseport() Option {
return func(tr *WebsocketTransport) error {
tr.reuseport = true
return nil
}
}

// WithTLSClientConfig sets a TLS client configuration on the WebSocket Dialer. Only
// relevant for non-browser usages.
//
Expand Down Expand Up @@ -82,6 +90,7 @@ type WebsocketTransport struct {

tlsClientConf *tls.Config
tlsConf *tls.Config
reuseport bool //reuseport is disabled by default, can be enabled by passing it as an option.
}

var _ transport.Transport = (*WebsocketTransport)(nil)
Expand All @@ -90,6 +99,7 @@ func New(u transport.Upgrader, rcmgr network.ResourceManager, opts ...Option) (*
if rcmgr == nil {
rcmgr = &network.NullResourceManager{}
}

t := &WebsocketTransport{
upgrader: u,
rcmgr: rcmgr,
Expand Down Expand Up @@ -267,7 +277,7 @@ func (t *WebsocketTransport) maDial(ctx context.Context, raddr ma.Multiaddr) (ma
}

func (t *WebsocketTransport) maListen(a ma.Multiaddr) (manet.Listener, error) {
l, err := newListener(a, t.tlsConf)
l, err := newListener(a, t.tlsConf, t.reuseport)
if err != nil {
return nil, err
}
Expand All @@ -282,3 +292,8 @@ func (t *WebsocketTransport) Listen(a ma.Multiaddr) (transport.Listener, error)
}
return &transportListener{Listener: t.upgrader.UpgradeListener(t, malist)}, nil
}

// UseReuseport returns true if reuseport is enabled and available.
func (t *WebsocketTransport) UseReuseport() bool {
return t.reuseport && reuseport.Available()
}
52 changes: 52 additions & 0 deletions p2p/transport/websocket/websocket_test.go
Expand Up @@ -16,6 +16,7 @@ import (
"net"
"net/http"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -548,3 +549,54 @@ func TestResolveMultiaddr(t *testing.T) {
})
}
}

func TestListenerResusePort(t *testing.T) {
laddr := ma.StringCast("/ip4/127.0.0.1/tcp/5002/ws")
fmt.Println("Starting Reuse Port test.")
var wg sync.WaitGroup
var opts []Option
opts = append(opts, EnableReuseport())
_, u := newUpgrader(t)
tpt, err := New(u, &network.NullResourceManager{}, opts...)
require.NoError(t, err)
fmt.Println("Invoking Go routines.")
for i := 0; i < 2; i++ {
wg.Add(1)
go func(index int) {
l, err := tpt.Listen(laddr)
if err != nil {
fmt.Println("Failed to listen on websocket due to error ", err)
}
require.NoError(t, err)
require.Equal(t, lastComponent(t, l.Multiaddr()), wsComponent)
defer l.Close()
fmt.Println("Routine-", index, " Calling Accept...")
for j := 0; j < 2; j++ {
conn, err := l.Accept()
if err != nil {
fmt.Println("Routine-", index, " Failed accepting connection due to error ", err)
}
//require.NoError(t, err)
fmt.Println("Routine-", index, " Accepting connection ", conn)
defer conn.Close()
}
}(i)
}
time.Sleep(2 * time.Second)
fmt.Println("Invoking Connector Go routines.")

for i := 0; i < 4; i++ {
go func(index int) {
fmt.Println("Routine-", index, " Initiating connection ")
c, err := tpt.maDial(context.Background(), laddr)
if err != nil {
t.Error(err)
return
}
defer c.Close()
fmt.Println("Sleeping for 10 seconds after connection intiation")
time.Sleep(10 * time.Second)
}(i)
}
wg.Wait()
}

0 comments on commit e7b4454

Please sign in to comment.