Skip to content

Commit

Permalink
Added support for using reuseport in connection Dialing libp2p#1435
Browse files Browse the repository at this point in the history
  • Loading branch information
chaitanyaprem committed Apr 26, 2023
1 parent e7b4454 commit 445f860
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 12 deletions.
11 changes: 10 additions & 1 deletion p2p/transport/websocket/websocket.go
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/libp2p/go-libp2p/core/transport"
"github.com/libp2p/go-reuseport"

reusetransport "github.com/libp2p/go-libp2p/p2p/net/reuseport"
ma "github.com/multiformats/go-multiaddr"
mafmt "github.com/multiformats/go-multiaddr-fmt"
manet "github.com/multiformats/go-multiaddr/net"
Expand Down Expand Up @@ -91,6 +92,7 @@ type WebsocketTransport struct {
tlsClientConf *tls.Config
tlsConf *tls.Config
reuseport bool //reuseport is disabled by default, can be enabled by passing it as an option.
reuse reusetransport.Transport
}

var _ transport.Transport = (*WebsocketTransport)(nil)
Expand Down Expand Up @@ -198,7 +200,14 @@ func (t *WebsocketTransport) maDial(ctx context.Context, raddr ma.Multiaddr) (ma

transport := &http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
conn, err := net.Dial(network, addr)
var conn manet.Conn
var err error
if t.UseReuseport() {
conn, err = t.reuse.Dial(raddr)
} else {
var d manet.Dialer
conn, err = d.Dial(raddr)
}
if err != nil {
close(localAddrChan)
return nil, err
Expand Down
39 changes: 28 additions & 11 deletions p2p/transport/websocket/websocket_test.go
Expand Up @@ -552,51 +552,68 @@ func TestResolveMultiaddr(t *testing.T) {

func TestListenerResusePort(t *testing.T) {
laddr := ma.StringCast("/ip4/127.0.0.1/tcp/5002/ws")
fmt.Println("Starting Reuse Port test.")
//fmt.Println("Starting Reuse Port test.")
var wg sync.WaitGroup
var opts []Option
opts = append(opts, EnableReuseport())
_, u := newUpgrader(t)
tpt, err := New(u, &network.NullResourceManager{}, opts...)
require.NoError(t, err)
fmt.Println("Invoking Go routines.")
//fmt.Println("Invoking Go routines.")
for i := 0; i < 2; i++ {
wg.Add(1)
go func(index int) {
l, err := tpt.Listen(laddr)
l, err := tpt.maListen(laddr)
if err != nil {
fmt.Println("Failed to listen on websocket due to error ", err)
}
require.NoError(t, err)
require.Equal(t, lastComponent(t, l.Multiaddr()), wsComponent)
defer l.Close()
fmt.Println("Routine-", index, " Calling Accept...")
//fmt.Println("Routine-", index, " Calling Accept...")
for j := 0; j < 2; j++ {
conn, err := l.Accept()
if err != nil {
fmt.Println("Routine-", index, " Failed accepting connection due to error ", err)
}
//require.NoError(t, err)
fmt.Println("Routine-", index, " Accepting connection ", conn)
require.NoError(t, err)
//fmt.Println("Routine-", index, " Accepting connection ", conn)
defer conn.Close()
buf := make([]byte, 6)
n, err := conn.Read(buf)
if n != 6 {
t.Errorf("read %d bytes, expected 2", n)
}
require.NoError(t, err)
fmt.Println("Read bytes:", buf)
}
}(i)
}
time.Sleep(2 * time.Second)
fmt.Println("Invoking Connector Go routines.")
//fmt.Println("Invoking Connector Go routines.")

for i := 0; i < 4; i++ {
go func(index int) {
fmt.Println("Routine-", index, " Initiating connection ")
//fmt.Println("Routine-", index, " Initiating connection ")
c, err := tpt.maDial(context.Background(), laddr)
if err != nil {
t.Error(err)
return
}
require.NoError(t, err)
defer c.Close()
fmt.Println("Sleeping for 10 seconds after connection intiation")
time.Sleep(10 * time.Second)
//fmt.Println("Sleeping for 2 seconds after connection intiation")
msg := fmt.Sprintf("Hello%d", index)
n, err := c.Write([]byte(msg))
if n != 6 {
t.Errorf("expected to write 0 bytes, wrote %d", n)
}
if err != nil {
t.Error(err)
return
}
time.Sleep(2 * time.Second)
}(i)
}
wg.Wait()
time.Sleep(2 * time.Second)
}

0 comments on commit 445f860

Please sign in to comment.