Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

webrtc: testing some pion fixes #2671

Closed
wants to merge 14 commits into from
36 changes: 19 additions & 17 deletions go.mod
Expand Up @@ -45,10 +45,10 @@ require (
github.com/multiformats/go-varint v0.0.7
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58
github.com/pion/datachannel v1.5.5
github.com/pion/ice/v2 v2.3.6
github.com/pion/ice/v2 v2.3.11
github.com/pion/logging v0.2.2
github.com/pion/stun v0.6.0
github.com/pion/webrtc/v3 v3.2.9
github.com/pion/stun v0.6.1
github.com/pion/webrtc/v3 v3.2.23
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/client_model v0.4.0
github.com/quic-go/quic-go v0.39.3
Expand All @@ -59,10 +59,10 @@ require (
go.uber.org/goleak v1.2.0
go.uber.org/mock v0.3.0
go.uber.org/zap v1.26.0
golang.org/x/crypto v0.14.0
golang.org/x/crypto v0.16.0
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
golang.org/x/sync v0.4.0
golang.org/x/sys v0.13.0
golang.org/x/sys v0.15.0
golang.org/x/tools v0.14.0
google.golang.org/protobuf v1.30.0
)
Expand All @@ -88,7 +88,7 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/google/pprof v0.0.0-20231023181126-ff6d637d2a7b // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/huin/goupnp v1.3.0 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
Expand All @@ -102,17 +102,17 @@ require (
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/onsi/ginkgo/v2 v2.13.0 // indirect
github.com/opencontainers/runtime-spec v1.1.0 // indirect
github.com/pion/dtls/v2 v2.2.7 // indirect
github.com/pion/interceptor v0.1.17 // indirect
github.com/pion/mdns v0.0.7 // indirect
github.com/pion/dtls/v2 v2.2.8 // indirect
github.com/pion/interceptor v0.1.25 // indirect
github.com/pion/mdns v0.0.9 // indirect
github.com/pion/randutil v0.1.0 // indirect
github.com/pion/rtcp v1.2.10 // indirect
github.com/pion/rtp v1.7.13 // indirect
github.com/pion/sctp v1.8.7 // indirect
github.com/pion/rtcp v1.2.13 // indirect
github.com/pion/rtp v1.8.3 // indirect
github.com/pion/sctp v1.8.9 // indirect
github.com/pion/sdp/v3 v3.0.6 // indirect
github.com/pion/srtp/v2 v2.0.15 // indirect
github.com/pion/transport/v2 v2.2.1 // indirect
github.com/pion/turn/v2 v2.1.0 // indirect
github.com/pion/srtp/v2 v2.0.18 // indirect
github.com/pion/transport/v2 v2.2.4 // indirect
github.com/pion/turn/v2 v2.1.4 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
Expand All @@ -125,8 +125,10 @@ require (
go.uber.org/dig v1.17.1 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/net v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.2.1 // indirect
)

replace github.com/pion/sctp => github.com/sukunrt/sctp v0.0.0-20231217055423-1ff4ca21cc89
106 changes: 62 additions & 44 deletions go.sum

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions p2p/net/swarm/swarm_stream.go
Expand Up @@ -75,9 +75,19 @@ func (s *Stream) Write(p []byte) (int, error) {
return n, err
}

type asyncCloser interface {
AsyncClose(onDone func()) error
}

// Close closes the stream, closing both ends and freeing all associated
// resources.
func (s *Stream) Close() error {
if as, ok := s.stream.(asyncCloser); ok {
err := as.AsyncClose(func() {
s.closeAndRemoveStream()
})
return err
}
err := s.stream.Close()
s.closeAndRemoveStream()
return err
Expand Down
45 changes: 45 additions & 0 deletions p2p/net/swarm/swarm_stream_test.go
@@ -0,0 +1,45 @@
package swarm

import (
"context"
"sync/atomic"
"testing"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/stretchr/testify/require"
)

type asyncStreamWrapper struct {
network.MuxedStream
beforeClose func()
}

func (s *asyncStreamWrapper) AsyncClose(onDone func()) error {
s.beforeClose()
err := s.Close()
onDone()
return err
}

func TestStreamAsyncCloser(t *testing.T) {
s1 := makeSwarm(t)
s2 := makeSwarm(t)

s1.Peerstore().AddAddrs(s2.LocalPeer(), s2.ListenAddresses(), peerstore.TempAddrTTL)
s, err := s1.NewStream(context.Background(), s2.LocalPeer())
require.NoError(t, err)
ss, ok := s.(*Stream)
require.True(t, ok)

var called atomic.Bool
as := &asyncStreamWrapper{
MuxedStream: ss.stream,
beforeClose: func() {
called.Store(true)
},
}
ss.stream = as
ss.Close()
require.True(t, called.Load())
}
83 changes: 83 additions & 0 deletions p2p/test/swarm/swarm_test.go
Expand Up @@ -2,6 +2,7 @@ package swarm_test

import (
"context"
"fmt"
"io"
"sync"
"testing"
Expand All @@ -14,6 +15,7 @@ import (
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/client"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
libp2pwebrtc "github.com/libp2p/go-libp2p/p2p/transport/webrtc"
ma "github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -243,3 +245,84 @@ func TestLimitStreamsWhenHangingHandlers(t *testing.T) {
return false
}, 5*time.Second, 100*time.Millisecond)
}

func TestLimitStreamsWhenHangingHandlersWebRTC(t *testing.T) {
var partial rcmgr.PartialLimitConfig
const streamLimit = 10
partial.System.Streams = streamLimit
mgr, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(partial.Build(rcmgr.InfiniteLimits)))
require.NoError(t, err)

maddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/udp/0/webrtc-direct")
require.NoError(t, err)

receiver, err := libp2p.New(
libp2p.ResourceManager(mgr),
libp2p.ListenAddrs(maddr),
libp2p.Transport(libp2pwebrtc.New),
)
require.NoError(t, err)
t.Cleanup(func() { receiver.Close() })

var wg sync.WaitGroup
wg.Add(1)

const pid = "/test"
receiver.SetStreamHandler(pid, func(s network.Stream) {
defer s.Close()
s.Write([]byte{42})
wg.Wait()
})

// Open streamLimit streams
success := 0
// we make a lot of tries because identify and identify push take up a few streams
for i := 0; i < 1000 && success < streamLimit; i++ {
mgr, err = rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.InfiniteLimits))
require.NoError(t, err)

sender, err := libp2p.New(libp2p.ResourceManager(mgr), libp2p.Transport(libp2pwebrtc.New))
require.NoError(t, err)
t.Cleanup(func() { sender.Close() })

sender.Peerstore().AddAddrs(receiver.ID(), receiver.Addrs(), peerstore.PermanentAddrTTL)

s, err := sender.NewStream(context.Background(), receiver.ID(), pid)
if err != nil {
continue
}

var b [1]byte
_, err = io.ReadFull(s, b[:])
if err == nil {
success++
}
sender.Close()
}
require.Equal(t, streamLimit, success)
// We have the maximum number of streams open. Next call should fail.
mgr, err = rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.InfiniteLimits))
require.NoError(t, err)

sender, err := libp2p.New(libp2p.ResourceManager(mgr), libp2p.Transport(libp2pwebrtc.New))
require.NoError(t, err)
t.Cleanup(func() { sender.Close() })

sender.Peerstore().AddAddrs(receiver.ID(), receiver.Addrs(), peerstore.PermanentAddrTTL)

_, err = sender.NewStream(context.Background(), receiver.ID(), pid)
require.Error(t, err)
// Close the open streams
wg.Done()

// Next call should succeed
require.Eventually(t, func() bool {
s, err := sender.NewStream(context.Background(), receiver.ID(), pid)
if err == nil {
s.Close()
return true
}
fmt.Println(err)
return false
}, 5*time.Second, 1*time.Second)
}
13 changes: 4 additions & 9 deletions p2p/test/transport/transport_test.go
Expand Up @@ -9,7 +9,6 @@ import (
"io"
"net"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -314,9 +313,6 @@ func TestManyStreams(t *testing.T) {
const streamCount = 128
for _, tc := range transportsToTest {
t.Run(tc.Name, func(t *testing.T) {
if strings.Contains(tc.Name, "WebRTC") {
t.Skip("Pion doesn't correctly handle large queues of streams.")
}
h1 := tc.HostGenerator(t, TransportTestCaseOpts{NoRcmgr: true})
h2 := tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true, NoRcmgr: true})
defer h1.Close()
Expand Down Expand Up @@ -382,9 +378,6 @@ func TestMoreStreamsThanOurLimits(t *testing.T) {
const streamCount = 1024
for _, tc := range transportsToTest {
t.Run(tc.Name, func(t *testing.T) {
if strings.Contains(tc.Name, "WebRTC") {
t.Skip("This test potentially exhausts the uint16 WebRTC stream ID space.")
}
listenerLimits := rcmgr.PartialLimitConfig{
PeerDefault: rcmgr.ResourceLimits{
Streams: 32,
Expand Down Expand Up @@ -425,7 +418,7 @@ func TestMoreStreamsThanOurLimits(t *testing.T) {
var completedStreams atomic.Int32

const maxWorkerCount = streamCount
workerCount := 4
workerCount := 16

var startWorker func(workerIdx int)
startWorker = func(workerIdx int) {
Expand All @@ -440,7 +433,9 @@ func TestMoreStreamsThanOurLimits(t *testing.T) {
// Inline function so we can use defer
func() {
var didErr bool
defer completedStreams.Add(1)
defer func() {
fmt.Println("completed: ", completedStreams.Add(1))
}()
defer func() {
// Only the first worker adds more workers
if workerIdx == 0 && !didErr && !sawFirstErr.Load() {
Expand Down